offset操作

本文为您展示DataHub的 Go SDKoffset操作。

初始化offset

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

subId

string

The id of the subscription.

shardIds

string

The id list of the shards.

返回示例

type OpenSubscriptionSessionResult struct {
    Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset
type SubscriptionOffset struct {
    Timestamp int64  `json:"Timestamp"`
    Sequence  int64  `json:"Sequence"`
    VersionId int64  `json:"Version"`
    SessionId *int64 `json:"SessionId"`
}

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func openOffset(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565580329258VXSY8"
    shardIds := []string{"0", "1", "2"}
    oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("open session failed")
        fmt.Println(err)
        return
    }
    fmt.Println("open session successful")
    fmt.Println(oss)
}

获取offset

获取subscription的当前点位信息。与OpenSubscriptionSession不同的是,GetSubscriptionOffse获取的点位信息中SubscriptionOffsetSessionIdnil,是无法进行commit点位操作的,因此GetSubscriptionOffset一般用来查看点位信息。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

subId

string

The id of the subscription.

shardIds

string

The id list of the shards.

返回示例

type OpenSubscriptionSessionResult struct {
    Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset
type SubscriptionOffset struct {
    Timestamp int64  `json:"Timestamp"`
    Sequence  int64  `json:"Sequence"`
    VersionId int64  `json:"Version"`
    SessionId *int64 `json:"SessionId"`
}

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func getOffset(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565580329258VXSY8"
    shardIds := []string{"0", "1", "2"}
    gss, err := dh.GetSubscriptionOffset(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("get session failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get session successful")
    fmt.Println(gss)
}

更新offset

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

subId

string

The id of the subscription.

offsets

map[string]SubscriptionOffset

The offset map of shards.

返回示例

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func updateOffset() {
    shardIds := []string{"0", "1", "2"}
    oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("open session failed")
        fmt.Println(err)
    }
    fmt.Println("open session successful")
    fmt.Println(oss)
    offset := oss.Offsets["0"]
    // set offset message
    offset.Sequence = 900
    offset.Timestamp = 1565593166690
    offsetMap := map[string]datahub.SubscriptionOffset{
        "0": offset,
    }
    if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
        if _, ok := err.(*datahub.SubscriptionOfflineError); ok {
            fmt.Println("the subscription has offline")
        } else if _, ok := err.(*datahub.SubscriptionSessionInvalidError); ok {
            fmt.Println("the subscription is open elsewhere")
        } else if _, ok := err.(*datahub.SubscriptionOffsetResetError); ok {
            fmt.Println("the subscription is reset elsewhere")
        } else {
            fmt.Println(err)
        }
        fmt.Println("update offset failed")
        return
    }
    fmt.Println("update offset successful")
}

重置offset

重置offset可以将offset重置到某个时间点上,重置之后,获取的offset信息中,VersionId会+1,之前的session失效,无法进行更新点位操作。

参数说明

参数名

参数类型

参数说明

projectName

String

项目名称。

topicName

string

Topic名称。

subId

string

The id of the subscription.

offsets

map[string]SubscriptionOffset

The offset map of shards.

返回示例

错误说明

错误类名

错误码

错误说明

ResourceNotFoundError

ResourceNotFound

NoSuchProject

NoSuchTopic

NoSuchShard

NoSuchSubscription

NoSuchConnector

NoSuchMeteringInfo

访问的资源不存在(注:进行Split/Merge操作后,立即发送其他请求,有可能会抛出该异常 )。

AuthorizationFailedError

Unauthorized

Authorization 签名解析异常,检查AK是否填写正确。

DatahubClientError

-

其他所有,并且是所有异常的基类。

InvalidParameterError

InvalidParameter

InvalidCursor

非法参数。

代码示例

func resetOffset(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565580329258VXSY8"
    offset := datahub.SubscriptionOffset{
        Timestamp: 1565593166690,
    }
    offsetMap := map[string]datahub.SubscriptionOffset{
        "1": offset,
    }
    if err := dh.ResetSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
        fmt.Println("reset offset failed")
        fmt.Println(err)
        return
    }
    fmt.Println("reset offset successful")
}