本文为您介绍DataHub使用GO SDK时需准备的环境以及错误类型。
环境准备
前提准备
需提供云账号AccessId和AccessKey,详情可参见:创建AccessKey。
需提供访问DataHub的服务地址。
需获取DataHub Go SDK包。
go get -u -insecure github.com/aliyun/aliyun-datahub-sdk-go/datahub
初始化DataHub Client
Datahub GO SDK提供的所有API接口均由 datahub.DataHub 接口实现,所以第一步就是初始化一个DataHub对象。
使用默认参数创建DataHub对象
代码示例
import "github.com/aliyun/aliyun-datahub-sdk-go/datahub" accessId := "" accessKey := "" endpoint := "" dh := datahub.New(accessId, accessKey, endpoint)
使用自定义参数创建对象支持的参数对照表:
参数
参数类型
默认值
有效值
描述
UserAgent
string
-
-
用户代理
CompressorType
CompressorType
NOCOMPRESS
NOCOMPRESS(不压缩)、LZ4、DEFLATE、ZLIB
传输时支持的压缩格式。
EnableBinary
bool
true
true/false
主要在put/get record时,使用protobuf协议。DataHub版本未支持protobuf时需要手动指定enable_pb为False
HttpClient
*http.Client
datahub.DefaultHttpClient()
-
具体可参考net/http
endpoint := "" accessId := "" accessKey := "" token := "" account := datahub.NewAliyunAccount(accessId, accessKey) // 临时AK鉴权 // account := datahub.NewStsCredential(accessId, accessKey, token) config := datahub.NewDefaultConfig() config.CompressorType = datahub.DEFLATE config.EnableBinary = true; config.HttpClient = datahub.DefaultHttpClient() dh := datahub.NewClientWithConfig(endpoint, config, account)
更多操作
DataHub GO SDK 支持使用GO MOD进行包管理。
require (
github.com/aliyun/aliyun-datahub-sdk-go/datahub v0.1.4
)
点位消费代码示例
func OffsetConsume() {
accessId := ""
accessKey := ""
endpoint := "https://dh-cn-hangzhou.aliyuncs.com"
dh := datahub.New(accessId, accessKey, endpoint)
projectName := ""
topicName := ""
subId := ""
shardId := "0"
shardIds := []string{"0"}
session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
if err != nil {
fmt.Println("Open subscription session failed", err)
return
}
offset := session.Offsets[shardId]
var gc *datahub.GetCursorResult = nil
//sequence < 0说明未消费
if offset.Sequence < 0 {
// 获取生命周期内第一条record的cursor
gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("Get oldest cursor failed", err)
return
}
} else {
// 获取下一条记录的Cursor
nextSequence := offset.Sequence + 1
gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, nextSequence)
if err != nil {
//按照SEQUENCE getCursor可能报SeekOutOfRange错误,表示当前cursor的数据已过期
if _, ok := err.(*datahub.SeekOutOfRangeError); ok {
fmt.Println("Get cursor by sequence success for SeekOutOfRangeError, will retry...")
gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
if err != nil {
fmt.Println("Get oldest cursor failed", err)
return
}
}
}
}
topic, err := dh.GetTopic(projectName, topicName)
if err != nil {
fmt.Println("Get topic failed", err)
return
}
// 读取并保存点位,这里以读取Tuple数据为例,并且每1000条记录保存一次点位
recordCount := 0
limitNum := 100
cursor := gc.Cursor
for true {
gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor, limitNum, topic.RecordSchema)
if err != nil {
fmt.Println("Get records failed", err)
break
}
if gr.RecordCount == 0 {
fmt.Println("No data, sleep 5 seconds...")
time.Sleep(time.Second * 5)
continue
}
for _, record := range gr.Records {
// 处理数据,这里只打印
data, _ := record.(*datahub.TupleRecord)
fmt.Println(data.Values)
recordCount += 1
// 每1000条数据提交一次点位信息
if recordCount%1000 == 0 {
fmt.Println("Commit offset", record.GetSequence())
offset.Sequence = record.GetSequence()
offset.Timestamp = record.GetSystemTime()
offsetMap := map[string]datahub.SubscriptionOffset{shardId: offset}
err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap)
if err != nil {
if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
fmt.Println("Subscription reset, will reopen...")
// 点位被重置,需要重新open session
session, err = dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
if err != nil {
fmt.Println("Reopen subscription session failed", err)
break
}
offset = session.Offsets[shardId]
} else if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
fmt.Println("Subscription used by other one")
break
} else {
fmt.Println("Commit offset failed", err)
break
}
}
recordCount = 0
}
}
cursor = gr.NextCursor
}
}
错误类型
GO SDK对datahub的错误类型进行了整理,用户可以使用类型断言进行错误类型的判断,然后根据错误的类型进行相应的处理。其中错误类型中,DatahubClientError、LimitExceededError、ServiceTemporaryUnavailableError 属于可重试错误,除此之外,其余error属于不可重试错误,而DatahubClientError中包含部分可重试错误,例如server busy,server unavailable等,因此建议遇到可重试error时,可以在代码逻辑中添加重试逻辑,但应严格限制重试次数。
类名 | 错误码 | 描述 |
InvalidParameterError |
| 非法参数 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
ResourceExistError |
| 资源已存在(创建时如果资源已存在,就会抛出这个异常。 |
SeekOutOfRangeError |
| getCursor时,给的sequence不在有效范围内(通常数据已过期),或给的timestamp大于当前时间。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
NoPermissionError |
| 没有权限,通常是RAM配置不正确,或没有正确授权子账号。 |
NewShardSealedError |
| shard 处于CLOSED状态可读不可写,继续往CLOSED的shard 写数据,或读到最后一条数据后继续读取,会抛出该异常。 |
LimitExceededError |
| 接口使用超限,参考限制描述。 |
SubscriptionOfflineError |
| 订阅处于下线状态不可用。 |
SubscriptionSessionInvalidError |
| 订阅会话异常,使用订阅时会建立一个session,用于提交点位,如果有其他客户端使用该订阅,会得到该异常。 |
SubscriptionOffsetResetError |
| 订阅点位被重置。 |
MalformedRecordError |
| 非法的 Record 格式,可能的情况有:schema 不正确、包含非utf-8字符、客户端使用pb而服务端不支持等。 |
ServiceTemporaryUnavailableError | - | 一般是网络问题,例如连接异常断开,通常重试即可。 |
DatahubClientError | 其他所有,并且是所有异常的基类 | 如排除以上异常情况,通常重试即可,但应限制重试次数。 |
DatahubClientError
DataHub的基础错误类型,所有的error都继承了这个错误类型。DataHub的错误类型除了已经定义的错误类型,其余错误均属于DatahubClientError,其中包括服务器busy、服务器unavailable等可重试错误,用户可以在自己的代码逻辑中添加一些重试机制。
type DatahubClientError struct {
StatusCode int `json:"StatusCode"` // Http status code
RequestId string `json:"RequestId"` // Request-id to trace the request
Code string `json:"ErrorCode"` // Datahub error code
Message string `json:"ErrorMessage"` // Error msg of the error code
}
Error示例
func example_error() {
accessId := ""
accessKey := ""
endpoint := ""
projectName := "datahub_go_test"
maxRetry := 3
dh := datahub.New(accessId, accessKey, endpoint)
if err := dh.CreateProject(projectName, "project comment"); err != nil {
if _, ok := err.(*datahub.InvalidParameterError); ok {
fmt.Println("invalid parameter,please check your input parameter")
} else if _, ok := err.(*datahub.ResourceExistError); ok {
fmt.Println("project already exists")
} else if _, ok := err.(*datahub.AuthorizationFailedError); ok {
fmt.Println("accessId or accessKey err,please check your accessId and accessKey")
} else if _, ok := err.(*datahub.LimitExceededError); ok {
fmt.Println("limit exceed, so retry")
for i := 0; i < maxRetry; i++ {
// wait 5 seconds
time.Sleep(5 * time.Second)
if err := dh.CreateProject(projectName, "project comment"); err != nil {
fmt.Println("create project failed")
fmt.Println(err)
} else {
fmt.Println("create project successful")
break
}
}
} else {
fmt.Println("unknown error")
fmt.Println(err)
}
} else {
fmt.Println("create project successful")
}
}