GO SDK使用说明

本文为您介绍DataHub使用GO SDK时需准备的环境以及错误类型。

环境准备

前提准备

  • 需提供云账号AccessIdAccessKey,详情可参见:创建AccessKey

  • 需提供访问DataHub的服务地址。

  • 需获取DataHub Go SDK包。

    go get -u -insecure github.com/aliyun/aliyun-datahub-sdk-go/datahub

初始化DataHub Client

Datahub GO SDK提供的所有API接口均由 datahub.DataHub 接口实现,所以第一步就是初始化一个DataHub对象。

使用默认参数创建DataHub对象

  • 代码示例

    import "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
    accessId := ""
    accessKey := ""
    endpoint := ""
    dh := datahub.New(accessId, accessKey, endpoint)
  • 使用自定义参数创建对象支持的参数对照表:

    参数

    参数类型

    默认值

    有效值

    描述

    UserAgent

    string

    -

    -

    用户代理

    CompressorType

    CompressorType

    NOCOMPRESS

    NOCOMPRESS(不压缩)、LZ4、DEFLATE、ZLIB

    传输时支持的压缩格式。

    EnableBinary

    bool

    true

    true/false

    主要在put/get record时,使用protobuf协议。DataHub版本未支持protobuf时需要手动指定enable_pbFalse

    HttpClient

    *http.Client

    datahub.DefaultHttpClient()

    -

    具体可参考net/http

    endpoint := ""
    accessId := ""
    accessKey := ""
    token := ""
    account := datahub.NewAliyunAccount(accessId, accessKey)
    // 临时AK鉴权
    // account := datahub.NewStsCredential(accessId, accessKey, token)
    config := datahub.NewDefaultConfig()
    config.CompressorType = datahub.DEFLATE
    config.EnableBinary = true;
    config.HttpClient = datahub.DefaultHttpClient()
    dh := datahub.NewClientWithConfig(endpoint, config, account)

更多操作

DataHub GO SDK 支持使用GO MOD进行包管理。

require (
    github.com/aliyun/aliyun-datahub-sdk-go/datahub v0.1.4
)

点位消费代码示例

func OffsetConsume() {
    accessId := ""
    accessKey := ""
    endpoint := "https://dh-cn-hangzhou.aliyuncs.com"
    dh := datahub.New(accessId, accessKey, endpoint)

    projectName := ""
    topicName := ""
    subId := ""
    shardId := "0"
    shardIds := []string{"0"}

    session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("Open subscription session failed", err)
        return
    }

    offset := session.Offsets[shardId]
    var gc *datahub.GetCursorResult = nil

    //sequence < 0说明未消费
    if offset.Sequence < 0 {
        // 获取生命周期内第一条record的cursor
        gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
        if err != nil {
            fmt.Println("Get oldest cursor failed", err)
            return
        }
    } else {
        // 获取下一条记录的Cursor
        nextSequence := offset.Sequence + 1
        gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, nextSequence)

        if err != nil {
            //按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
            if _, ok := err.(*datahub.SeekOutOfRangeError); ok {
                fmt.Println("Get cursor by sequence success for SeekOutOfRangeError, will retry...")
                gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
                if err != nil {
                    fmt.Println("Get oldest cursor failed", err)
                    return
                }
            }
        }
    }

    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("Get topic failed", err)
        return
    }

    // 读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
    recordCount := 0
    limitNum := 100
    cursor := gc.Cursor
    for true {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor, limitNum, topic.RecordSchema)

        if err != nil {
            fmt.Println("Get records failed", err)
            break
        }
        if gr.RecordCount == 0 {
            fmt.Println("No data, sleep 5 seconds...")
            time.Sleep(time.Second * 5)
            continue
        }

        for _, record := range gr.Records {
            // 处理数据,这里只打印
            data, _ := record.(*datahub.TupleRecord)
            fmt.Println(data.Values)

            recordCount += 1
            // 每1000条数据提交一次点位信息
            if recordCount%1000 == 0 {
                fmt.Println("Commit offset", record.GetSequence())
                offset.Sequence = record.GetSequence()
                offset.Timestamp = record.GetSystemTime()

                offsetMap := map[string]datahub.SubscriptionOffset{shardId: offset}
                err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap)
                if err != nil {
                    if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
                        fmt.Println("Subscription reset, will reopen...")
                        // 点位被重置,需要重新open session
                        session, err = dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
                        if err != nil {
                            fmt.Println("Reopen subscription session failed", err)
                            break
                        }
                        offset = session.Offsets[shardId]
                    } else if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
                        fmt.Println("Subscription used by other one")
                        break
                    } else {
                        fmt.Println("Commit offset failed", err)
                        break
                    }
                }
                recordCount = 0
            }
        }
        cursor = gr.NextCursor
    }
}

错误类型

GO SDKdatahub的错误类型进行了整理,用户可以使用类型断言进行错误类型的判断,然后根据错误的类型进行相应的处理。其中错误类型中,DatahubClientError、LimitExceededError、ServiceTemporaryUnavailableError 属于可重试错误,除此之外,其余error属于不可重试错误,而DatahubClientError中包含部分可重试错误,例如server busy,server unavailable等,因此建议遇到可重试error时,可以在代码逻辑中添加重试逻辑,但应严格限制重试次数。

类名

错误码

描述

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

ResourceExistError

ResourceAlreadyExist

ProjectAlreadyExist

TopicAlreadyExist

ConnectorAlreadyExist

资源已存在(创建时如果资源已存在,就会抛出这个异常。

SeekOutOfRangeError

SeekOutOfRange

getCursor时,给的sequence不在有效范围内(通常数据已过期),或给的timestamp大于当前时间。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

NoPermissionError

NoPermission

OperationDenied

没有权限,通常是RAM配置不正确,或没有正确授权子账号。

NewShardSealedError

InvalidShardOperation

shard 处于CLOSED状态可读不可写,继续往CLOSEDshard 写数据,或读到最后一条数据后继续读取,会抛出该异常。

LimitExceededError

LimitExceeded

接口使用超限,参考限制描述

SubscriptionOfflineError

SubscriptionOffline

订阅处于下线状态不可用。

SubscriptionSessionInvalidError

OffsetSessionChanged

OffsetSessionClosed

订阅会话异常,使用订阅时会建立一个session,用于提交点位,如果有其他客户端使用该订阅,会得到该异常。

SubscriptionOffsetResetError

OffsetReseted

订阅点位被重置。

MalformedRecordError

MalformedRecord

ShardNotReady

非法的 Record 格式,可能的情况有:schema 不正确、包含非utf-8字符、客户端使用pb而服务端不支持等。

ServiceTemporaryUnavailableError

-

一般是网络问题,例如连接异常断开,通常重试即可。

DatahubClientError

其他所有,并且是所有异常的基类

如排除以上异常情况,通常重试即可,但应限制重试次数。

DatahubClientError

DataHub的基础错误类型,所有的error都继承了这个错误类型。DataHub的错误类型除了已经定义的错误类型,其余错误均属于DatahubClientError,其中包括服务器busy、服务器unavailable等可重试错误,用户可以在自己的代码逻辑中添加一些重试机制。

type DatahubClientError struct {
    StatusCode int    `json:"StatusCode"`   // Http status code
    RequestId  string `json:"RequestId"`    // Request-id to trace the request
    Code       string `json:"ErrorCode"`    // Datahub error code
    Message    string `json:"ErrorMessage"` // Error msg of the error code
}

Error示例

func example_error() {
    accessId := ""
    accessKey := ""
    endpoint := ""
    projectName := "datahub_go_test"
    maxRetry := 3
    dh := datahub.New(accessId, accessKey, endpoint)
    if err := dh.CreateProject(projectName, "project comment"); err != nil {
        if _, ok := err.(*datahub.InvalidParameterError); ok {
            fmt.Println("invalid parameter,please check your input parameter")
        } else if _, ok := err.(*datahub.ResourceExistError); ok {
            fmt.Println("project already exists")
        } else if _, ok := err.(*datahub.AuthorizationFailedError); ok {
            fmt.Println("accessId or accessKey err,please check your accessId and accessKey")
        } else if _, ok := err.(*datahub.LimitExceededError); ok {
            fmt.Println("limit exceed, so retry")
            for i := 0; i < maxRetry; i++ {
                // wait 5 seconds
                time.Sleep(5 * time.Second)
                if err := dh.CreateProject(projectName, "project comment"); err != nil {
                    fmt.Println("create project failed")
                    fmt.Println(err)
                } else {
                    fmt.Println("create project successful")
                    break
                }
            }
        } else {
            fmt.Println("unknown error")
            fmt.Println(err)
        }
    } else {
        fmt.Println("create project successful")
    }
}