使用Go SDK快速体验通道服务。使用前,您需要了解使用通道服务的注意事项。

注意事项

  • TunnelWorkerConfig中默认会启动读数据和处理数据的线程池。如果使用的是单台机器,则会启动多个TunnelWorker,强烈建议共用一个TunnelWorkerConfig。
  • 在创建全量加增量类型的Tunnel时,由于Tunnel的增量日志最多会保留7天(具体的值和数据表的Stream的日志过期时间一致),全量数据如果在7天内没有消费完成,则此Tunnel进入增量阶段会出现OTSTunnelExpired错误,导致增量数据无法消费。如果您预估全量数据无法在7天内消费完成,请及时联系表格存储技术支持或者加入钉钉群23307953(表格存储技术交流群-2)进行咨询。
  • TunnelWorker的初始化需要预热时间,该值受TunnelWorkerConfig中的HeartbeatInterval参数影响,可以通过TunnelWorkerConfig中的time方法配置,默认值为30s,最小值为5s。
  • 当Tunnel从全量切换至增量阶段时,全量的Channel会结束,增量的Channel会启动,此阶段会有初始化时间,该值也受TunnelWorkerConfig中的HeartbeatInterval参数影响。
  • 当客户端(TunnelWorker)没有被正常shutdown时(例如异常退出或者手动结束),TunnelWorker会自动进行资源的回收,包括释放线程池,自动调用用户在Channel上注册的shutdown方法,关闭Tunnel连接等。

体验通道服务

  1. 初始化Tunnel client。
    //endpoint是表格存储实例endpoint,例如https://instance.cn-hangzhou.ots.aliyun.com。
    //instance是实例名称。
    //accessKeyId和accessKeySecret分别为访问表格存储服务的AccessKey的Id和Secret。
    tunnelClient := tunnel.NewTunnelClient(endpoint, instance,
       accessKeyId, accessKeySecret)                    
  2. 创建通道。
    req := &tunnel.CreateTunnelRequest{
       TableName:  "testTable",
       TunnelName: "testTunnel",
       Type:       tunnel.TunnelTypeBaseStream, //创建全量加增量类型的Tunnel。
    }
    resp, err := tunnelClient.CreateTunnel(req)
    if err != nil {
       log.Fatal("create test tunnel failed", err)
    }
    log.Println("tunnel id is", resp.TunnelId)
  3. 根据业务自定义数据消费Callback函数,开始自动化的数据消费。
    //根据业务自定义数据消费callback函数。
    func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
        fmt.Println("user-defined information", ctx.CustomValue)
        for _, rec := range records {
            fmt.Println("tunnel record detail:", rec.String())
        }
        fmt.Println("a round of records consumption finished")
        return nil
    }
    
    //配置callback到SimpleProcessFactory,配置消费端TunnelWorkerConfig。
    workConfig := &tunnel.TunnelWorkerConfig{
       ProcessorFactory: &tunnel.SimpleProcessFactory{
          CustomValue: "user custom interface{} value",
          ProcessFunc: exampleConsumeFunction,
       },
    }
    
    //使用TunnelDaemon持续消费指定tunnel。
    daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
    log.Fatal(daemon.Run())