本文为您展示DataHub的 GO SDK的Topic操作。
创建Topic
创建Tuple Topic
Tuple类型的Topic写入数据是具有格式的,需指定Record Schema,以下为支持的数据类型:
类型 | 含义 | 值域 |
BIGINT | 8字节有符号整型 |
|
DOUBLE | 8字节双精度浮点数 |
|
BOOLEAN | 布尔类型 | 支持以下类型:
|
TIMESTAMP | 时间戳类型 | 表示到微秒的时间戳类型。 |
STRING | 字符串,只支持UTF-8编码 | 单个STRING列最长允许2MB。 |
DECIMAL | s |
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
comment | string | Topic说明 |
lifeCycle | int | The expire time of the data (Unit: DAY). The data written before that time is not accessible. |
recordSchema | RecordSchema | The records schema of this topic. |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceExistError |
| 资源已存在(创建时如果资源已存在,就会抛出这个异常)。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 如排除以上异常情况,通常重试即可,但应限制重试次数。 |
InvalidParameterError |
| 非法参数 |
代码示例
func Example_CreateTupleTopic(dh datahub.DataHub, projectName, topicName string) {
recordSchema := datahub.NewRecordSchema()
recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN})
if err := dh.CreateTupleTopic(projectName, topicName, "topic comment", 5, 7, recordSchema); err != nil {
fmt.Println("create topic failed")
fmt.Println(err)
return
}
fmt.Println("create topic successful")
}
创建Blob Topic
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
comment | string | Topic说明 |
lifeCycle | int | The expire time of the data (Unit: DAY). The data written before that time is not accessible. |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceExistError |
| 资源已存在(创建时如果资源已存在,就会抛出这个异常)。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func Example_CreateBlobTopic(dh datahub.DataHub, projectName, topicName string) {
if err := dh.CreateBlobTopic(projectName, topicName, "topic comment", 5, 7); err != nil {
fmt.Println("create topic failed")
fmt.Println(err)
return
}
fmt.Println("create topic successful")
}
删除Topic
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | 如排除以上异常情况,通常重试即可,但应限制重试次数。 | |
InvalidParameterError |
| 非法参数。 |
代码示例
func ExampleDataHub_DeleteTopic(dh datahub.DataHub, projectName, topicName string) {
if err := dh.DeleteTopic(projectName, topicName); err != nil {
fmt.Println("delete failed")
fmt.Println(err)
return
}
fmt.Println("delete successful")
}
列出Topic
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
返回示例
type ListTopicResult struct {
TopicNames [] string `json:"TopicNames"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | 如排除以上异常情况,通常重试即可,但应限制重试次数。 | |
InvalidParameterError |
| 非法参数。 |
代码示例
func ExampleDataHub_ListTopic(dh datahub.DataHub, projectName, topicName string) {
lt, err := dh.ListTopic(projectName)
if err != nil {
fmt.Println("get topic list failed")
fmt.Println(err)
return
}
fmt.Println("get topic list successful")
fmt.Println(lt)
}
更新Topic
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
comment | string | Topic说明。 |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | 如排除以上异常情况,通常重试即可,但应限制重试次数。 | |
InvalidParameterError |
| 非法参数。 |
代码示例
func ExampleDataHub_UpdateTopic(dh datahub.DataHub, projectName, topicName string) {
if err := dh.UpdateTopic(projectName, topicName, "new topic comment"); err != nil {
fmt.Println("update topic comment failed")
fmt.Println(err)
return
}
fmt.Println("update topic comment successful")
}
更多操作
在创建Tuple topic 和读写 record 的时候用到schema来标明数据存储的名称和对应类型。
网络传输中,数据都是以字符串的形式发送,需要schema来转换成对应的类型。schema就是一个Field对象的slice,Field包含三个参数,第一个参数是field的名称,第二个是field的类型,第三个参数是bool值,True表示field的值允许为空, False表示field的值不能为空。
获取Schema
对于已创建的Tuple topic,可以使用get_topic接口来获取schema信息,以下为代码示例:
func getSchema(dh datahub.DataHub, projectName, topicName string) {
gt, err := dh.GetTopic(projectName, "topic_test")
if err != nil {
fmt.Println("get topic failed")
fmt.Println(err)
return
} else {
schema := gt.RecordSchema
fmt.Println(schema)
}
}
定义Schema
要创建新的Tuple topic,需要自己定义Schema,schema可以通过以下三种方式进行初始化:
直接创建Schema
func createSchema1() {
fields := []datahub.Field{
{"bigint_field", datahub.BIGINT, true},
{"timestamp_field", datahub.TIMESTAMP, false},
{"string_field", datahub.STRING, false},
{"double_field", datahub.DOUBLE, false},
{"boolean_field", datahub.BOOLEAN, true},
{"decimal_field", datahub.DECIMAL, false},
}
schema := datahub.RecordSchema{
fields,
}
fmt.Println(schema)
}
逐个对Schema进行初始化
func createSchema2() {
recordSchema := datahub.NewRecordSchema()
recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN}).
AddField(datahub.Field{Name: "decimal_field", Type: datahub.DECIMAL})
}
通过JSON字符串定义schema
func createSchema3() {
str := ""
schema, err := datahub.NewRecordSchemaFromJson(str)
if err != nil {
fmt.Println("create recordSchema failed")
fmt.Println(err)
return
}
fmt.Println("create recordSchema successful")
fmt.Println(schema)
}
JSON字符串的格式为:“{“fields”:[{“type”:”BIGINT”,”name”:”a”},{“type”:”STRING”,”name”:”b”}]}”