Go SDK Introduction

Last updated: 2019-09-20 19:50:16

Quick Start

Basic DataHub Concepts

For details, see DataHub Basic Concepts.

Preparation

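  • Accessing the DataHub service requires an authenticated Alibaba Cloud account: you must provide your Alibaba Cloud accessId and accessKey, together with a reachable DataHub service endpoint.
  • Get the DataHub Go SDK package:

    go get -u -insecure github.com/aliyun/aliyun-datahub-sdk-go/datahub

  • All APIs provided by the DataHub Go SDK are implemented by the datahub.DataHub interface, so the first step is to initialize a DataHub object. You can create a DataHub object with default parameters directly:

    import "github.com/aliyun/aliyun-datahub-sdk-go/datahub"

    accessId := ""
    accessKey := ""
    endpoint := ""
    dh := datahub.New(accessId, accessKey, endpoint)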
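  • You can also configure the client with custom parameters. The following parameters are currently supported:

    Parameter | Type | Options | Description
    -|-|-|-
    UserAgent | string | - | User agent
    CompressorType | CompressorType | NOCOMPRESS, LZ4, DEFLATE, ZLIB | Compression format used for transport. Defaults to NOCOMPRESS (no compression).
    EnableBinary | bool | true/false | Use the protobuf protocol, mainly for put/get record. If the DataHub version does not support protobuf, set EnableBinary to false manually.
    HttpClient | *http.Client | - | See net/http for details.

  • Because a bool in Go defaults to false, when you use custom parameters it is recommended to set EnableBinary: true explicitly unless you specifically need otherwise.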

    accessId := ""
    accessKey := ""
    endpoint := ""
    config := &datahub.Config{
        UserAgent:      "***",
        EnableBinary:   true,
        CompressorType: datahub.LZ4,
        HttpClient:     &http.Client{}, // requires import "net/http"
    }
    dh := datahub.NewClientWithConfig(accessId, accessKey, endpoint, config)
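
The EnableBinary recommendation follows from Go's zero values: a bool field left out of a struct literal is false. The following is a minimal sketch of that behavior. It uses a hypothetical clientConfig struct defined locally for illustration, not the SDK's datahub.Config.

```go
package main

import "fmt"

// clientConfig is a hypothetical stand-in for a config struct with a bool
// option; it is not the SDK's datahub.Config.
type clientConfig struct {
	UserAgent    string
	EnableBinary bool
}

// binaryEnabled reports the value of the EnableBinary option.
func binaryEnabled(c *clientConfig) bool {
	return c.EnableBinary
}

func main() {
	// EnableBinary is omitted here, so it takes the zero value false.
	implicit := &clientConfig{UserAgent: "demo"}
	// EnableBinary is set explicitly here.
	explicit := &clientConfig{UserAgent: "demo", EnableBinary: true}

	fmt.Println(binaryEnabled(implicit), binaryEnabled(explicit))
}
```

So a custom config that only sets, say, UserAgent silently turns the binary protocol off, which is why the option is worth setting explicitly.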

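API Examples

Project Operations

A project is the basic organizational unit of DataHub data and contains multiple topics. Note that DataHub projects are independent of MaxCompute projects: a project created in MaxCompute cannot be reused in DataHub and must be created separately.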

Create a Project

CreateProject(projectName, comment string) error

  • Parameters

    • projectName: project name
    • comment: project comment
  • return

  • error

    • ResourceExistError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func createProject(dh datahub.DataHub, projectName string) {
        if err := dh.CreateProject(projectName, "project comment"); err != nil {
            fmt.Println("create project failed")
            fmt.Println(err)
            return
        }
        fmt.Println("create successful")
    }
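
Each API lists the error types it may return, and callers often want to react differently to each one. The sketch below shows one way to branch on error types with errors.As. The two error types are hypothetical local types that only mimic the names listed above; they are not the SDK's own types, and the action labels are illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

// resourceExistError and authorizationFailedError are hypothetical local types
// that only mimic the error names listed above; they are not the SDK's types.
type resourceExistError struct{ msg string }

func (e *resourceExistError) Error() string { return e.msg }

type authorizationFailedError struct{ msg string }

func (e *authorizationFailedError) Error() string { return e.msg }

// classify maps the error from a create call to a short action label.
func classify(err error) string {
	var exist *resourceExistError
	var auth *authorizationFailedError
	switch {
	case err == nil:
		return "created"
	case errors.As(err, &exist):
		// The resource is already there; a caller may treat this as success.
		return "already exists"
	case errors.As(err, &auth):
		return "check credentials"
	default:
		return "failed"
	}
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(&resourceExistError{msg: "project exists"}))
	fmt.Println(classify(errors.New("network error")))
}
```

Treating "already exists" as a non-fatal outcome makes a create call safe to repeat, which can be convenient in setup scripts.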

Delete a Project

The DeleteProject API deletes a project.

DeleteProject(projectName string) error

  • Parameters

    • projectName: project name
  • return

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func deleteProject(dh datahub.DataHub, projectName string) {
        // Delete the project passed in by the caller.
        if err := dh.DeleteProject(projectName); err != nil {
            fmt.Println("delete project failed")
            fmt.Println(err)
            return
        }
        fmt.Println("delete project successful")
    }

List Projects

The ListProject API lists projects.

ListProject() (*ListProjectResult, error)

  • Parameters: none

  • return

    type ListProjectResult struct {
        ProjectNames []string `json:"ProjectNames"`
    }

  • error

    • AuthorizationFailedError
    • DatahubClientError
  • Example

    func listProject(dh datahub.DataHub) {
        lp, err := dh.ListProject()
        if err != nil {
            fmt.Println("get project list failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get project list successful")
        for _, projectName := range lp.ProjectNames {
            fmt.Println(projectName)
        }
    }

Get a Project

The GetProject API queries a project.

GetProject(projectName string) (*GetProjectResult, error)

  • Parameters
    • projectName: project name
  • return

    type GetProjectResult struct {
        CreateTime     int64  `json:"CreateTime"`
        LastModifyTime int64  `json:"LastModifyTime"`
        Comment        string `json:"Comment"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func getProject(dh datahub.DataHub, projectName string) {
        gp, err := dh.GetProject(projectName)
        if err != nil {
            fmt.Println("get project message failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get project message successful")
        fmt.Println(*gp)
    }

Update a Project

UpdateProject(projectName, comment string) error

  • Parameters
    • projectName: project name
    • comment: project comment
  • return
  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func updateProject(dh datahub.DataHub, projectName string) {
        if err := dh.UpdateProject(projectName, "new project comment"); err != nil {
            fmt.Println("update project comment failed")
            fmt.Println(err)
            return
        }
        fmt.Println("update project comment successful")
    }

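Topic Operations

A topic is the smallest unit of subscription and publication in DataHub; you can use a topic to represent one kind or category of streaming data. Two types are currently supported: Tuple and Blob. A Tuple topic holds database-like records, each containing multiple columns. A Blob topic only supports writing a block of binary data.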

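Create a Topic

Tuple Topic

CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) error

Data written to a Tuple topic is structured, so a record schema must be specified. The following data types are currently supported:

Type | Meaning | Value range
-|-|-
BIGINT | 8-byte signed integer | -9223372036854775807 ~ 9223372036854775807
DOUBLE | 8-byte double-precision floating-point number | -1.0 × 10^308 ~ 1.0 × 10^308
BOOLEAN | Boolean | True/False, true/false, or 0/1
TIMESTAMP | Timestamp | Timestamp with microsecond precision
STRING | String; only UTF-8 encoding is supported | A single STRING column may be up to 1 MB

  • Parameters
    • projectName: project name
    • topicName: topic name
    • comment: topic comment
    • shardCount: the initial number of shards for the topic
    • lifeCycle: The expire time of the data (Unit: DAY). The data written before that time is not accessible.
    • recordSchema: The records schema of this topic.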
  • return
  • error

    • ResourceExistError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func Example_CreateTupleTopic(dh datahub.DataHub, projectName, topicName string) {
        recordSchema := datahub.NewRecordSchema()
        recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
            AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
            AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
            AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
            AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN})
        // shardCount is 5 and lifeCycle is 7 days.
        if err := dh.CreateTupleTopic(projectName, topicName, "topic comment", 5, 7, recordSchema); err != nil {
            fmt.Println("create topic failed")
            fmt.Println(err)
            return
        }
        fmt.Println("create topic successful")
    }

Blob Topic

CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) error

  • Parameters
    • projectName: project name
    • topicName: topic name
    • comment: topic comment
    • shardCount: the initial number of shards for the topic
    • lifeCycle: The expire time of the data (Unit: DAY). The data written before that time is not accessible.
  • return
  • error

    • ResourceExistError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func Example_CreateBlobTopic(dh datahub.DataHub, projectName, topicName string) {
        // shardCount is 5 and lifeCycle is 7 days.
        if err := dh.CreateBlobTopic(projectName, topicName, "topic comment", 5, 7); err != nil {
            fmt.Println("create topic failed")
            fmt.Println(err)
            return
        }
        fmt.Println("create topic successful")
    }

Delete a Topic

DeleteTopic(projectName, topicName string) error

  • Parameters

    • projectName: project name
    • topicName: topic name
  • return

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func ExampleDataHub_DeleteTopic(dh datahub.DataHub, projectName, topicName string) {
        if err := dh.DeleteTopic(projectName, topicName); err != nil {
            fmt.Println("delete failed")
            fmt.Println(err)
            return
        }
        fmt.Println("delete successful")
    }

List Topics

ListTopic(projectName string) (*ListTopicResult, error)

  • Parameters

    • projectName: project name
  • return

    type ListTopicResult struct {
        TopicNames []string `json:"TopicNames"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func ExampleDataHub_ListTopic(dh datahub.DataHub, projectName, topicName string) {
        lt, err := dh.ListTopic(projectName)
        if err != nil {
            fmt.Println("get topic list failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get topic list successful")
        fmt.Println(lt)
    }

Update a Topic

UpdateTopic(projectName, topicName, comment string) error

  • Parameters
    • projectName: project name
    • topicName: topic name
    • comment: topic comment
  • return
  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func ExampleDataHub_UpdateTopic(dh datahub.DataHub, projectName, topicName string) {
        if err := dh.UpdateTopic(projectName, topicName, "new topic comment"); err != nil {
            fmt.Println("update topic comment failed")
            fmt.Println(err)
            return
        }
        fmt.Println("update topic comment successful")
    }

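Schema Type

A schema declares the names and types of the stored data. It is used when creating a Tuple topic and when reading or writing records. Because data is sent over the network as strings, the schema is needed to convert it back to the corresponding types. A schema is a slice of Field objects. A Field has three parameters: the first is the field name, the second is the field type, and the third is a bool, where true means the field value may be null and false means it may not.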
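
To make the role of the schema concrete, here is a minimal sketch of converting string values into typed Go values according to a field definition. The field struct, the type names as plain strings, and the convert function are all hypothetical simplifications for illustration; the SDK performs its own conversion internally, and its rules may differ.

```go
package main

import (
	"fmt"
	"strconv"
)

// field is a hypothetical, simplified stand-in for a schema field.
type field struct {
	Name      string
	Type      string // "BIGINT", "DOUBLE", "BOOLEAN" or "STRING"
	AllowNull bool
}

// convert turns the wire-format string raw into a Go value according to f.Type.
// A nil raw represents a null value and is only accepted when f.AllowNull is true.
func convert(f field, raw *string) (interface{}, error) {
	if raw == nil {
		if !f.AllowNull {
			return nil, fmt.Errorf("field %q does not allow null", f.Name)
		}
		return nil, nil
	}
	switch f.Type {
	case "BIGINT":
		return strconv.ParseInt(*raw, 10, 64)
	case "DOUBLE":
		return strconv.ParseFloat(*raw, 64)
	case "BOOLEAN":
		return strconv.ParseBool(*raw)
	case "STRING":
		return *raw, nil
	default:
		return nil, fmt.Errorf("unsupported type %q", f.Type)
	}
}

func main() {
	raw := "42"
	v, err := convert(field{Name: "field2", Type: "BIGINT"}, &raw)
	fmt.Println(v, err)

	// A null value for a field that does not allow null is rejected.
	_, err = convert(field{Name: "field2", Type: "BIGINT"}, nil)
	fmt.Println(err)
}
```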

Get a Schema

For an existing Tuple topic, use the GetTopic API to obtain its schema.

  • Example

    func getSchema(dh datahub.DataHub, projectName, topicName string) {
        gt, err := dh.GetTopic(projectName, topicName)
        if err != nil {
            fmt.Println("get topic failed")
            fmt.Println(err)
            return
        }
        schema := gt.RecordSchema
        fmt.Println(schema)
    }

Define a Schema

To create a new Tuple topic, you need to define the schema yourself. A schema can be initialized in the following ways.

  • Create it directly

    func createSchema1(dh datahub.DataHub, projectName, topicName string) {
        fields := []datahub.Field{
            {Name: "field1", Type: datahub.STRING, AllowNull: true},
            {Name: "field2", Type: datahub.BIGINT, AllowNull: false},
        }
        schema := datahub.RecordSchema{
            fields,
        }
        fmt.Println(schema)
    }

  • Add fields to the schema one by one

    func createSchema2(dh datahub.DataHub, projectName, topicName string) {
        recordSchema := datahub.NewRecordSchema()
        recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
            AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
            AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
            AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
            AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN})
    }

  • Define the schema from a JSON string

    func createSchema3(dh datahub.DataHub, projectName, topicName string) {
        // The JSON string uses the format described below.
        str := `{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}`
        schema, err := datahub.NewRecordSchemaFromJson(str)
        if err != nil {
            fmt.Println("create recordSchema failed")
            fmt.Println(err)
            return
        }
        fmt.Println("create recordSchema successful")
        fmt.Println(schema)
    }

The JSON string has the following format:

{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}
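
To check that a schema string is well-formed before passing it to the SDK, it can be decoded with the standard library. The sketch below assumes only the JSON layout shown above; schemaField and schema are hypothetical local types, not the SDK's Field and RecordSchema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// schemaField and schema are hypothetical local types that mirror the JSON
// layout shown above; they are not the SDK's Field and RecordSchema.
type schemaField struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

type schema struct {
	Fields []schemaField `json:"fields"`
}

// parseSchema decodes a schema JSON string into the local schema type.
func parseSchema(s string) (*schema, error) {
	var out schema
	if err := json.Unmarshal([]byte(s), &out); err != nil {
		return nil, err
	}
	return &out, nil
}

func main() {
	str := `{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}`
	sc, err := parseSchema(str)
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	for _, f := range sc.Fields {
		fmt.Println(f.Name, f.Type)
	}
}
```

Note that the quotes must be plain ASCII double quotes; typographic quotes copied from a rendered page will make the string invalid JSON.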

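Shard Operations

A shard is a concurrent channel for data transmission in a topic; each shard has its own ID. A shard can be in one of several states: Opening (starting up) and Active (started and ready to serve). Each enabled shard consumes server-side resources, so request only as many shards as you need. Shards can be merged and split. When data volume grows, split shards to add data channels and increase write concurrency; when data volume shrinks, merge shards to avoid wasting server resources. For example, during Taobao's Double 11 (November 11) event the data volume surges and the write pressure on each shard becomes too high, so shards can be added to improve write throughput; after Double 11 the data volume drops significantly and the shards should be merged.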

List Shards

ListShard(projectName, topicName string) (*ListShardResult, error)

  • Parameters
    • projectName: project name
    • topicName: topic name
  • return

    type ListShardResult struct {
        Shards []ShardEntry `json:"Shards"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func ExampleDataHub_ListShard(dh datahub.DataHub, projectName, topicName string) {
        ls, err := dh.ListShard(projectName, topicName)
        if err != nil {
            fmt.Println("get shard list failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get shard list successful")
        for _, shard := range ls.Shards {
            fmt.Println(shard)
        }
    }

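Split a Shard

Only a shard in the ACTIVE state can be split. After a successful split, two new shards are created and the original shard's state changes to CLOSED. Splitting a shard requires a splitKey. With the first method below, the system generates the splitKey automatically; if you have special requirements, use the second method to specify the splitKey yourself. For the splitKey rules, see Shard Hash Key Range in the basic concepts.

SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error)

SplitShardWithSplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error)

  • Parameters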

    • projectName: project name
    • topicName: topic name
    • shardId: The shard which to split
    • splitKey: The split key which is used to split shard
  • return

    type SplitShardResult struct {
        NewShards []ShardEntry `json:"NewShards"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func ExampleDataHub_SplitShard(dh datahub.DataHub, projectName, topicName string) {
        // the ID of the shard you want to split
        shardId := "0"
        ss, err := dh.SplitShard(projectName, topicName, shardId)
        if err != nil {
            fmt.Println("split shard failed")
            fmt.Println(err)
            return
        }
        fmt.Println("split shard successful")
        fmt.Println(ss)
        // After splitting, you need to wait for all shard states to be ready
        // before you can perform related operations.
        dh.WaitAllShardsReady(projectName, topicName)
    }
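
When you choose a splitKey yourself for SplitShardWithSplitKey, one natural choice is the midpoint of the shard's hash key range. The sketch below computes such a midpoint under a stated assumption: that hash keys are fixed-width hexadecimal strings. That format is an assumption of this example, and midKey is a hypothetical helper, not part of the SDK; consult Shard Hash Key Range in the basic concepts for the actual rules.

```go
package main

import (
	"fmt"
	"math/big"
	"strings"
)

// midKey returns the midpoint of the range between begin and end as a
// zero-padded, upper-case hex string of the same width as begin.
// Assumption: hash keys are fixed-width hexadecimal strings.
func midKey(begin, end string) (string, error) {
	b, ok := new(big.Int).SetString(begin, 16)
	if !ok {
		return "", fmt.Errorf("invalid begin key %q", begin)
	}
	e, ok := new(big.Int).SetString(end, 16)
	if !ok {
		return "", fmt.Errorf("invalid end key %q", end)
	}
	if b.Cmp(e) >= 0 {
		return "", fmt.Errorf("begin key must be smaller than end key")
	}
	mid := new(big.Int).Add(b, e)
	mid.Rsh(mid, 1) // (begin + end) / 2, rounded down

	// Left-pad with zeros so the result keeps the width of the input keys.
	s := strings.ToUpper(mid.Text(16))
	if len(s) < len(begin) {
		s = strings.Repeat("0", len(begin)-len(s)) + s
	}
	return s, nil
}

func main() {
	begin := strings.Repeat("0", 32)
	end := strings.Repeat("F", 32)
	key, err := midKey(begin, end)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(key)
}
```

math/big is used because a key of this width does not fit in a uint64.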

Merge Shards

To merge two shards, they must be adjacent and both in the ACTIVE state.

MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error)

  • Parameters

    • projectName: project name
    • topicName: topic name
    • shardId: The shard which will be merged
    • adjacentShardId: The adjacent shard of the specified shard.
  • return

    type MergeShardResult struct {
        ShardId      string `json:"ShardId"`
        BeginHashKey string `json:"BeginHashKey"`
        EndHashKey   string `json:"EndHashKey"`
    }

  • error
    • ResourceNotFoundError
    • InvalidOperationError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
    • ShardSealedError
  • Example

    func ExampleDataHub_MergeShard(dh datahub.DataHub, projectName, topicName string) {
        shardId := "3"
        adjacentShardId := "4"
        ms, err := dh.MergeShard(projectName, topicName, shardId, adjacentShardId)
        if err != nil {
            fmt.Println("merge shard failed")
            fmt.Println(err)
            return
        }
        fmt.Println("merge shard successful")
        fmt.Println(ms)
        // After merging, you need to wait for all shard states to be ready
        // before you can perform related operations.
        dh.WaitAllShardsReady(projectName, topicName)
    }

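Publishing and Subscribing to Data

Data can be subscribed from shards in both the ACTIVE and CLOSED states, but data can only be published to shards in the ACTIVE state. Publishing to a CLOSED shard returns ShardSealedError immediately. Reading a CLOSED shard to its end also returns ShardSealedError, indicating that no new data will arrive.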

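Publish Data

When publishing records to a topic, each record must specify a shard of that topic, so you usually need to call the ListShard API first to see the topic's current shards. When using the PutRecords API, check the returned result for records that failed to publish.

PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error)

PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) error

The PutRecordsByShard API is supported by server version 2.12 and later; for earlier versions, use the PutRecords API.

  • Parameters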

    • projectName: project name
    • topicName: topic name
    • shardId : id of shard
    • records: Records list to written.
  • return

    type PutRecordsResult struct {
        FailedRecordCount int            `json:"FailedRecordCount"`
        FailedRecords     []FailedRecord `json:"FailedRecords"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    // put tuple data
    func putTupleData(dh datahub.DataHub, projectName, topicName string) {
        topic, err := dh.GetTopic(projectName, topicName)
        if err != nil {
            fmt.Println("get topic failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get topic successful")

        records := make([]datahub.IRecord, 3)
        record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
        record1.ShardId = "0"
        record1.SetValueByName("field1", "TEST1")
        record1.SetValueByName("field2", 1)
        // you can add some attributes when putting a record
        record1.SetAttribute("attribute", "test attribute")
        records[0] = record1

        record2 := datahub.NewTupleRecord(topic.RecordSchema, 0)
        record2.ShardId = "1"
        record2.SetValueByName("field1", datahub.String("TEST2"))
        record2.SetValueByName("field2", datahub.Bigint(2))
        records[1] = record2

        record3 := datahub.NewTupleRecord(topic.RecordSchema, 0)
        record3.ShardId = "2"
        record3.SetValueByName("field1", datahub.String("TEST3"))
        record3.SetValueByName("field2", datahub.Bigint(3))
        records[2] = record3

        maxReTry := 3
        retryNum := 0
        for retryNum < maxReTry {
            result, err := dh.PutRecords(projectName, topicName, records)
            if err != nil {
                if _, ok := err.(*datahub.LimitExceededError); ok {
                    fmt.Println("maybe qps exceed limit, retry")
                    retryNum++
                    time.Sleep(5 * time.Second)
                    continue
                }
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
            // Individual records may fail even when err is nil, so check the result.
            fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
            for _, v := range result.FailedRecords {
                fmt.Println(v)
            }
            break
        }
        if retryNum >= maxReTry {
            fmt.Println("put records failed")
        }
    }

    // put blob data
    func putBlobData(dh datahub.DataHub, projectName, blobTopicName string) {
        records := make([]datahub.IRecord, 3)
        record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
        record1.ShardId = "0"
        records[0] = record1

        record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
        record2.ShardId = "1"
        record2.SetAttribute("attribute", "test attribute")
        records[1] = record2

        record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
        record3.ShardId = "2"
        records[2] = record3

        maxReTry := 3
        retryNum := 0
        for retryNum < maxReTry {
            result, err := dh.PutRecords(projectName, blobTopicName, records)
            if err != nil {
                if _, ok := err.(*datahub.LimitExceededError); ok {
                    fmt.Println("maybe qps exceed limit, retry")
                    retryNum++
                    time.Sleep(5 * time.Second)
                    continue
                }
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
            // Individual records may fail even when err is nil, so check the result.
            fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
            for _, v := range result.FailedRecords {
                fmt.Println(v)
            }
            break
        }
        if retryNum >= maxReTry {
            fmt.Println("put records failed")
        }
    }

    // put data by shard
    func putDataByShard(dh datahub.DataHub, projectName, blobTopicName string) {
        shardId := "0"
        records := make([]datahub.IRecord, 3)
        record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
        records[0] = record1

        record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
        record2.SetAttribute("attribute", "test attribute")
        records[1] = record2

        record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
        records[2] = record3

        maxReTry := 3
        retryNum := 0
        for retryNum < maxReTry {
            if err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records); err != nil {
                if _, ok := err.(*datahub.LimitExceededError); ok {
                    fmt.Println("maybe qps exceed limit, retry")
                    retryNum++
                    time.Sleep(5 * time.Second)
                    continue
                }
                fmt.Println("put record failed")
                fmt.Println(err)
                return
            }
            // The write succeeded, so leave the retry loop.
            break
        }
        if retryNum >= maxReTry {
            fmt.Println("put records failed")
        } else {
            fmt.Println("put record successful")
        }
    }
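
Because a batch write can succeed as a call while some records in it fail, a caller may want to resubmit only the failed records. The sketch below illustrates that idea in isolation. putFunc is a hypothetical stand-in for a batch write that reports failed records by their position in the batch; it is not the SDK's PutRecords, and how the real FailedRecord identifies a record is not shown in this document.

```go
package main

import "fmt"

// putFunc is a hypothetical stand-in for a batch write call: it receives a
// batch and returns the indexes (within that batch) of the records that failed.
type putFunc func(batch []string) (failed []int, err error)

// putWithRetry writes records and resubmits only the failed ones, making at
// most maxRetry additional attempts. It returns the records that still failed.
func putWithRetry(put putFunc, records []string, maxRetry int) ([]string, error) {
	pending := records
	for attempt := 0; attempt <= maxRetry && len(pending) > 0; attempt++ {
		failed, err := put(pending)
		if err != nil {
			// The whole call failed; report everything still pending.
			return pending, err
		}
		// Keep only the records the call reported as failed.
		next := make([]string, 0, len(failed))
		for _, i := range failed {
			next = append(next, pending[i])
		}
		pending = next
	}
	return pending, nil
}

func main() {
	calls := 0
	// flaky fails the first record of the first batch only.
	flaky := func(batch []string) ([]int, error) {
		calls++
		if calls == 1 {
			return []int{0}, nil
		}
		return nil, nil
	}
	left, err := putWithRetry(flaky, []string{"a", "b", "c"}, 3)
	fmt.Println(len(left), calls, err)
}
```

A production version would usually also wait between attempts, as the examples above do with time.Sleep.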

In addition to the data itself, you can attach extra information related to the data when publishing, such as the data collection scenario. Add it as follows:

    record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
    record1.SetAttribute("attribute", "test attribute")

    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.SetAttribute("attribute", "test attribute")

Subscribe to Data

Subscribing to data in a topic also requires specifying the shard, as well as the cursor position to read from, which is obtained through the GetCursor API.

GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error)

  • Parameters
    • projectName: project name
    • topicName: topic name
    • shardId: The id of the shard.
    • ctype: The type of cursor to get. Four types are available: OLDEST, LATEST, SEQUENCE, SYSTEM_TIME.
      • OLDEST: the cursor points to the oldest record among the currently valid data
      • LATEST: the cursor points to the latest record
      • SEQUENCE: the cursor points to the record with the given sequence
      • SYSTEM_TIME: the cursor points to the first record received after the given time
    • param: Parameter used to get the cursor. It must be set when using SEQUENCE or SYSTEM_TIME.
  • return

    type GetCursorResult struct {
        Cursor     string `json:"Cursor"`
        RecordTime int64  `json:"RecordTime"`
        Sequence   int64  `json:"Sequence"`
    }

  • error

    • ResourceNotFoundError
    • SeekOutOfRangeError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
    • ShardSealedError
  • Example

    func cursor(dh datahub.DataHub, projectName, topicName string) {
        shardId := "0"

        // Cursor pointing to the oldest valid record.
        gr, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
        if err != nil {
            fmt.Println("get cursor failed")
            fmt.Println(err)
        } else {
            fmt.Println(gr)
        }

        // Cursor pointing to the latest record.
        gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
        if err != nil {
            fmt.Println("get cursor failed")
            fmt.Println(err)
        } else {
            fmt.Println(gr)
        }

        // SEQUENCE needs the sequence number as the extra parameter.
        var seq int64 = 10
        gr, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
        if err != nil {
            fmt.Println("get cursor failed")
            fmt.Println(err)
        } else {
            fmt.Println(gr)
        }
    }
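
The optional param of GetCursor is a Go variadic argument, so the compiler cannot enforce that it is present for SEQUENCE and SYSTEM_TIME. The sketch below shows one way a caller could validate it up front. The cursorType type, its constants, and cursorParam are hypothetical local definitions; rejecting a parameter for OLDEST and LATEST is a choice made in this sketch, not documented SDK behavior.

```go
package main

import "fmt"

// cursorType and its constants are hypothetical local definitions that mirror
// the four cursor types named above; they are not the SDK's CursorType.
type cursorType string

const (
	oldest     cursorType = "OLDEST"
	latest     cursorType = "LATEST"
	sequence   cursorType = "SEQUENCE"
	systemTime cursorType = "SYSTEM_TIME"
)

// cursorParam validates the optional argument: SEQUENCE and SYSTEM_TIME need
// exactly one value, while OLDEST and LATEST take none.
func cursorParam(ctype cursorType, param ...int64) (int64, error) {
	switch ctype {
	case oldest, latest:
		if len(param) != 0 {
			return 0, fmt.Errorf("%s takes no parameter", ctype)
		}
		return 0, nil
	case sequence, systemTime:
		if len(param) != 1 {
			return 0, fmt.Errorf("%s needs exactly one parameter", ctype)
		}
		return param[0], nil
	default:
		return 0, fmt.Errorf("unknown cursor type %q", ctype)
	}
}

func main() {
	v, err := cursorParam(sequence, 10)
	fmt.Println(v, err)

	// Forgetting the sequence number is caught before any request is made.
	_, err = cursorParam(sequence)
	fmt.Println(err)
}
```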

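To read data from a given shard, specify the cursor to start from and the maximum number of records to read. If fewer than limit records remain between the cursor and the end of the shard, only the records actually available are returned.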
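
The cursor and limit semantics can be pictured with an in-memory model. In the sketch below a shard is just a slice of strings and a cursor is an index into it; both are simplifications made for this example (real cursors are opaque strings returned by GetCursor), and readFrom is a hypothetical helper.

```go
package main

import "fmt"

// readFrom returns up to limit records starting at cursor, plus the cursor to
// use for the next read. The shard is modeled as a plain slice and the cursor
// as an index into it, which is a simplification for illustration.
func readFrom(shard []string, cursor, limit int) (records []string, next int) {
	if cursor < 0 || cursor >= len(shard) || limit <= 0 {
		return nil, cursor
	}
	end := cursor + limit
	if end > len(shard) {
		// Fewer than limit records remain, so return what is available.
		end = len(shard)
	}
	return shard[cursor:end], end
}

func main() {
	shard := []string{"r0", "r1", "r2", "r3", "r4"}

	// Read the whole shard in pages of two records, following the next cursor.
	cursor := 0
	for {
		recs, next := readFrom(shard, cursor, 2)
		if len(recs) == 0 {
			break
		}
		fmt.Println(recs)
		cursor = next
	}
}
```

The loop mirrors how a reader would keep calling a get-records API with the NextCursor from the previous result.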

Tuple topic data

GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)

  • Parameters

    • projectName: project name
    • topicName: topic name
    • shardId: The id of the shard.
    • cursor: The start cursor used to read data.
    • limit: The maximum number of records to read.
    • recordSchema: RecordSchema for the topic.
  • return

    type GetRecordsResult struct {
        NextCursor    string        `json:"NextCursor"`
        RecordCount   int           `json:"RecordCount"`
        StartSequence int64         `json:"StartSeq"`
        Records       []IRecord     `json:"Records"`
        RecordSchema  *RecordSchema `json:"-"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    // Requires imports "reflect" and "time".
    func getTupleData(dh datahub.DataHub, projectName, topicName string) {
        shardId := "1"
        topic, err := dh.GetTopic(projectName, topicName)
        if err != nil {
            fmt.Println("get topic failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get topic successful")

        cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
        if err != nil {
            fmt.Println("get cursor failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get cursor successful")

        limitNum := 100
        maxReTry := 3
        retryNum := 0
        for retryNum < maxReTry {
            gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
            if err != nil {
                if _, ok := err.(*datahub.LimitExceededError); ok {
                    fmt.Println("maybe qps exceed limit, retry")
                    retryNum++
                    time.Sleep(5 * time.Second)
                    continue
                }
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
            fmt.Println("get record successful")
            for _, record := range gr.Records {
                data, ok := record.(*datahub.TupleRecord)
                if !ok {
                    fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
                } else {
                    fmt.Println(data.Values)
                }
            }
            break
        }
        if retryNum >= maxReTry {
            fmt.Println("get records failed")
        }
    }

Blob topic data

GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)

  • Parameters

    • projectName: project name
    • topicName: topic name
    • shardId: The id of the shard.
    • cursor: The start cursor used to read data.
    • limit: The maximum number of records to read.
  • return

    type GetRecordsResult struct {
        NextCursor    string        `json:"NextCursor"`
        RecordCount   int           `json:"RecordCount"`
        StartSequence int64         `json:"StartSeq"`
        Records       []IRecord     `json:"Records"`
        RecordSchema  *RecordSchema `json:"-"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    // Requires imports "reflect" and "time".
    func getBlobData(dh datahub.DataHub, projectName, blobTopicName string) {
        shardId := "1"
        cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
        if err != nil {
            fmt.Println("get cursor failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get cursor successful")

        limitNum := 100
        maxReTry := 3
        retryNum := 0
        for retryNum < maxReTry {
            gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
            if err != nil {
                if _, ok := err.(*datahub.LimitExceededError); ok {
                    fmt.Println("maybe qps exceed limit, retry")
                    retryNum++
                    time.Sleep(5 * time.Second)
                    continue
                }
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
            fmt.Println("get record successful")
            for _, record := range gr.Records {
                data, ok := record.(*datahub.BlobRecord)
                if !ok {
                    fmt.Printf("record type is not BlobRecord, is %v\n", reflect.TypeOf(record))
                } else {
                    fmt.Println(data.StoreData)
                }
            }
            break
        }
        if retryNum >= maxReTry {
            fmt.Println("get records failed")
        }
    }

Meter Operations

Metering info is statistics on a shard's resource usage, updated once an hour.

GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error)

  • Parameters
    • projectName: project name
    • topicName: topic name
    • shardId: The id of the shard.
  • return

    type GetMeterInfoResult struct {
        ActiveTime int64 `json:"ActiveTime"`
        Storage    int64 `json:"Storage"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func meter(dh datahub.DataHub, projectName, topicName string) {
        shardId := "0"
        gmi, err := dh.GetMeterInfo(projectName, topicName, shardId)
        if err != nil {
            fmt.Println("get meter information failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get meter information successful")
        fmt.Println(gmi)
    }

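Connector Operations

A DataHub Connector synchronizes streaming data in the DataHub service to other cloud products. It currently supports real-time or near-real-time synchronization of topic data to MaxCompute (formerly ODPS), OSS (Object Storage Service), ES (Elasticsearch), ADS (AnalyticDB for MySQL), MySQL, FC (Function Compute), OTS (Open Table Store, also known as Table Store), and DataHub. You only need to write data to DataHub once and configure synchronization in the DataHub service to use that data in the other cloud products.

All sample code here uses MaxCompute as the example. For the MaxCompute config details, see Synchronizing Data to MaxCompute.

Since DataHub 2.14.0, the connectorType parameter of these APIs has been replaced by connectorId (except for CreateConnector). The APIs remain compatible with versions earlier than 2.14.0: convert the connectorType to a string and pass it as the parameter.

  • Usage example

    gcr, err := dh.GetConnector(projectName, topicName, string(datahub.SinkOdps))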
Create a Connector

CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error)

  • Parameters
    • projectName: project name
    • topicName: topic name
    • cType: The type of connector to create.
    • columnFields: The fields to synchronize.
    • config: Detailed config for the specified connector type.
  • return

    type CreateConnectorResult struct {
        ConnectorId string `json:"ConnectorId"`
    }

  • error

    • ResourceNotFoundError
    • ResourceExistError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func createConnector(dh datahub.DataHub, projectName, topicName string) {
        odpsEndpoint := ""
        odpsProject := "datahub_test"
        odpsTable := "datahub_go_example"
        odpsAccessId := ""
        odpsAccessKey := ""
        odpsTimeRange := 60
        odpsPartitionMode := datahub.SystemTimeMode
        connectorType := datahub.SinkOdps

        odpsPartitionConfig := datahub.NewPartitionConfig()
        odpsPartitionConfig.AddConfig("ds", "%Y%m%d")
        odpsPartitionConfig.AddConfig("hh", "%H")
        odpsPartitionConfig.AddConfig("mm", "%M")

        sinkOdpsConfig := datahub.SinkOdpsConfig{
            Endpoint:        odpsEndpoint,
            Project:         odpsProject,
            Table:           odpsTable,
            AccessId:        odpsAccessId,
            AccessKey:       odpsAccessKey,
            TimeRange:       odpsTimeRange,
            PartitionMode:   odpsPartitionMode,
            PartitionConfig: *odpsPartitionConfig,
        }

        fields := []string{"field1", "field2"}
        // CreateConnector returns (*CreateConnectorResult, error).
        ccr, err := dh.CreateConnector(projectName, topicName, connectorType, fields, sinkOdpsConfig)
        if err != nil {
            fmt.Println("create odps connector failed")
            fmt.Println(err)
            return
        }
        fmt.Println("create odps connector successful")
        fmt.Println(ccr.ConnectorId)
    }
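
The partition config in the example maps the partition columns ds, hh, and mm to the patterns %Y%m%d, %H, and %M. Assuming these follow the usual strftime conventions (an assumption of this sketch, not something this document states), the resulting partition values for a given time can be previewed in Go as shown below. partitionValues is a hypothetical helper, and UTC is chosen only to keep the output deterministic.

```go
package main

import (
	"fmt"
	"time"
)

// partitionValues renders the ds/hh/mm partition values for a Unix timestamp
// in seconds, using Go layouts equivalent to the strftime-style patterns
// %Y%m%d, %H and %M. UTC is used here only to keep the output deterministic.
func partitionValues(unixSec int64) map[string]string {
	t := time.Unix(unixSec, 0).UTC()
	return map[string]string{
		"ds": t.Format("20060102"), // %Y%m%d
		"hh": t.Format("15"),       // %H
		"mm": t.Format("04"),       // %M
	}
}

func main() {
	// 1569009016 is 2019-09-20 19:50:16 UTC.
	p := partitionValues(1569009016)
	fmt.Println(p["ds"], p["hh"], p["mm"]) // prints "20190920 19 50"
}
```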

List Connectors

ListConnector(projectName, topicName string) (*ListConnectorResult, error)

  • Parameters
    • projectName: project name
    • topicName: topic name
  • return

    type ListConnectorResult struct {
        ConnectorIds []string `json:"Connectors"`
    }

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func listConnector(dh datahub.DataHub, projectName, topicName string) {
        lc, err := dh.ListConnector(projectName, topicName)
        if err != nil {
            fmt.Println("get connector list failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get connector list successful")
        fmt.Println(lc)
    }

Get a Connector

GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error)

  • Parameters
    • projectName: project name
    • topicName: topic name
    • connectorId: The id of the connector
  • return

    type GetConnectorResult struct {
        CreateTime     int64             `json:"CreateTime"`
        LastModifyTime int64             `json:"LastModifyTime"`
        ConnectorId    string            `json:"ConnectorId"`
        ClusterAddress string            `json:"ClusterAddress"`
        Type           ConnectorType     `json:"Type"`
        State          ConnectorState    `json:"State"`
        ColumnFields   []string          `json:"ColumnFields"`
        ExtraConfig    map[string]string `json:"ExtraInfo"`
        Creator        string            `json:"Creator"`
        Owner          string            `json:"Owner"`
        Config         interface{}       `json:"Config"`
    }

  • error
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func getConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
        gcr, err := dh.GetConnector(projectName, topicName, connectorId)
        if err != nil {
            fmt.Println("get odps connector failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get odps connector successful")
        fmt.Println(*gcr)
    }

Update a Connector Config

UpdateConnector(projectName, topicName, connectorId string, config interface{}) error

  • Parameters
    • projectName: project name
    • topicName: topic name
    • connectorId: The id of the connector.
    • config: Detail config of specified connector type.
  • return
  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • Example

    func updateConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
        gc, err := dh.GetConnector(projectName, topicName, connectorId)
        if err != nil {
            fmt.Println("get odps connector failed")
            fmt.Println(err)
            return
        }
        config, ok := gc.Config.(datahub.SinkOdpsConfig)
        if !ok {
            fmt.Println("convert config to SinkOdpsConfig failed")
            return
        }
        // modify the config
        config.TimeRange = 200
        if err := dh.UpdateConnector(projectName, topicName, connectorId, config); err != nil {
            fmt.Println("update odps config failed")
            fmt.Println(err)
            return
        }
        fmt.Println("update odps config successful")
    }

Delete a Connector

DeleteConnector(projectName, topicName, connectorId string) error

  • Parameters
    • projectName: project name
    • topicName: topic name
    • connectorId: The id of the connector.
  • return
  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func deleteConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
        if err := dh.DeleteConnector(projectName, topicName, connectorId); err != nil {
            fmt.Println("delete odps connector failed")
            fmt.Println(err)
            return
        }
        fmt.Println("delete odps connector successful")
    }

Query connector shard status

You can query the status of every shard in a topic, or the status of one specified shard.

GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error)

GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*ConnectorShardStatusEntry, error)

  • parameters

    • projectName: project name
    • topicName: topic name
    • shardId: The id of the shard.
    • connectorId: The id of the connector.
  • return

    // GetConnectorShardStatus
    type GetConnectorShardStatusResult struct {
        ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
    }

    // GetConnectorShardStatusByShard
    type ConnectorShardStatusEntry struct {
        StartSequence    int64               `json:"StartSequence"`
        EndSequence      int64               `json:"EndSequence"`
        CurrentSequence  int64               `json:"CurrentSequence"`
        CurrentTimestamp int64               `json:"CurrentTimestamp"`
        UpdateTime       int64               `json:"UpdateTime"`
        State            ConnectorShardState `json:"State"`
        LastErrorMessage string              `json:"LastErrorMessage"`
        DiscardCount     int64               `json:"DiscardCount"`
        DoneTime         int64               `json:"DoneTime"`
        WorkerAddress    string              `json:"WorkerAddress"`
    }
  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func getConnectorShardStatus(dh datahub.DataHub, projectName, topicName, connectorId string) {
        // status of all shards in the topic
        gcs, err := dh.GetConnectorShardStatus(projectName, topicName, connectorId)
        if err != nil {
            fmt.Println("get connector shard status failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get connector shard status successful")
        for shard, status := range gcs.ShardStatus {
            fmt.Println(shard, status.State)
        }

        // status of one specified shard
        shardId := "0"
        gc, err := dh.GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId)
        if err != nil {
            fmt.Println("get connector shard status failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get connector shard status successful")
        fmt.Println(*gc)
    }
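
One way to use the per-shard map is to pick out the shards that report an error. The sketch below does this over a map shaped like ShardStatus. `shardStatus` is a hypothetical, trimmed stand-in for ConnectorShardStatusEntry that keeps only two of its documented fields, and `shardsWithErrors` is a helper invented for this illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// shardStatus is a hypothetical, trimmed stand-in for ConnectorShardStatusEntry.
type shardStatus struct {
	CurrentSequence  int64
	LastErrorMessage string
}

// shardsWithErrors returns the IDs of shards whose LastErrorMessage is
// non-empty. IDs are sorted as strings, so "10" sorts before "2".
func shardsWithErrors(statuses map[string]shardStatus) []string {
	var ids []string
	for id, st := range statuses {
		if st.LastErrorMessage != "" {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}

func main() {
	// Sample data; the error text is a placeholder, not real service output.
	statuses := map[string]shardStatus{
		"0": {CurrentSequence: 120},
		"1": {CurrentSequence: 98, LastErrorMessage: "sample error text"},
		"2": {CurrentSequence: 77, LastErrorMessage: "sample error text"},
	}
	for _, id := range shardsWithErrors(statuses) {
		fmt.Println("shard", id, "reported:", statuses[id].LastErrorMessage)
	}
}
```

Sorting the IDs makes the output stable, since Go map iteration order is not defined.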

Restart connector shards

You can restart every shard in a topic, or one specified shard.

ReloadConnector(projectName, topicName, connectorId string) error

ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) error

  • parameters
    • projectName: project name
    • topicName: topic name
    • connectorId: The id of the connector.
    • shardId: The id of the shard.
  • return
  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func reloadConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
        // restart all shards
        if err := dh.ReloadConnector(projectName, topicName, connectorId); err != nil {
            fmt.Println("reload connector shard failed")
            fmt.Println(err)
            return
        }
        fmt.Println("reload connector shard successful")

        // restart one specified shard
        shardId := "2"
        if err := dh.ReloadConnectorByShard(projectName, topicName, connectorId, shardId); err != nil {
            fmt.Println("reload connector shard failed")
            fmt.Println(err)
            return
        }
        fmt.Println("reload connector shard successful")
    }

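Append a new field

You can add a specified column to a connector, provided that the column already exists in both the DataHub topic and the ODPS table.

AppendConnectorField(projectName, topicName, connectorId, fieldName string) error

  • parameters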
    • projectName: project name
    • topicName: topic name
    • connectorId: The id of the connector.
    • fieldName: The name of the field.
  • return
  • error

    • ResourceNotFoundError
    • InvalidParameterError
  • example

    func appendConnectorField(dh datahub.DataHub, projectName, topicName, connectorId string) {
        if err := dh.AppendConnectorField(projectName, topicName, connectorId, "field2"); err != nil {
            fmt.Println("append field failed")
            fmt.Println(err)
            return
        }
        fmt.Println("append field successful")
    }

Update connector state

A connector has two states, CONNECTOR_PAUSED and CONNECTOR_RUNNING, meaning stopped and running respectively.

UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) error

  • parameters
    • projectName: project name
    • topicName: topic name
    • connectorId: The id of the connector.
    • state: The state to set on the connector.
  • return
  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func updateConnectorState(dh datahub.DataHub, projectName, topicName, connectorId string) {
        // stop the connector
        if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped); err != nil {
            fmt.Println("update connector state failed")
            fmt.Println(err)
            return
        }
        fmt.Println("update connector state successful")

        // start it again
        if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning); err != nil {
            fmt.Println("update connector state failed")
            fmt.Println(err)
            return
        }
        fmt.Println("update connector state successful")
    }

Update connector offset

UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) error

  • parameters

    • projectName: project name
    • topicName: topic name
    • shardId: The id of the shard.
    • connectorId: The id of the connector.
    • offset: The connector offset.
  • return

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func updateConnectorOffset(dh datahub.DataHub, projectName, topicName, connectorId string) {
        shardId := "10"
        offset := datahub.ConnectorOffset{
            Timestamp: 1565864139000,
            Sequence:  104,
        }
        // stop the connector before changing its offset
        if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped); err != nil {
            fmt.Println("stop connector failed")
            fmt.Println(err)
            return
        }
        // set the connector back to running when this function returns
        defer dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning)
        if err := dh.UpdateConnectorOffset(projectName, topicName, connectorId, shardId, offset); err != nil {
            fmt.Println("update connector offset failed")
            fmt.Println(err)
            return
        }
        fmt.Println("update connector offset successful")
    }
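
The stop, update, restart sequence in the example above can also be written so that every step's error is reported. The following is a minimal sketch of that control flow only. `connectorControl` and `updateOffsetPaused` are hypothetical names invented here; the three closures stand in for the corresponding client calls so the flow can be run without a DataHub service.

```go
package main

import (
	"errors"
	"fmt"
)

// errSimulated is a placeholder failure used by the demo in main.
var errSimulated = errors.New("simulated failure")

// connectorControl is a hypothetical, narrowed view of the client: three
// closures standing in for the stop, offset-update and restart calls.
type connectorControl struct {
	pause  func() error
	update func() error
	resume func() error
}

// updateOffsetPaused stops the connector, updates the offset, and always
// attempts the restart once the stop has succeeded.
func updateOffsetPaused(c connectorControl) error {
	if err := c.pause(); err != nil {
		return fmt.Errorf("pause connector: %w", err)
	}
	updateErr := c.update()
	resumeErr := c.resume()
	if updateErr != nil {
		return fmt.Errorf("update offset: %w", updateErr)
	}
	if resumeErr != nil {
		return fmt.Errorf("resume connector: %w", resumeErr)
	}
	return nil
}

func main() {
	var calls []string
	// step records that it ran and returns the given error.
	step := func(name string, err error) func() error {
		return func() error {
			calls = append(calls, name)
			return err
		}
	}
	err := updateOffsetPaused(connectorControl{
		pause:  step("pause", nil),
		update: step("update", errSimulated),
		resume: step("resume", nil),
	})
	fmt.Println(calls, err)
}
```

With the client from this document, each closure would wrap the matching `dh.UpdateConnectorState` or `dh.UpdateConnectorOffset` call.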

Query connector done time

Only MaxCompute connectors support querying the done time.

GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error)

  • parameters
    • projectName: project name
    • topicName: topic name
    • connectorId: The id of the connector.
  • return
    type GetConnectorDoneTimeResult struct {
        DoneTime int64 `json:"DoneTime"`
    }
  • error
    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func doneTime(dh datahub.DataHub, projectName, topicName, connectorId string) {
        gcd, err := dh.GetConnectorDoneTime(projectName, topicName, connectorId)
        if err != nil {
            fmt.Println("get connector done time failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get connector done time successful")
        fmt.Println(gcd.DoneTime)
    }

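Subscription operations

The subscription service stores a consumer's consumption offsets on the server side. With a small amount of configuration and handling, it provides a highly available offset store.

Create subscription

CreateSubscription(projectName, topicName, comment string) error

  • parameters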

    • projectName: project name
    • topicName: topic name
    • comment: subscription comment
  • return

  • error

    • ResourceExistError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    // follows the signature documented above, which returns only an error
    func createSubscription(dh datahub.DataHub, projectName, topicName string) {
        if err := dh.CreateSubscription(projectName, topicName, "sub comment"); err != nil {
            fmt.Println("create subscription failed")
            fmt.Println(err)
            return
        }
        fmt.Println("create subscription successful")
    }

Delete subscription

DeleteSubscription(projectName, topicName, subId string) error

  • parameters

    • projectName: project name
    • topicName: topic name
    • subId: The id of the subscription.
  • return

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func delSubscription(dh datahub.DataHub, projectName, topicName string) {
        subId := "1565577384801DCN0O"
        if err := dh.DeleteSubscription(projectName, topicName, subId); err != nil {
            fmt.Println("delete subscription failed")
            fmt.Println(err)
            return
        }
        fmt.Println("delete subscription successful")
    }

Query subscription

GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error)

  • parameters
    • projectName: project name
    • topicName: topic name
    • subId: The id of the subscription.
  • return
    type GetSubscriptionResult struct {
        SubscriptionEntry
    }
  • error

    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func getSubscription(dh datahub.DataHub, projectName, topicName string) {
        subId := "1565577384801DCN0O"
        gs, err := dh.GetSubscription(projectName, topicName, subId)
        if err != nil {
            fmt.Println("get subscription failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get subscription successful")
        fmt.Println(gs)
    }

List subscriptions

Use pageIndex and pageSize to fetch a given range of subscriptions. For example, pageIndex=1, pageSize=10 returns subscriptions 1-10; pageIndex=2, pageSize=5 returns subscriptions 6-10.

ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error)

  • parameters
    • projectName: project name
    • topicName: topic name
    • pageIndex: The page index used to list subscriptions.
    • pageSize: The page size used to list subscriptions.
  • return

    type ListSubscriptionResult struct {
        TotalCount    int64               `json:"TotalCount"`
        Subscriptions []SubscriptionEntry `json:"Subscriptions"`
    }
  • error

    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func listSubscription(dh datahub.DataHub, projectName, topicName string) {
        pageIndex := 1
        pageSize := 5
        ls, err := dh.ListSubscription(projectName, topicName, pageIndex, pageSize)
        if err != nil {
            fmt.Println("get subscription list failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get subscription list successful")
        for _, sub := range ls.Subscriptions {
            fmt.Println(sub)
        }
    }
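
The paging rule described for ListSubscription (a 1-based pageIndex, with each page covering pageSize items) can be written as simple arithmetic. The sketch below is illustrative only; `pageRange` and `pageCount` are helper names invented here, not SDK functions.

```go
package main

import "fmt"

// pageRange returns the 1-based positions of the first and last items
// covered by the given page.
func pageRange(pageIndex, pageSize int) (first, last int) {
	first = (pageIndex-1)*pageSize + 1
	last = pageIndex * pageSize
	return first, last
}

// pageCount returns how many pages are needed to cover totalCount items.
func pageCount(totalCount int64, pageSize int) int {
	if totalCount <= 0 || pageSize <= 0 {
		return 0
	}
	return int((totalCount + int64(pageSize) - 1) / int64(pageSize))
}

func main() {
	first, last := pageRange(2, 5)
	fmt.Println(first, last)      // 6 10
	fmt.Println(pageCount(23, 5)) // 5
}
```

To walk through every subscription, a caller could request pages 1 through `pageCount(TotalCount, pageSize)`, taking TotalCount from the first ListSubscriptionResult.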

Update subscription

Currently only the subscription comment can be updated.

UpdateSubscription(projectName, topicName, subId, comment string) error

  • parameters

    • projectName: project name
    • topicName: topic name
    • subId: The id of the subscription.
    • comment: subscription comment
  • return

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func updateSubscription(dh datahub.DataHub, projectName, topicName string) {
        subId := "1565580329258VXSY8"
        if err := dh.UpdateSubscription(projectName, topicName, subId, "new sub comment"); err != nil {
            fmt.Println("update subscription comment failed")
            fmt.Println(err)
            return
        }
        fmt.Println("update subscription comment successful")
    }

Update subscription state

A subscription has two states, SUB_OFFLINE and SUB_ONLINE, meaning offline and online respectively.

UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) error

  • parameters

    • projectName: project name
    • topicName: topic name
    • subId: The id of the subscription.
    • state: The state you want to change.
  • return

  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func updateSubState(dh datahub.DataHub, projectName, topicName string) {
        subId := "1565580329258VXSY8"
        if err := dh.UpdateSubscriptionState(projectName, topicName, subId, datahub.SUB_OFFLINE); err != nil {
            fmt.Println("update subscription state failed")
            fmt.Println(err)
            return
        }
        fmt.Println("update subscription state successful")
    }

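Offset operations

A newly created subscription starts in an unconsumed state. To use the offset storage that the subscription service provides, you need to perform some offset operations.

Initialize offset

Opening a session is the first step in using a subscription for offset operations. A subscription does not support parallel use; to consume the same data from several processes, use a separate subscription for each. After OpenSubscriptionSession is called, the SessionId in the returned offsets is incremented by 1 and the previous session becomes invalid, so it can no longer commit offsets.

OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error)

  • parameters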
    • projectName: project name
    • topicName: topic name
    • subId: The id of the subscription.
    • shardIds: The id list of the shards.
  • return
    type OpenSubscriptionSessionResult struct {
        Offsets map[string]SubscriptionOffset `json:"Offsets"`
    }

    // SubscriptionOffset
    type SubscriptionOffset struct {
        Timestamp int64  `json:"Timestamp"`
        Sequence  int64  `json:"Sequence"`
        VersionId int64  `json:"Version"`
        SessionId *int64 `json:"SessionId"`
    }
  • error
    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func openOffset(dh datahub.DataHub, projectName, topicName string) {
        subId := "1565580329258VXSY8"
        shardIds := []string{"0", "1", "2"}
        oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
        if err != nil {
            fmt.Println("open session failed")
            fmt.Println(err)
            return
        }
        fmt.Println("open session successful")
        fmt.Println(oss)
    }

Get offset

Gets the current offsets of a subscription. Unlike OpenSubscriptionSession, the SubscriptionOffset values returned by GetSubscriptionOffset have a nil SessionId, so they cannot be used to commit offsets. GetSubscriptionOffset is therefore generally used only to inspect offsets.

GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error)

  • parameters

    • projectName: project name
    • topicName: topic name
    • subId: The id of the subscription.
    • shardIds: The id list of the shards.
  • return

    type GetSubscriptionOffsetResult struct {
        Offsets map[string]SubscriptionOffset `json:"Offsets"`
    }

    // SubscriptionOffset
    type SubscriptionOffset struct {
        Timestamp int64  `json:"Timestamp"`
        Sequence  int64  `json:"Sequence"`
        VersionId int64  `json:"Version"`
        SessionId *int64 `json:"SessionId"`
    }
  • error

    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func getOffset(dh datahub.DataHub, projectName, topicName string) {
        subId := "1565580329258VXSY8"
        shardIds := []string{"0", "1", "2"}
        gss, err := dh.GetSubscriptionOffset(projectName, topicName, subId, shardIds)
        if err != nil {
            fmt.Println("get subscription offset failed")
            fmt.Println(err)
            return
        }
        fmt.Println("get subscription offset successful")
        fmt.Println(gss)
    }

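Update offset

When an offset is committed, the VersionId and SessionId are validated; both must match the current session for the commit to succeed. Set both Timestamp and Sequence so that the commit yields a valid offset. If the two do not correspond to each other, the offset is set to the position matching Timestamp. It is recommended to commit the Timestamp and Sequence taken from a consumed record.

CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

  • parameters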

    • projectName: project name
    • topicName: topic name
    • subId: The id of the subscription.
    • offsets: The offset map of shards.
  • return

  • error
    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func updateOffset(dh datahub.DataHub, projectName, topicName, subId string) {
        shardIds := []string{"0", "1", "2"}
        // open a session first; its offsets carry the SessionId needed to commit
        oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
        if err != nil {
            fmt.Println("open session failed")
            fmt.Println(err)
            return
        }
        fmt.Println("open session successful")
        fmt.Println(oss)

        offset := oss.Offsets["0"]
        // set offset message
        offset.Sequence = 900
        offset.Timestamp = 1565593166690
        offsetMap := map[string]datahub.SubscriptionOffset{
            "0": offset,
        }
        if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
            if _, ok := err.(*datahub.SubscriptionOfflineError); ok {
                fmt.Println("the subscription is offline")
            } else if _, ok := err.(*datahub.SubscriptionSessionInvalidError); ok {
                fmt.Println("the subscription is open elsewhere")
            } else if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
                fmt.Println("the subscription is reset elsewhere")
            } else {
                fmt.Println(err)
            }
            fmt.Println("update offset failed")
            return
        }
        fmt.Println("update offset successful")
    }
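
The SessionId and VersionId checks described for commits can be pictured with a toy in-memory model. The sketch below is an assumption-laden illustration of those rules as this document states them (opening a session bumps SessionId, a reset bumps VersionId, and a commit must match both). It is not the service's implementation; every type, function and error name in it is hypothetical, and SessionId is a plain int64 here for simplicity.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical errors for the toy model.
var (
	errSessionInvalid = errors.New("session changed")
	errOffsetReset    = errors.New("offset was reset")
)

// offset is a simplified form of the documented SubscriptionOffset.
type offset struct {
	Timestamp int64
	Sequence  int64
	VersionId int64
	SessionId int64
}

// shardStore is a toy model of the state kept for one shard of a subscription.
type shardStore struct {
	current offset
}

// open starts a new session: SessionId goes up by one.
func (s *shardStore) open() offset {
	s.current.SessionId++
	return s.current
}

// reset moves the offset to a timestamp: VersionId goes up by one.
func (s *shardStore) reset(timestamp int64) {
	s.current.VersionId++
	s.current.Timestamp = timestamp
}

// commit stores o only if its version and session match the current ones.
func (s *shardStore) commit(o offset) error {
	if o.VersionId != s.current.VersionId {
		return errOffsetReset
	}
	if o.SessionId != s.current.SessionId {
		return errSessionInvalid
	}
	s.current.Timestamp, s.current.Sequence = o.Timestamp, o.Sequence
	return nil
}

func main() {
	store := &shardStore{}
	a := store.open()
	b := store.open() // a second client opens the same subscription
	a.Sequence, b.Sequence = 10, 20
	fmt.Println(store.commit(a)) // session changed
	fmt.Println(store.commit(b)) // <nil>
	store.reset(1565593166690)
	fmt.Println(store.commit(b)) // offset was reset
}
```

In the model, the first client's commit fails as soon as a second client opens a session, which mirrors why a subscription should not be shared between processes.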

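Reset offset

Resetting moves the offset to a given point in time. After a reset, the VersionId in the returned offset information is incremented by 1 and the previous session becomes invalid, so it can no longer commit offsets.

ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

  • parameters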

    • projectName: project name
    • topicName: topic name
    • subId: The id of the subscription.
    • offsets: The offset map of shards.
  • return

  • error
    • ResourceNotFoundError
    • AuthorizationFailedError
    • DatahubClientError
    • InvalidParameterError
  • example

    func resetOffset(dh datahub.DataHub, projectName, topicName string) {
        subId := "1565580329258VXSY8"
        // only the Timestamp is set: the offset is reset to this point in time
        offset := datahub.SubscriptionOffset{
            Timestamp: 1565593166690,
        }
        offsetMap := map[string]datahub.SubscriptionOffset{
            "1": offset,
        }
        if err := dh.ResetSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
            fmt.Println("reset offset failed")
            fmt.Println(err)
            return
        }
        fmt.Println("reset offset successful")
    }
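
The Timestamp values in these examples, such as 1565593166690, look like milliseconds since the Unix epoch. That unit is an assumption inferred from the sample values; this document does not state it. Under that assumption, a `time.Time` could be converted as in the sketch below, where `toMillis` and `exampleInstant` are names invented for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// exampleInstant is the instant that corresponds to the sample timestamp
// used in the reset example, assuming millisecond units.
var exampleInstant = time.Date(2019, time.August, 12, 6, 59, 26, 690000000, time.UTC)

// toMillis converts t to milliseconds since the Unix epoch.
func toMillis(t time.Time) int64 {
	return t.UnixNano() / int64(time.Millisecond)
}

func main() {
	fmt.Println(toMillis(exampleInstant)) // 1565593166690
}
```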

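Error types

The Go SDK organizes DataHub errors into types. You can use type assertions to determine the error type and handle each type accordingly. All error types other than DatahubClientError and LimitExceededError are non-retryable. DatahubClientError covers some retryable errors, such as server busy and server unavailable. It is therefore recommended to add retry logic for DatahubClientError and LimitExceededError, with a strict limit on the number of retries.

| Type | Error codes | Description |
|-|-|-|
| InvalidParameterError | InvalidParameter, InvalidCursor | Invalid parameter |
| ResourceNotFoundError | ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo | The requested resource does not exist (note: sending other requests immediately after a Split/Merge operation may raise this error) |
| ResourceExistError | ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist | The resource already exists (raised when creating a resource that already exists) |
| SeekOutOfRangeError | SeekOutOfRange | In getCursor, the given sequence is outside the valid range (usually because the data has expired), or the given timestamp is later than the current time |
| AuthorizationFailedError | Unauthorized | Failed to parse the Authorization signature; check that the AK is entered correctly |
| NoPermissionError | NoPermission, OperationDenied | No permission; usually RAM is misconfigured or the sub-account has not been authorized correctly |
| NewShardSealedError | InvalidShardOperation | The shard is in the CLOSED state, readable but not writable; writing to a CLOSED shard, or continuing to read after the last record, raises this error |
| LimitExceededError | LimitExceeded | Interface usage exceeded the limit; see the limits description |
| SubscriptionOfflineError | SubscriptionOffline | The subscription is offline and unavailable |
| SubscriptionSessionInvalidError | OffsetSessionChanged, OffsetSessionClosed | Subscription session error. Using a subscription establishes a session for committing offsets; if another client uses the same subscription, this error is returned |
| SubscriptionOffsetResetError | OffsetReseted | The subscription offset was reset |
| MalformedRecordError | MalformedRecord | Invalid record format; possible causes include an incorrect schema, non-UTF-8 characters, or a client using pb when the server does not support it |
| DatahubClientError | All other codes; also the base type of all errors | If none of the cases above applies, retrying usually resolves it, but limit the number of retries |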

DatahubClientError

The base DataHub error type; all errors inherit from it. Any error outside the types defined above is a DatahubClientError, including retryable errors such as server busy and server unavailable. You can add a retry mechanism for these in your own code.

    type DatahubClientError struct {
        StatusCode int    `json:"StatusCode"`   // HTTP status code
        RequestId  string `json:"RequestId"`    // Request id, used to trace the request
        Code       string `json:"ErrorCode"`    // DataHub error code
        Message    string `json:"ErrorMessage"` // Error message for the error code
    }
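
Go has no class inheritance, so "inherit" here most likely means struct embedding. That is an assumption; this document does not show how the specific error types are declared. The sketch below uses hypothetical stand-in types, `baseError` and `limitError`, to show what embedding implies for type assertions: a type that embeds the base does not match an assertion on the base type itself, while the base fields remain reachable through it.

```go
package main

import "fmt"

// baseError is a hypothetical stand-in for DatahubClientError.
type baseError struct {
	StatusCode int
	RequestId  string
	Code       string
	Message    string
}

// Error makes *baseError, and any pointer to a struct embedding it, an error.
func (e *baseError) Error() string {
	return fmt.Sprintf("statusCode: %d, requestId: %s, code: %s, message: %s",
		e.StatusCode, e.RequestId, e.Code, e.Message)
}

// limitError is a hypothetical specific error that embeds baseError.
type limitError struct {
	baseError
}

// newLimitError builds a sample error; the field values are placeholders,
// not values taken from the service.
func newLimitError() error {
	return &limitError{baseError{Code: "LimitExceeded", Message: "sample message"}}
}

func main() {
	err := newLimitError()

	// The dynamic type is *limitError, so asserting the base type fails.
	_, isBase := err.(*baseError)
	fmt.Println("matches *baseError:", isBase) // matches *baseError: false

	// Asserting the specific type works, and embedded fields are promoted.
	if le, ok := err.(*limitError); ok {
		fmt.Println("code via embedded field:", le.Code) // code via embedded field: LimitExceeded
	}
}
```

If the SDK follows this pattern, checking the specific types first and treating everything else as the general case, as the example in this section does, is the natural order.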

Error handling example:

    import (
        "fmt"
        "time"

        "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
    )

    func exampleError() {
        accessId := ""
        accessKey := ""
        endpoint := ""
        projectName := "datahub_go_test"
        maxRetry := 3
        dh := datahub.New(accessId, accessKey, endpoint)
        if err := dh.CreateProject(projectName, "project comment"); err != nil {
            // use type assertions to tell the error types apart
            if _, ok := err.(*datahub.InvalidParameterError); ok {
                fmt.Println("invalid parameter, please check your input parameter")
            } else if _, ok := err.(*datahub.ResourceExistError); ok {
                fmt.Println("project already exists")
            } else if _, ok := err.(*datahub.AuthorizationFailedError); ok {
                fmt.Println("accessId or accessKey error, please check your accessId and accessKey")
            } else if _, ok := err.(*datahub.LimitExceededError); ok {
                // retryable: try again, at most maxRetry times
                fmt.Println("limit exceeded, so retry")
                for i := 0; i < maxRetry; i++ {
                    // wait 5 seconds
                    time.Sleep(5 * time.Second)
                    if err := dh.CreateProject(projectName, "project comment"); err != nil {
                        fmt.Println("create project failed")
                        fmt.Println(err)
                    } else {
                        fmt.Println("create project successful")
                        break
                    }
                }
            } else {
                fmt.Println("unknown error")
                fmt.Println(err)
            }
        } else {
            fmt.Println("create project successful")
        }
    }
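
The bounded retry in the example above can be factored into a small helper so that the retry limit and the decision about which errors are retryable live in one place. This is a minimal sketch, not part of the SDK: `retry`, `errBusy` and `errInvalid` are hypothetical names, and the errors are plain placeholders so the code runs without a DataHub service.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical placeholder errors used only in this sketch.
var (
	errBusy    = errors.New("server busy")
	errInvalid = errors.New("invalid parameter")
)

// retry calls op once, then up to maxRetry more times while op keeps
// failing with an error that retryable accepts. It returns the last error.
func retry(maxRetry int, wait time.Duration, retryable func(error) bool, op func() error) error {
	err := op()
	for i := 0; i < maxRetry && err != nil && retryable(err); i++ {
		time.Sleep(wait)
		err = op()
	}
	return err
}

func main() {
	attempts := 0
	err := retry(3, 10*time.Millisecond,
		func(err error) bool { return errors.Is(err, errBusy) },
		func() error {
			attempts++
			if attempts < 3 {
				return errBusy // fail the first two attempts
			}
			return nil
		})
	fmt.Println(attempts, err) // 3 <nil>
}
```

With the SDK, the `retryable` function would use type assertions such as `err.(*datahub.LimitExceededError)`, in the same way as the example above.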