快速开始

使用Go SDK快速体验通道服务。使用前,您需要了解使用通道服务的注意事项。

注意事项

  • TunnelWorkerConfig中默认会启动读数据和处理数据的线程池。如果使用的是单台机器,则会启动多个TunnelWorker,强烈建议共用一个TunnelWorkerConfig。

  • 由于Tunnel的增量日志最多会保留7天(具体的值和数据表的Stream的日志过期时间一致),因此在使用全量加增量类型或者增量类型的Tunnel消费数据时,会出现如下情况:

    • Tunnel处于全量阶段时,如果全量数据在7天内没有消费完成,则此Tunnel会出现OTSTunnelExpired错误,导致后续数据无法继续消费。如果您预估全量数据无法在7天内消费完成,请及时联系表格存储技术支持或者加入钉钉群23307953(表格存储技术交流群-2)进行咨询。

    • Tunnel处于增量阶段时,如果增量数据超过7天没有消费,Tunnel会从最近可以消费的数据开始消费,因此可能会出现漏消费数据风险。

      说明

      如果增量数据超过7天(具体值和数据表的Stream的日志过期时间一致)没有消费,则数据会出现过期的情况,当Tunnel过期超过一段时间(默认7天)后,表格存储会禁用掉该Tunnel,即该Tunnel不能再用于消费数据。

  • TunnelWorker的初始化需要预热时间,该值受TunnelWorkerConfig中的HeartbeatInterval参数影响,可以通过TunnelWorkerConfig中的time方法配置,默认值为30s,最小值为5s。

  • Tunnel从全量切换至增量阶段时,全量的Channel会结束,增量的Channel会启动,此阶段会有初始化时间,该值也受TunnelWorkerConfig中的HeartbeatInterval参数影响。

  • 当客户端(TunnelWorker)没有被正常shutdown时(例如异常退出或者手动结束),TunnelWorker会自动进行资源的回收,包括释放线程池,自动调用用户在Channel上注册的shutdown方法,关闭Tunnel连接等。

前提条件

  • 已安装通道服务。具体操作,请参见安装

  • 已确定要使用的Endpoint。具体操作,请参见确定Endpoint

  • 已配置密钥。具体操作,请参见初始化

  • 已将密钥配置到环境变量中。具体操作,请参见初始化

    表格存储使用OTS_AK_ENV环境变量名表示阿里云账号或者RAM用户的AccessKey ID,使用OTS_SK_ENV环境变量名表示对应AccessKey Secret,请根据实际配置。

体验通道服务

  1. 初始化Tunnel client。

    初始化Tunnel client时,您可以使用阿里云账号以及RAM用户的AccessKey(包括AccessKey IDAccessKey Secret)或者使用从STS获取的临时访问凭证(包括AccessKey ID、AccessKey SecretSecurityToken)进行签名认证。

    • 通过阿里云账号以及RAM用户的AccessKey进行初始化

      重要
      • 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维。

      • 阿里云账号的AccessKey泄露会威胁该账号下所有资源的安全。为保证账号安全,强烈建议您给RAM用户创建AccessKey,不要给阿里云账号创建AccessKey。具体操作,请参见创建RAM用户的AccessKey

      获取到AccessKey IDAccessKey Secret后,您可以按照如下示例代码初始化Tunnel client。

      //endpoint是表格存储实例endpoint,例如https://instance.cn-hangzhou.ots.aliyun.com。
      //instance是实例名称。
      //accessKeyIdaccessKeySecret分别为阿里云账号或者RAM用户的AccessKey IDAccessKey Secret。
      accessKeyId := os.Getenv("OTS_AK_ENV")
      accessKeySecret := os.Getenv("OTS_SK_ENV")
      tunnelClient := tunnel.NewTunnelClient(endpoint, instance, accessKeyId, accessKeySecret)                    
    • 通过从STS获取的临时访问凭证进行初始化

      说明

      如果要进行临时访问授权,请使用此方式进行初始化。关于配置临时用户权限的更多信息,请参见通过RAM PolicyRAM用户授权

      Tunnel Client内提供了NewTunnelClientWithToken接口用于使用临时访问凭证初始化Tunnel Client。为了帮助您更好的使用该接口,文档中提供了一个带刷新临时访问凭证的示例代码。完整代码请参见附录:使用临时访问凭证初始化Tunnel Client的示例代码

  2. 创建通道。

    req := &tunnel.CreateTunnelRequest{
       TableName:  "testTable",
       TunnelName: "testTunnel",
       Type:       tunnel.TunnelTypeBaseStream, //创建全量加增量类型的Tunnel。
    }
    resp, err := tunnelClient.CreateTunnel(req)
    if err != nil {
       log.Fatal("create test tunnel failed", err)
    }
    log.Println("tunnel id is", resp.TunnelId)
  3. 根据业务自定义数据消费Callback函数,开始自动化的数据消费。

    //根据业务自定义数据消费callback函数。
    func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
        fmt.Println("user-defined information", ctx.CustomValue)
        for _, rec := range records {
            fmt.Println("tunnel record detail:", rec.String())
        }
        fmt.Println("a round of records consumption finished")
        return nil
    }
    
    //配置callbackSimpleProcessFactory,配置消费端TunnelWorkerConfig。
    workConfig := &tunnel.TunnelWorkerConfig{
       ProcessorFactory: &tunnel.SimpleProcessFactory{
          CustomValue: "user custom interface{} value",
          ProcessFunc: exampleConsumeFunction,
       },
    }
    
    //使用TunnelDaemon持续消费指定tunnel。
    daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
    log.Fatal(daemon.Run())

附录:使用临时访问凭证初始化Tunnel Client的示例代码

说明

关于调用RAMAssumeRole接口从STS获取临时访问凭证的更多信息,请参见STS SDK概览AssumeRole - 获取扮演角色的临时身份凭证

import (
    otscommon "github.com/aliyun/aliyun-tablestore-go-sdk/common"
    "github.com/aliyun/aliyun-tablestore-go-sdk/tunnel"
    "sync"
    "time"
)

type RefreshClient struct {
    lastRefresh          time.Time
    refreshIntervalInMin int
}

func NewRefreshClient(intervalInMin int) *RefreshClient {
    return &RefreshClient{
        refreshIntervalInMin: intervalInMin,
    }
}

func (c *RefreshClient) IsExpired() bool {
    now := time.Now()
    if c.lastRefresh.IsZero() || now.Sub(c.lastRefresh) > time.Duration(c.refreshIntervalInMin)*time.Minute {
        return true
    }

    return false
}

func (c *RefreshClient) Update() {
    c.lastRefresh = time.Now()
}

type clientCredentials struct {
    accessKeyID     string
    accessKeySecret string
    securityToken   string
}

func newClientCredentials(accessKeyID string, accessKeySecret string, securityToken string) *clientCredentials {
    return &clientCredentials{accessKeyID: accessKeyID, accessKeySecret: accessKeySecret, securityToken: securityToken}
}

func (c *clientCredentials) GetAccessKeyID() string {
    return c.accessKeyID
}

func (c *clientCredentials) GetAccessKeySecret() string {
    return c.accessKeySecret
}

func (c *clientCredentials) GetSecurityToken() string {
    return c.securityToken
}

type OTSCredentialsProvider struct {
    refresh *RefreshClient
    cred    *clientCredentials
    lock    sync.Mutex
}

func NewOTSCredentialsProvider() *OTSCredentialsProvider {
    return &OTSCredentialsProvider{
        // 按需调整临时访问凭证刷新周期,需要小于StsToken的过期时间。
        refresh: NewRefreshClient(30),
    }
}

func (p *OTSCredentialsProvider) renewCredentials() error {
    if p.cred == nil || p.refresh.IsExpired() {
        // 此处需要获取用户的StsToken。调用RAM的AssumeRole接口会返回AccessKeyId、AccessKeySecret、SecurityToken和Expiration信息。
        // 获取临时访问凭证后填写以下参数。关于RAM(STS) SDK的更多信息请参见RAM文档。 
        // resp, err := GetUserOtsStsToken()
        accessKeyId := ""
        accessKeySecret := ""
        stsToken := ""
        p.cred = newClientCredentials(accessKeyId, accessKeySecret, stsToken)
        p.refresh.Update()
    }

    return nil
}

func (p *OTSCredentialsProvider) GetCredentials() otscommon.Credentials {
    p.lock.Lock()
    defer p.lock.Unlock()

    if err := p.renewCredentials(); err != nil {
        // log error
        if p.cred == nil {
            return newClientCredentials("", "", "")
        }
    }

    return p.cred
}

// NewTunnelClientWithToken用于初始化一个带StsToken刷新功能的TunnelClient。
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string) tunnel.TunnelClient {
    return tunnel.NewTunnelClientWithToken(
        endpoint,
        instanceName,
        "",
        "",
        "",
        nil,
        tunnel.SetCredentialsProvider(NewOTSCredentialsProvider()),
    )
}