Connector操作

本文为您展示DataHub的 GO SDKConnector操作。

Connector 说明

DataHub Connector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute(原ODPS)、OSS(Object Storage Service,阿里云对象存储服务)、ES(Elasticsearch)、ADS(AnalyticDB for MySQL,分析型数据库MySQL版)、MySQL、FC(Function Compute、函数计算)、OTS(Open Table Store、表格存储)、DataHub中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在其他云产品中使用这份数据。这里所有的示例代码均以MaxCompute为例。MaxCompute Config的配置信息可以参考创建同步MaxCompute

说明

datahub2.14.0版本之后将接口参数connectorType修改connectorId(createConnector除外),不过接口依旧兼容2.14.0之前版本,只需将参数connectorType转为string作为参数即可。

转换示例如下:

1. gcr, err := dh.GetConnector(projectName, topicName, string(datahub.SinkOdps))

创建Connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

cType

ConnectorType

The type of connector which you want create.

columnFields

string

需要同步的字段。

sinkStartTime

int64

工作开始时间。

config

interface

connector配置详情。

返回示例

type CreateConnectorResult struct {
    ConnectorId string `json:"ConnectorId"`
 }

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

ResourceExistError

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

资源已存在(创建时如果资源已存在,就会抛出这个异常)。

AuthorizationFailedError

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

资源已存在(创建时如果资源已存在,就会抛出这个异常)。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数

代码示例

func createConnector(dh datahub.DataHub, projectName, topicName string) {
    odpsEndpoint := ""
    odpsProject := "datahub_test"
    odpsTable := "datahub_go_example"
    odpsAccessId := ""
    odpsAccessKey := "="
    odpsTimeRange := 60
    odpsPartitionMode := datahub.SystemTimeMode
    connectorType := datahub.SinkOdps
    odpsPartitionConfig := datahub.NewPartitionConfig()
    odpsPartitionConfig.AddConfig("ds", "%Y%m%d")
    odpsPartitionConfig.AddConfig("hh", "%H")
    odpsPartitionConfig.AddConfig("mm", "%M")
    sinkOdpsConfig := datahub.SinkOdpsConfig{
        Endpoint:        odpsEndpoint,
        Project:         odpsProject,
        Table:           odpsTable,
        AccessId:        odpsAccessId,
        AccessKey:       odpsAccessKey,
        TimeRange:       odpsTimeRange,
        PartitionMode:   odpsPartitionMode,
        PartitionConfig: *odpsPartitionConfig,
    }
    fileds := []string{"field1", "field2"}
    if err := dh.CreateConnector(projectName, topicName, connectorType, fileds, *sinkOdpsConfig); err != nil {
        fmt.Println("create odps connector failed")
        fmt.Println(err)
        return
    }
    fmt.Println("create odps connector successful")
}

列出connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称

返回示例

type ListConnectorResult struct {
    ConnectorIds []string `json:"Connectors"`
}

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func listConnector(dh datahub.DataHub, projectName, topicName string) {
    lc, err := dh.ListConnector(projectName, topicName)
    if err != nil {
        fmt.Println("get connector list failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get connector list successful")
    fmt.Println(lc)
}

查询connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称

connectorId

The id of the connector

返回示例

type GetConnectorResult struct {
    CreateTime     int64             `json:"CreateTime"`
    LastModifyTime int64             `json:"LastModifyTime"`
    ConnectorId    string            `json:"ConnectorId"`
    ClusterAddress string            `json:"ClusterAddress"`
    Type           ConnectorType     `json:"Type"`
    State          ConnectorState    `json:"State"`
    ColumnFields   []string          `json:"ColumnFields"`
    ExtraConfig    map[string]string `json:"ExtraInfo"`
    Creator        string            `json:"Creator"`
    Owner          string            `json:"Owner"`
    Config         interface{}       `json:"Config"`
}

错误说明

错误类名

错误码

错误说明

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func getConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gcr, err := dh.GetConnector(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("get odps conector failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get odps conector successful")
    fmt.Println(*gcr)
}

更新connector配置

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

TOPICname

connectorId

string

需要更新的connector ID。

config

interface

connector配置详情。

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func updateConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gc, err := dh.GetConnector(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("get odps connector failed")
        fmt.Println(err)
        return
    }
    config, ok := gc.Config.(datahub.SinkOdpsConfig)
    if !ok {
        fmt.Println("convert config to SinkOdpsConfig failed")
        return
    }
    // modify the config
    config.TimeRange = 200
    if err := dh.UpdateConnector(projectName, topicName, connectorId, config); err != nil {
        fmt.Println("update odps config failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update odps config successful")
}

删除connector

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称

connectorId

string

The id of the connector.

返回示例

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

 func deleteConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
     if err := dh.DeleteConnector(projectName, topicName, connectorId); err != nil {
         fmt.Println("delete odps connector failed")
         fmt.Println(err)
         return
     }
    fmt.Println("delete odps connector successful")
 }

查询connector shard状态

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

shardId

string

The id of the shard.

connectorId

string

The id of the connector.

返回示例

// getConnectorShardStatus
type GetConnectorShardStatusResult struct {
    ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
}
// GetConnectorShardStatusByShard
type ConnectorShardStatusEntry struct {
    StartSequence    int64               `json:"StartSequence"`
    EndSequence      int64               `json:"EndSequence"`
    CurrentSequence  int64               `json:"CurrentSequence"`
    CurrentTimestamp int64               `json:"CurrentTimestamp"`
    UpdateTime       int64               `json:"UpdateTime"`
    State            ConnectorShardState `json:"State"`
    LastErrorMessage string              `json:"LastErrorMessage"`
    DiscardCount     int64               `json:"DiscardCount"`
    DoneTime         int64               `json:"DoneTime"`
    WorkerAddress    string              `json:"WorkerAddress"`
}

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func getConnectorShardStatus(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gcs, err := dh.GetConnectorShardStatus(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("get connector shard status failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get connector shard status successful")
    for shard, status := range gcs.ShardStatus {
        fmt.Println(shard, status.State)
    }
    shardId := "0"
    gc, err := dh.GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId)
    if err != nil {
        fmt.Println("get connector shard status failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get connector shard status successful")
    fmt.Println(*gc)
}

重启connector shard

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

shardId

string

The id of the shard.

connectorId

string

The id of the connector.

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func reloadConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    if err := dh.ReloadConnector(projectName, topicName, connectorId); err != nil {
        fmt.Println("reload connector shard failed")
        fmt.Println(err)
        return
    }
    fmt.Println("reload connector shard successful")
    shardId := "2"
    if err := dh.ReloadConnectorByShard(projectName, topicName, connectorId, shardId); err != nil {
        fmt.Println("reload connector shard failed")
        fmt.Println(err)
        return
    }
    fmt.Println("reload connector shard successful")
}

添加新field

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称

connectorId

string

The id of the connector.

fieldName

string

The name of the field.

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func appendConnectorField(dh datahub.DataHub, projectName, topicName, connectorId string) {
    if err := dh.AppendConnectorField(projectName, topicName, connectorId, "field2"); err != nil {
        fmt.Println("append filed failed")
        fmt.Println(err)
        return
    }
    fmt.Println("append filed successful")
}

更新connector状态

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称

connectorId

string

The id of the connector.

state

ConnectorState

The state of the connector which you want update.

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func updateConnectorState(dh datahub.DataHub, projectName, topicName, connectorId string) {
    if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped); err != nil {
        fmt.Println("update connector state failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update connector state successful")
    if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning); err != nil {
        fmt.Println("update connector state failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update connector state successful")
}

更新connector点位信息

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

shardId

string

The id of the shard.

connectorId

string

The id of the connector.

offset

ConnectorOffset

The connector offset.

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func updateConnectorOffset(dh datahub.DataHub, projectName, topicName, connectorId string) {
    shardId := "10"
    offset := datahub.ConnectorOffset{
        Timestamp: 1565864139000,
        Sequence:  104,
    }
    dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped)
    defer dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning)
    if err := dh.UpdateConnectorOffset(projectName, topicName, connectorId, shardId, offset); err != nil {
        fmt.Println("update connector offset failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update connector offset successful")
}

查询connector完成时间

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

connectorId

string

The id of the connector.

返回示例

1. type GetConnectorDoneTimeResult struct {
2.     DoneTime int64 `json:"DoneTime"`
3. }

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func doneTime(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gcd, err := dh.GetConnectorDoneTime(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("get connector done time failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get connector done time successful")
    fmt.Println(gcd.DoneTime)
}