本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
通过通道服务,您可以消费表中的数据。本文介绍如何使用Go SDK快速体验通道服务。使用前,您需要了解使用通道服务的注意事项。
注意事项
TunnelWorkerConfig 中默认会启动读数据和处理数据的线程池。如果使用的是单台机器,当需要启动多个 TunnelWorker 时,建议共用一个 TunnelWorkerConfig。
TunnelWorker 的初始化需要预热时间,该值受 TunnelWorkerConfig 中的 HeartbeatInterval 参数影响,默认为 30 s。
当客户端(TunnelWorker)没有被正常 shutdown 时(例如异常退出或者手动结束),TunnelWorker 会自动进行资源的回收,包括释放线程池,自动调用用户在 Channel 上注册的 shutdown 方法,关闭 Tunnel 连接等。
Tunnel 的增量日志保留时间,其数值与数据表中 Stream 的日志过期时长(最长时长为 7 天)保持一致,因此 Tunnel 的增量日志最多保留 7 天。
增量或者全量加增量类型 Tunnel 消费数据时,可能会出现以下情况:
当 Tunnel 处于全量阶段时,如果全量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,将会触发
OTSTunnelExpired
错误,从而导致无法继续消费后续数据。如果您预计全量数据无法在指定时间内完成消费,请及时联系表格存储技术支持或者加入钉钉群 23307953(表格存储技术交流群-2)进行咨询。
当 Tunnel 处于增量阶段时,如果增量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,Tunnel 将可能从最近可消费的数据处开始消费,因此存在漏消费数据的风险。
Tunnel 过期后,表格存储可能会禁用该 Tunnel。如果禁用状态持续超过 30 天,则该 Tunnel 将被彻底删除,删除后将无法恢复。
前提条件
已创建数据表。具体操作,请参见使用控制台创建数据表、使用命令行工具创建数据表或使用 SDK 创建数据表。
已获取实例域名地址(Endpoint)。具体操作,请参见获取实例 Endpoint。
已配置访问凭证。具体操作,请参见配置访问凭证。
体验通道服务
初始化 Tunnel client。
初始化 Tunnel client时,您可以使用长期访问凭证或者临时访问凭证进行签名认证。
使用长期访问凭证初始化
在运行本代码示例之前,请确保已设置环境变量
TABLESTORE_ACCESS_KEY_ID
和TABLESTORE_ACCESS_KEY_SECRET
,这两个变量分别对应阿里云账号或 RAM 用户的 AccessKey ID 和 AccessKey Secret。警告阿里云账号拥有资源的全部权限,AK 一旦泄露,会给系统带来巨大风险,不建议使用。推荐使用最小化授权的 RAM 用户的 AK。
//endpoint是表格存储实例endpoint,例如https://instance.cn-hangzhou.ots.aliyun.com。 //instance是实例名称。 //accessKeyId和accessKeySecret分别为阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret。 endpoint := "yourEndpoint" instance := "yourInstance" accessKeyId := os.Getenv("TABLESTORE_ACCESS_KEY_ID") accessKeySecret := os.Getenv("TABLESTORE_ACCESS_KEY_SECRET") tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)
使用临时访问凭证初始化
当您临时使用 GO SDK 访问 Tablestore 服务时,您可以通过 STS 服务颁发一个 STS 临时访问凭证。具体操作,请参见临时访问凭证。
Tunnel Client 内提供了 NewTunnelClientWithToken 接口用于使用临时访问凭证初始化 Tunnel Client。为了帮助您更好的使用该接口,文档中提供了一个带刷新临时访问凭证的示例代码。完整代码请参见附录:使用临时访问凭证初始化 Tunnel Client 的示例代码。
创建通道。
req := &tunnel.CreateTunnelRequest{ TableName: "<TABLE_NAME>", TunnelName: "<TUNNEL_NAME>", Type: tunnel.TunnelTypeBaseStream, //创建全量加增量类型的Tunnel。 } resp, err := tunnelClient.CreateTunnel(req) if err != nil { log.Fatal("create test tunnel failed", err) } log.Println("tunnel id is", resp.TunnelId)
根据业务自定义数据消费Callback函数,开始自动化的数据消费。
//根据业务自定义数据消费callback函数。 func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error { fmt.Println("user-defined information", ctx.CustomValue) for _, rec := range records { fmt.Println("tunnel record detail:", rec.String()) } fmt.Println("a round of records consumption finished") return nil } //配置callback到SimpleProcessFactory,配置消费端TunnelWorkerConfig。 workConfig := &tunnel.TunnelWorkerConfig{ ProcessorFactory: &tunnel.SimpleProcessFactory{ CustomValue: "user custom interface{} value", ProcessFunc: exampleConsumeFunction, }, } //使用TunnelDaemon持续消费指定tunnel。 tunnelId := "<TUNNEL_ID>" daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig) log.Fatal(daemon.Run())
附录:使用临时访问凭证初始化 Tunnel Client 的示例代码
import (
otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
"github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
"sync"
"time"
)
type RefreshClient struct {
lastRefresh time.Time
refreshIntervalInMin int
}
func NewRefreshClient(intervalInMin int) *RefreshClient {
return &RefreshClient{
refreshIntervalInMin: intervalInMin,
}
}
func (c *RefreshClient) IsExpired() bool {
now := time.Now()
if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
return true
}
return false
}
func (c *RefreshClient) Update() {
c.lastRefresh = time.Now()
}
type clientCredentials struct {
accessKeyID string
accessKeySecret string
securityToken string
}
func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}
func (c *clientCredentials) GetAccessKeyID() string {
return c.accessKeyID
}
func (c *clientCredentials) GetAccessKeySecret() string {
return c.accessKeySecret
}
func (c *clientCredentials) GetSecurityToken() string {
return c.securityToken
}
type OTSCredentialsProvider struct {
refresh *RefreshClient
cred *clientCredentials
lock sync.Mutex
}
func NewOTSCredentialsProvider() *OTSCredentialsProvider {
return &OTSCredentialsProvider{
// 按需调整临时访问凭证刷新周期,需要小于StsToken的过期时间。
refresh: NewRefreshClient(30),
}
}
func (p *OTSCredentialsProvider) renewCredentials() error {
if p.cred == nil || p.refresh.IsExpired() {
// 此处需要获取用户的StsToken。调用RAM的AssumeRole接口会返回AccessKeyId、AccessKeySecret、SecurityToken和Expiration信息。
// 获取临时访问凭证后填写以下参数。关于RAM(STS) SDK的更多信息请参见RAM文档。
// resp, err := GetUserOtsStsToken()
accessKeyId := ""
accessKeySecret := ""
stsToken := ""
p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
p.refresh.Update()
}
return nil
}
func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
p.lock.Lock()
defer p.lock.Unlock()
if err := p.renewCredentials(); err != nil {
// log error
if p.cred == nil {
return newClientCredentials("", "", "")
}
}
return p.cred
}
// NewTunnelClientWithToken用于初始化一个带StsToken刷新功能的TunnelClient。
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
return tunnel.NewTunnelClientWithToken(
endpoint,
instanceName,
"",
"",
"",
nil,
tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
)
}