本文为您展示DataHub的 GO SDK的Connector操作。
Connector 说明
DataHub Connector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute(原ODPS)、OSS(Object Storage Service,阿里云对象存储服务)、ES(Elasticsearch)、ADS(AnalyticDB for MySQL,分析型数据库MySQL版)、MySQL、FC(Function Compute、函数计算)、OTS(Open Table Store、表格存储)、DataHub中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在其他云产品中使用这份数据。这里所有的示例代码均以MaxCompute为例。MaxCompute Config的配置信息可以参考创建同步MaxCompute。
datahub2.14.0版本之后将接口参数connectorType修改connectorId(createConnector除外),不过接口依旧兼容2.14.0之前版本,只需将参数connectorType转为string作为参数即可。
转换示例如下:
1. gcr, err := dh.GetConnector(projectName, topicName, string(datahub.SinkOdps))
创建Connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
cType | ConnectorType | The type of connector which you want create. |
columnFields | string | 需要同步的字段。 |
sinkStartTime | int64 | 工作开始时间。 |
config | interface | connector配置详情。 |
返回示例
type CreateConnectorResult struct {
ConnectorId string `json:"ConnectorId"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
ResourceExistError |
| 资源已存在(创建时如果资源已存在,就会抛出这个异常)。 |
AuthorizationFailedError |
| 资源已存在(创建时如果资源已存在,就会抛出这个异常)。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数 |
代码示例
func createConnector(dh datahub.DataHub, projectName, topicName string) {
odpsEndpoint := ""
odpsProject := "datahub_test"
odpsTable := "datahub_go_example"
odpsAccessId := ""
odpsAccessKey := "="
odpsTimeRange := 60
odpsPartitionMode := datahub.SystemTimeMode
connectorType := datahub.SinkOdps
odpsPartitionConfig := datahub.NewPartitionConfig()
odpsPartitionConfig.AddConfig("ds", "%Y%m%d")
odpsPartitionConfig.AddConfig("hh", "%H")
odpsPartitionConfig.AddConfig("mm", "%M")
sinkOdpsConfig := datahub.SinkOdpsConfig{
Endpoint: odpsEndpoint,
Project: odpsProject,
Table: odpsTable,
AccessId: odpsAccessId,
AccessKey: odpsAccessKey,
TimeRange: odpsTimeRange,
PartitionMode: odpsPartitionMode,
PartitionConfig: *odpsPartitionConfig,
}
fileds := []string{"field1", "field2"}
if err := dh.CreateConnector(projectName, topicName, connectorType, fileds, *sinkOdpsConfig); err != nil {
fmt.Println("create odps connector failed")
fmt.Println(err)
return
}
fmt.Println("create odps connector successful")
}
列出connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
返回示例
type ListConnectorResult struct {
ConnectorIds []string `json:"Connectors"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func listConnector(dh datahub.DataHub, projectName, topicName string) {
lc, err := dh.ListConnector(projectName, topicName)
if err != nil {
fmt.Println("get connector list failed")
fmt.Println(err)
return
}
fmt.Println("get connector list successful")
fmt.Println(lc)
}
查询connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
connectorId | The id of the connector |
返回示例
type GetConnectorResult struct {
CreateTime int64 `json:"CreateTime"`
LastModifyTime int64 `json:"LastModifyTime"`
ConnectorId string `json:"ConnectorId"`
ClusterAddress string `json:"ClusterAddress"`
Type ConnectorType `json:"Type"`
State ConnectorState `json:"State"`
ColumnFields []string `json:"ColumnFields"`
ExtraConfig map[string]string `json:"ExtraInfo"`
Creator string `json:"Creator"`
Owner string `json:"Owner"`
Config interface{} `json:"Config"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数。 |
代码示例
func getConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
gcr, err := dh.GetConnector(projectName, topicName, connectorId)
if err != nil {
fmt.Println("get odps conector failed")
fmt.Println(err)
return
}
fmt.Println("get odps conector successful")
fmt.Println(*gcr)
}
更新connector配置
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | TOPICname |
connectorId | string | 需要更新的connector ID。 |
config | interface | connector配置详情。 |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数。 |
代码示例
func updateConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
gc, err := dh.GetConnector(projectName, topicName, connectorId)
if err != nil {
fmt.Println("get odps connector failed")
fmt.Println(err)
return
}
config, ok := gc.Config.(datahub.SinkOdpsConfig)
if !ok {
fmt.Println("convert config to SinkOdpsConfig failed")
return
}
// modify the config
config.TimeRange = 200
if err := dh.UpdateConnector(projectName, topicName, connectorId, config); err != nil {
fmt.Println("update odps config failed")
fmt.Println(err)
return
}
fmt.Println("update odps config successful")
}
删除connector
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
connectorId | string | The id of the connector. |
返回示例
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数。 |
代码示例
func deleteConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
if err := dh.DeleteConnector(projectName, topicName, connectorId); err != nil {
fmt.Println("delete odps connector failed")
fmt.Println(err)
return
}
fmt.Println("delete odps connector successful")
}
查询connector shard状态
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
shardId | string | The id of the shard. |
connectorId | string | The id of the connector. |
返回示例
// getConnectorShardStatus
type GetConnectorShardStatusResult struct {
ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
}
// GetConnectorShardStatusByShard
type ConnectorShardStatusEntry struct {
StartSequence int64 `json:"StartSequence"`
EndSequence int64 `json:"EndSequence"`
CurrentSequence int64 `json:"CurrentSequence"`
CurrentTimestamp int64 `json:"CurrentTimestamp"`
UpdateTime int64 `json:"UpdateTime"`
State ConnectorShardState `json:"State"`
LastErrorMessage string `json:"LastErrorMessage"`
DiscardCount int64 `json:"DiscardCount"`
DoneTime int64 `json:"DoneTime"`
WorkerAddress string `json:"WorkerAddress"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数。 |
代码示例
func getConnectorShardStatus(dh datahub.DataHub, projectName, topicName, connectorId string) {
gcs, err := dh.GetConnectorShardStatus(projectName, topicName, connectorId)
if err != nil {
fmt.Println("get connector shard status failed")
fmt.Println(err)
return
}
fmt.Println("get connector shard status successful")
for shard, status := range gcs.ShardStatus {
fmt.Println(shard, status.State)
}
shardId := "0"
gc, err := dh.GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId)
if err != nil {
fmt.Println("get connector shard status failed")
fmt.Println(err)
return
}
fmt.Println("get connector shard status successful")
fmt.Println(*gc)
}
重启connector shard
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
shardId | string | The id of the shard. |
connectorId | string | The id of the connector. |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数。 |
代码示例
func reloadConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
if err := dh.ReloadConnector(projectName, topicName, connectorId); err != nil {
fmt.Println("reload connector shard failed")
fmt.Println(err)
return
}
fmt.Println("reload connector shard successful")
shardId := "2"
if err := dh.ReloadConnectorByShard(projectName, topicName, connectorId, shardId); err != nil {
fmt.Println("reload connector shard failed")
fmt.Println(err)
return
}
fmt.Println("reload connector shard successful")
}
添加新field
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
connectorId | string | The id of the connector. |
fieldName | string | The name of the field. |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func appendConnectorField(dh datahub.DataHub, projectName, topicName, connectorId string) {
if err := dh.AppendConnectorField(projectName, topicName, connectorId, "field2"); err != nil {
fmt.Println("append filed failed")
fmt.Println(err)
return
}
fmt.Println("append filed successful")
}
更新connector状态
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称 |
connectorId | string | The id of the connector. |
state | ConnectorState | The state of the connector which you want update. |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数。 |
代码示例
func updateConnectorState(dh datahub.DataHub, projectName, topicName, connectorId string) {
if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped); err != nil {
fmt.Println("update connector state failed")
fmt.Println(err)
return
}
fmt.Println("update connector state successful")
if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning); err != nil {
fmt.Println("update connector state failed")
fmt.Println(err)
return
}
fmt.Println("update connector state successful")
}
更新connector点位信息
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
shardId | string | The id of the shard. |
connectorId | string | The id of the connector. |
offset | ConnectorOffset | The connector offset. |
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数。 |
代码示例
func updateConnectorOffset(dh datahub.DataHub, projectName, topicName, connectorId string) {
shardId := "10"
offset := datahub.ConnectorOffset{
Timestamp: 1565864139000,
Sequence: 104,
}
dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped)
defer dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning)
if err := dh.UpdateConnectorOffset(projectName, topicName, connectorId, shardId, offset); err != nil {
fmt.Println("update connector offset failed")
fmt.Println(err)
return
}
fmt.Println("update connector offset successful")
}
查询connector完成时间
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
connectorId | string | The id of the connector. |
返回示例
1. type GetConnectorDoneTimeResult struct {
2. DoneTime int64 `json:"DoneTime"`
3. }
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类 |
InvalidParameterError |
| 非法参数。 |
代码示例
func doneTime(dh datahub.DataHub, projectName, topicName, connectorId string) {
gcd, err := dh.GetConnectorDoneTime(projectName, topicName, connectorId)
if err != nil {
fmt.Println("get connector done time failed")
fmt.Println(err)
return
}
fmt.Println("get connector done time successful")
fmt.Println(gcd.DoneTime)
}