Topic操作

本文为您展示DataHub的 GO SDKTopic操作。

创建Topic

创建Tuple Topic

Tuple类型的Topic写入数据是具有格式的,需指定Record Schema,以下为支持的数据类型:

类型

含义

值域

BIGINT

8字节有符号整型

-9223372036854775807 ~ 9223372036854775807

DOUBLE

8字节双精度浮点数

-1.0 _10^308 ~ 1.0 _10^308

BOOLEAN

布尔类型

支持以下类型:

  • True/False

  • true/false

  • 0/1

TIMESTAMP

时间戳类型

表示到微秒的时间戳类型。

STRING

字符串,只支持UTF-8编码

单个STRING列最长允许2MB。

DECIMAL

s

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称

comment

string

Topic说明

lifeCycle

int

The expire time of the data (Unit: DAY). The data written before that time is not accessible.

recordSchema

RecordSchema

The records schema of this topic.

错误说明

错误类名

错误码

错误说明

ResourceExistError

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

资源已存在(创建时如果资源已存在,就会抛出这个异常)。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

如排除以上异常情况,通常重试即可,但应限制重试次数。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数

代码示例

func Example_CreateTupleTopic(dh datahub.DataHub, projectName, topicName string) {
    recordSchema := datahub.NewRecordSchema()
    recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
        AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
        AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
        AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
        AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN})
    if err := dh.CreateTupleTopic(projectName, topicName, "topic comment", 5, 7, recordSchema); err != nil {
        fmt.Println("create topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println("create topic successful")
}

创建Blob Topic

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称

comment

string

Topic说明

lifeCycle

int

The expire time of the data (Unit: DAY). The data written before that time is not accessible.

错误说明

错误类名

错误码

错误说明

ResourceExistError

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

资源已存在(创建时如果资源已存在,就会抛出这个异常)。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func Example_CreateBlobTopic(dh datahub.DataHub, projectName, topicName string) {
    if err := dh.CreateBlobTopic(projectName, topicName, "topic comment", 5, 7); err != nil {
        fmt.Println("create topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println("create topic successful")
}

删除Topic

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

如排除以上异常情况,通常重试即可,但应限制重试次数。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func ExampleDataHub_DeleteTopic(dh datahub.DataHub, projectName, topicName string) {
    if err := dh.DeleteTopic(projectName, topicName); err != nil {
        fmt.Println("delete failed")
        fmt.Println(err)
        return
    }
    fmt.Println("delete successful")
}

列出Topic

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

返回示例

type ListTopicResult struct {
    TopicNames [] string `json:"TopicNames"`
}

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

如排除以上异常情况,通常重试即可,但应限制重试次数。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func ExampleDataHub_ListTopic(dh datahub.DataHub, projectName, topicName string) {
    lt, err := dh.ListTopic(projectName)
    if err != nil {
        fmt.Println("get topic list failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get topic list successful")
    fmt.Println(lt)
}

更新Topic

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

comment

string

Topic说明。

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

如排除以上异常情况,通常重试即可,但应限制重试次数。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func ExampleDataHub_UpdateTopic(dh datahub.DataHub, projectName, topicName string) {
    if err := dh.UpdateTopic(projectName, topicName, "new topic comment"); err != nil {
        fmt.Println("update topic comment failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update topic comment successful")
}

更多操作

在创建Tuple topic 和读写 record 的时候用到schema来标明数据存储的名称和对应类型。

说明

网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。schema就是一个Field对象的slice,Field包含三个参数,第一个参数是field的名称,第二个是field的类型,第三个参数是bool值,True表示field的值允许为空, False表示field的值不能为空。

获取Schema

对于已创建的Tuple topic,可以使用get_topic接口来获取schema信息,以下为代码示例:

func getSchema(dh datahub.DataHub, projectName, topicName string) {
    gt, err := dh.GetTopic(projectName, "topic_test")
    if err != nil {
        fmt.Println("get topic failed")
        fmt.Println(err)
        return
    } else {
        schema := gt.RecordSchema
        fmt.Println(schema)
    }
}

定义Schema

要创建新的Tuple topic,需要自己定义Schema,schema可以通过以下三种方式进行初始化:

直接创建Schema

func createSchema1() {
    fields := []datahub.Field{
        {"bigint_field", datahub.BIGINT, true},
        {"timestamp_field", datahub.TIMESTAMP, false},
        {"string_field", datahub.STRING, false},
        {"double_field", datahub.DOUBLE, false},
        {"boolean_field", datahub.BOOLEAN, true},
        {"decimal_field", datahub.DECIMAL, false},
    }
    schema := datahub.RecordSchema{
        fields,
    }
    fmt.Println(schema)
}

逐个对Schema进行初始化

func createSchema2() {
    recordSchema := datahub.NewRecordSchema()
    recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
        AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
        AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
        AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
        AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN}).
        AddField(datahub.Field{Name: "decimal_field", Type: datahub.DECIMAL})
}

通过JSON字符串定义schema

func createSchema3() {
    str := ""
    schema, err := datahub.NewRecordSchemaFromJson(str)
    if err != nil {
        fmt.Println("create recordSchema failed")
        fmt.Println(err)
        return
    }
    fmt.Println("create recordSchema successful")
    fmt.Println(schema)
}
说明

JSON字符串的格式为:“{“fields”:[{“type”:”BIGINT”,”name”:”a”},{“type”:”STRING”,”name”:”b”}]}”