本文为您展示DataHub的 GO SDK的发布/订阅操作。
发布数据
向某个topic下发布数据记录时,每条数据记录需要指定该topic下的一个shard, 因此一般需要通过 listShard 接口查看下当前topic下的shard列表。使用PutRecords接口时注意检查返回结果是否数据发布失败的情况。
服务器2.12版本及之后版本开始支持PutRecordsByShard接口,低版本请使用PutRecords接口。
参数说明
参数名  | 参数类型  | 参数说明  | 
projectName  | String  | 项目名称。  | 
topicName  | string  | Topic名称  | 
shardId  | string  | id of shard  | 
records  | Records list to written.  | 
返回示例
type PutRecordsResult struct {
    FailedRecordCount int            `json:"FailedRecordCount"`
    FailedRecords     []FailedRecord `json:"FailedRecords"`
}错误说明
错误类名  | 错误码  | 错误说明  | 
ResourceNotFoundError  | 
 
 
 
 
 
 
  | 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。  | 
AuthorizationFailedError  | 
  | Authorization 签名解析异常,检查AK是否填写正确。  | 
DatahubClientError  | -  | 其他所有,并且是所有异常的基类。  | 
InvalidParameterError  | 
 
  | 非法参数。  | 
代码示例
// put tuple data
func putTupleData() {
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get topic successful")
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record1.ShardId = "0"
    record1.SetValueByName("field1", "TEST1")
    record1.SetValueByName("field2", 1)
    //you can add some attributes when put record
    record1.SetAttribute("attribute", "test attribute")
    records[0] = record1
    record2 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record2.ShardId = "1"
    record2.SetValueByName("field1", datahub.String("TEST2"))
    record2.SetValueByName("field2", datahub.Bigint(2))
    records[1] = record2
    record3 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record3.ShardId = "2"
    record3.SetValueByName("field1", datahub.String("TEST3"))
    record3.SetValueByName("field2", datahub.Bigint(3))
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        result, err := dh.PutRecords(projectName, topicName, records)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
        for _, v := range result.FailedRecords {
            fmt.Println(v)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }
}
// put blob data
func putBlobData() {
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    record1.ShardId = "0"
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.ShardId = "1"
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    record3.ShardId = "2"
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        result, err := dh.PutRecords(projectName, blobTopicName, records)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
        for _, v := range result.FailedRecords {
            fmt.Println(v)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }
}
// put data by shard
func putDataByShard() {
    shardId := "0"
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    records[2] = record3
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        if err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records); err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
        }
    }
    if retryNum >= maxReTry {
        fmt.Printf("put records failed ")
    }else {
        fmt.Println("put record successful")
    }
}除了数据本身以外,在进行数据发布时,还可以添加和数据相关的额外信息,例如数据采集场景等。添加方式为下:
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")订阅数据
订阅一个topic下的数据,同样需要指定对应的shard,同时需要指定读取游标位置,通过 getCursor 接口获取。
参数说明
参数名  | 参数类型  | 参数说明  | 
projectName  | String  | 项目名称。  | 
topicName  | string  | Topic名称。  | 
shardId  | string  | id of shard  | 
ctype  | CursorType  | Which type used to get cursor  | 
param  | int64  | Parameter used to get cursor.when use SEQUENCE and SYSTEM_TIME need to be set.  | 
ctype可通过以下四种方式获得:
OLDEST:表示获取的cursor指向当前有效数据中时间最久远的record
LATEST:表示获取的cursor指向当前最新的record
SEQUENCE: 表示获取的cursor指向该序列的record
SYSTEM_TIME: 表示获取的cursor指向该时间之后接收到的第一条record
返回示例
type GetCursorResult struct {
    Cursor     string `json:"Cursor"`
    RecordTime int64  `json:"RecordTime"`
    Sequence   int64  `json:"Sequence"`
}错误说明
错误类名  | 错误码  | 错误说明  | 
ResourceNotFoundError  | 
 
 
 
 
 
 
  | 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。  | 
SeekOutOfRangeError  | 
  | getCursor时,给定的sequence不在有效范围内(通常数据已过期),或给定的timestamp大于当前时间。  | 
AuthorizationFailedError  | 
  | Authorization 签名解析异常,检查AK是否填写正确。  | 
DatahubClientError  | -  | 其他所有,并且是所有异常的基类  | 
InvalidParameterError  | 
 
  | 非法参数  | 
ShardSealedError  | 
代码示例
从指定shard读取数据,需要指定从哪个cursor开始读,并指定读取的上限数据条数,如果从cursor到shard结尾的数据条数少于Limit,则返回实际的数据条数的有数据。
func cursor(dh datahub.DataHub, projectName, topicName string) {
    shardId := "0"
    gr, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }else{
        fmt.Println(gr)
    }
    gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
    fmt.Println(err)
    fmt.Println(gr)
    var seq int64 = 10
    gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }else{
        fmt.Println(gr)
    }
}Tuple topic data
参数说明
参数名  | 参数类型  | 参数说明  | 
projectName  | String  | 项目名称RecordSchema  | 
topicName  | string  | Topic名称。  | 
shardId  | string  | id of shard  | 
cursor  | string  | The start cursor used to read data.  | 
limit  | int  | Max record size to read.  | 
recordSchema  | RecordSchema  | RecordSchema for the topic.  | 
返回示例
type GetRecordsResult struct {
    NextCursor    string        `json:"NextCursor"`
    RecordCount   int           `json:"RecordCount"`
    StartSequence int64         `json:"StartSeq"`
    Records       []IRecord     `json:"Records"`
    RecordSchema  *RecordSchema `json:"-"`
}错误说明
错误类名  | 错误码  | 错误说明  | 
ResourceNotFoundError  | 
 
 
 
 
 
 
  | 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。  | 
AuthorizationFailedError  | 
  | Authorization 签名解析异常,检查AK是否填写正确。  | 
DatahubClientError  | -  | 其他所有,并且是所有异常的基类。  | 
InvalidParameterError  | 
 
  | 非法参数。  | 
代码示例
func getTupleData() {
    shardId := "1"
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        return
    }
    fmt.Println("get topic successful")
    cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.TupleRecord)
            if !ok {
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.Values)
            }
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("get records failed ")
    }
}Blob topic data
参数说明
参数名  | 参数类型  | 参数说明  | 
projectName  | String  | 项目名称。  | 
topicName  | string  | Topic名称。  | 
shardId  | string  | id of shard  | 
cursor  | string  | The start cursor used to read data.  | 
limit  | int  | Max record size to read.  | 
返回示例
type GetRecordsResult struct {
    NextCursor    string        `json:"NextCursor"`
    RecordCount   int           `json:"RecordCount"`
    StartSequence int64         `json:"StartSeq"`
    Records       []IRecord     `json:"Records"`
    RecordSchema  *RecordSchema `json:"-"`
}错误说明
错误类名  | 错误码  | 错误说明  | 
ResourceNotFoundError  | 
 
 
 
 
 
 
  | 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。  | 
AuthorizationFailedError  | 
  | Authorization 签名解析异常,检查AK是否填写正确。  | 
DatahubClientError  | -  | 其他所有,并且是所有异常的基类。  | 
InvalidParameterError  | 
 
  | 非法参数。  | 
代码示例
func getBlobData() {
    shardId := "1"
    cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")
    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); ok {
                fmt.Println("maybe qps exceed limit,retry")
                retryNum++
                time.Sleep(5 * time.Second)
                continue
            } else {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.BlobRecord)
            if !ok {
                fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
            } else {
                fmt.Println(data.StoreData)
            }
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("get records failed ")
    }
}