本文为您展示DataHub的 GO SDK的发布/订阅操作。
发布数据
向某个topic下发布数据记录时,每条数据记录需要指定该topic下的一个shard, 因此一般需要通过 listShard 接口查看下当前topic下的shard列表。使用PutRecords接口时注意检查返回结果是否数据发布失败的情况。
服务器2.12版本及之后版本开始支持PutRecordsByShard接口,低版本请使用PutRecords接口。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
shardId | string | id of shard |
records | Records list to written. |
返回示例
type PutRecordsResult struct {
FailedRecordCount int `json:"FailedRecordCount"`
FailedRecords []FailedRecord `json:"FailedRecords"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
// put tuple data
func putTupleData() {
topic, err := dh.GetTopic(projectName, topicName)
if err != nil {
fmt.Println("get topic failed")
fmt.Println(err)
return
}
fmt.Println("get topic successful")
records := make([]datahub.IRecord, 3)
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.ShardId = "0"
record1.SetValueByName("field1", "TEST1")
record1.SetValueByName("field2", 1)
//you can add some attributes when put record
record1.SetAttribute("attribute", "test attribute")
records[0] = record1
record2 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record2.ShardId = "1"
record2.SetValueByName("field1", datahub.String("TEST2"))
record2.SetValueByName("field2", datahub.Bigint(2))
records[1] = record2
record3 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record3.ShardId = "2"
record3.SetValueByName("field1", datahub.String("TEST3"))
record3.SetValueByName("field2", datahub.Bigint(3))
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
result, err := dh.PutRecords(projectName, topicName, records)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
for _, v := range result.FailedRecords {
fmt.Println(v)
}
break
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}
}
// put blob data
func putBlobData() {
records := make([]datahub.IRecord, 3)
record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
record1.ShardId = "0"
records[0] = record1
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.ShardId = "1"
record2.SetAttribute("attribute", "test attribute")
records[1] = record2
record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
record3.ShardId = "2"
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
result, err := dh.PutRecords(projectName, blobTopicName, records)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
for _, v := range result.FailedRecords {
fmt.Println(v)
}
break
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}
}
// put data by shard
func putDataByShard() {
shardId := "0"
records := make([]datahub.IRecord, 3)
record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
records[0] = record1
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute", "test attribute")
records[1] = record2
record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
records[2] = record3
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
if err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records); err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("put record failed")
fmt.Println(err)
return
}
}
}
if retryNum >= maxReTry {
fmt.Printf("put records failed ")
}else {
fmt.Println("put record successful")
}
}
除了数据本身以外,在进行数据发布时,还可以添加和数据相关的额外信息,例如数据采集场景等。添加方式为下:
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")
订阅数据
订阅一个topic下的数据,同样需要指定对应的shard,同时需要指定读取游标位置,通过 getCursor 接口获取。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
shardId | string | id of shard |
ctype | CursorType | Which type used to get cursor |
param | int64 | Parameter used to get cursor.when use SEQUENCE and SYSTEM_TIME need to be set. |
ctype可通过以下四种方式获得:
OLDEST:表示获取的cursor指向当前有效数据中时间最久远的record
LATEST:表示获取的cursor指向当前最新的record
SEQUENCE: 表示获取的cursor指向该序列的record
SYSTEM_TIME: 表示获取的cursor指向该时间之后接收到的第一条record
返回示例
type GetCursorResult struct {
Cursor string `json:"Cursor"`
RecordTime int64 `json:"RecordTime"`
Sequence int64 `json:"Sequence"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
SeekOutOfRangeError |
| getCursor时,给定的sequence不在有效范围内(通常数据已过期),或给定的timestamp大于当前时间。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数 |
ShardSealedError |
代码示例
从指定shard读取数据,需要指定从哪个cursor开始读,并指定读取的上限数据条数,如果从cursor到shard结尾的数据条数少于Limit,则返回实际的数据条数的有数据。
func cursor(dh datahub.DataHub, projectName, topicName string) {
shardId := "0"
gr, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
}else{
fmt.Println(gr)
}
gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
fmt.Println(err)
fmt.Println(gr)
var seq int64 = 10
gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
}else{
fmt.Println(gr)
}
}
Tuple topic data
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称RecordSchema |
topicName | string | Topic名称。 |
shardId | string | id of shard |
cursor | string | The start cursor used to read data. |
limit | int | Max record size to read. |
recordSchema | RecordSchema | RecordSchema for the topic. |
返回示例
type GetRecordsResult struct {
NextCursor string `json:"NextCursor"`
RecordCount int `json:"RecordCount"`
StartSequence int64 `json:"StartSeq"`
Records []IRecord `json:"Records"`
RecordSchema *RecordSchema `json:"-"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func getTupleData() {
shardId := "1"
topic, err := dh.GetTopic(projectName, topicName)
if err != nil {
fmt.Println("get topic failed")
return
}
fmt.Println("get topic successful")
cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
return
}
fmt.Println("get cursor successful")
limitNum := 100
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("get record failed")
fmt.Println(err)
return
}
}
fmt.Println("get record successful")
for _, record := range gr.Records {
data, ok := record.(*datahub.TupleRecord)
if !ok {
fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
} else {
fmt.Println(data.Values)
}
}
break
}
if retryNum >= maxReTry {
fmt.Printf("get records failed ")
}
}
Blob topic data
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
shardId | string | id of shard |
cursor | string | The start cursor used to read data. |
limit | int | Max record size to read. |
返回示例
type GetRecordsResult struct {
NextCursor string `json:"NextCursor"`
RecordCount int `json:"RecordCount"`
StartSequence int64 `json:"StartSeq"`
Records []IRecord `json:"Records"`
RecordSchema *RecordSchema `json:"-"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func getBlobData() {
shardId := "1"
cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("get cursor failed")
fmt.Println(err)
return
}
fmt.Println("get cursor successful")
limitNum := 100
maxReTry := 3
retryNum := 0
for retryNum < maxReTry {
gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
if err != nil {
if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("maybe qps exceed limit,retry")
retryNum++
time.Sleep(5 * time.Second)
continue
} else {
fmt.Println("get record failed")
fmt.Println(err)
return
}
}
fmt.Println("get record successful")
for _, record := range gr.Records {
data, ok := record.(*datahub.BlobRecord)
if !ok {
fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
} else {
fmt.Println(data.StoreData)
}
}
break
}
if retryNum >= maxReTry {
fmt.Printf("get records failed ")
}
}