本文为您展示DataHub的 Go SDK的offset操作。
初始化offset
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
subId | string | The id of the subscription. |
shardIds | string | The id list of the shards. |
返回示例
type OpenSubscriptionSessionResult struct {
Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset
type SubscriptionOffset struct {
Timestamp int64 `json:"Timestamp"`
Sequence int64 `json:"Sequence"`
VersionId int64 `json:"Version"`
SessionId *int64 `json:"SessionId"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func openOffset(dh datahub.DataHub, projectName, topicName string) {
subId := "1565580329258VXSY8"
shardIds := []string{"0", "1", "2"}
oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
if err != nil {
fmt.Println("open session failed")
fmt.Println(err)
return
}
fmt.Println("open session successful")
fmt.Println(oss)
}
获取offset
获取subscription的当前点位信息。与OpenSubscriptionSession不同的是,GetSubscriptionOffse获取的点位信息中SubscriptionOffset的SessionId为nil,是无法进行commit点位操作的,因此GetSubscriptionOffset一般用来查看点位信息。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
subId | string | The id of the subscription. |
shardIds | string | The id list of the shards. |
返回示例
type OpenSubscriptionSessionResult struct {
Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset
type SubscriptionOffset struct {
Timestamp int64 `json:"Timestamp"`
Sequence int64 `json:"Sequence"`
VersionId int64 `json:"Version"`
SessionId *int64 `json:"SessionId"`
}
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func getOffset(dh datahub.DataHub, projectName, topicName string) {
subId := "1565580329258VXSY8"
shardIds := []string{"0", "1", "2"}
gss, err := dh.GetSubscriptionOffset(projectName, topicName, subId, shardIds)
if err != nil {
fmt.Println("get session failed")
fmt.Println(err)
return
}
fmt.Println("get session successful")
fmt.Println(gss)
}
更新offset
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
subId | string | The id of the subscription. |
offsets | map[string]SubscriptionOffset | The offset map of shards. |
返回示例
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func updateOffset() {
shardIds := []string{"0", "1", "2"}
oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
if err != nil {
fmt.Println("open session failed")
fmt.Println(err)
}
fmt.Println("open session successful")
fmt.Println(oss)
offset := oss.Offsets["0"]
// set offset message
offset.Sequence = 900
offset.Timestamp = 1565593166690
offsetMap := map[string]datahub.SubscriptionOffset{
"0": offset,
}
if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
if _, ok := err.(*datahub.SubscriptionOfflineError); ok {
fmt.Println("the subscription has offline")
} else if _, ok := err.(*datahub.SubscriptionSessionInvalidError); ok {
fmt.Println("the subscription is open elsewhere")
} else if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
fmt.Println("the subscription is reset elsewhere")
} else {
fmt.Println(err)
}
fmt.Println("update offset failed")
return
}
fmt.Println("update offset successful")
}
重置offset
重置offset可以将offset重置到某个时间点上,重置之后,获取的offset信息中,VersionId会+1,之前的session失效,无法进行更新点位操作。
参数说明
参数名 | 参数类型 | 参数说明 |
projectName | String | 项目名称。 |
topicName | string | Topic名称。 |
subId | string | The id of the subscription. |
offsets | map[string]SubscriptionOffset | The offset map of shards. |
返回示例
错误说明
错误类名 | 错误码 | 错误说明 |
ResourceNotFoundError |
| 访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。 |
AuthorizationFailedError |
| Authorization 签名解析异常,检查AK是否填写正确。 |
DatahubClientError | - | 其他所有,并且是所有异常的基类。 |
InvalidParameterError |
| 非法参数。 |
代码示例
func resetOffset(dh datahub.DataHub, projectName, topicName string) {
subId := "1565580329258VXSY8"
offset := datahub.SubscriptionOffset{
Timestamp: 1565593166690,
}
offsetMap := map[string]datahub.SubscriptionOffset{
"1": offset,
}
if err := dh.ResetSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
fmt.Println("reset offset failed")
fmt.Println(err)
return
}
fmt.Println("reset offset successful")
}