本文主要介绍如何使用Java SDK最小化的体验通道服务, 完整的代码在文末的附录中。

  1. 初始化 Tunnel Client。
    // endPoint 为表格存储实例 endPoint,如https://instance.cn-hangzhou.ots.aliyuncs.com。
    // accessKeyId 和 accessKeySecret 分别为访问表格存储服务的 AccessKey 的 Id 和 Secret。
    // instanceName 为实例名称。
    final String endPoint = "";
    final String accessKeyId = "";
    final String accessKeySecret = "";
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. 创建新通道。

    请提前建好一张测试表或者使用已有的一张表。若需要新建测试表,可以使用SyncClient 中的 createTable 方法或者使用官网控制台等方式来创建。

    // 支持创建三种类型的 Tunnel:TunnelType.BaseData(全量)、TunnelType.Stream(增量)、 TunnelType.BaseAndStream(全量加增量)。
    // 以下示例为创建全量加增量类型的 Tunnel, 若需创建其它类型的 Tunnel, 则将 CreateTunnelRequest 中的TunnelType 设置为相应的类型即可。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    // tunnelId 用于后续 TunnelWorker 的初始化,该值同样可以通过 ListTunnel 或者 DescribeTunnel 获取。
    String tunnelId = resp.getTunnelId(); 
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. 用户自定义数据消费Callback, 开始自动化的数据消费。
    // 用户自定义数据消费Callback, 即实现IChannelProcessor接口(process和shutdown)
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            // 注: ProcessRecordsInput里面包含有拉取到的数据。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                // Mock Record Process.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    // TunnelWorkerConfig默认会启动读数据和处理数据的线程池,若单台机器会启动多个TunnelWorker,
    // 强烈建议共用一个TunnelWorkerConfig.TunnelWorkerConfig里面还有更多的高级参数,
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    // 配置TunnelWorker,并启动自动化的数据处理任务。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        worker.shutdown();
        tunnelClient.shutdown();
    }
  4. 注意事项
    • TunnelWorkerConfig里面默认会启动读数据和处理数据的线程池,若单台机器会启动多个TunnelWorker, 强烈建议共用一个TunnelWorkerConfig。
    • 在创建全量+增量类型的Tunnel时,由于Tunnel的增量日志最多会保留7天(具体的值和表的Stream的过期时间一致),全量数据如果在7天内没有消费完成,则此Tunnel进入增量阶段会出现OTSTunnelExpired的错误,导致增量数据无法消费。若您预估全量数据无法在7天内完成,请及时联系 表格存储技术支持或添加钉钉群11789671进行咨询。
    • TunnelWorker会有一个初始化的预热时间,这个值受到TunnelWorkerConfig里面的heartbeatIntervalInSec参数影响,可以通过TunnelWorkerConfig里面的setHeartbeatIntervalInSec方法配置,默认为30s,最小支持调整到5s,具体的原理可以参见数据消费框架原理介绍数据消费框架配置详解
    • 当Tunnel从全量切换至增量阶段时,全量的Channel会结束,增量的Channel被拉起,这个阶段会有初始化时间,同样的,这个值受到TunnelWorkerConfig里面的heartbeatIntervalInSec参数影响。
    • 当客户端(TunnelWorker)没有被正常shutdown时(例如异常退出或者手动结束), TunnelWorker会自动帮用户进行资源的回收,包括释放线程池,自动调用用户在Channel上注册的shutdown方法,关闭Tunnel连接等。
  5. 附录:完整代码
    import com.alicloud.openservices.tablestore.TunnelClient;
    import com.alicloud.openservices.tablestore.TunnelClient;
    import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
    import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
    import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
    import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
    import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
    public class TunnelQuickStart {
        private static class SimpleProcessor implements IChannelProcessor {
            @Override
            public void process(ProcessRecordsInput input) {
                System.out.println("Default record processor, would print records count");
                System.out.println(
                    String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
                try {
                    // Mock Record Process.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void shutdown() {
                System.out.println("Mock shutdown");
            }
        }
        public static void main() throws Exception {
            // 1. 初始化Tunnel Client。
            final String endPoint = "";
            final String accessKeyId = "";
            final String accessKeySecret = "";
            final String instanceName = "";
            TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
            // 2. 创建Tunnel(此步骤需要提前建好一张测试表,可以使用SyncClient的createTable或者使用官网控制台等方式来创建)。
            final String tableName = "testTable";
            final String tunnelName = "testTunnel";
            CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
            CreateTunnelResponse resp = tunnelClient.createTunnel(request);
            // tunnelId会用于后续TunnelWorker的初始化, 该值同样可以通过ListTunnel或者DescribeTunnel获取。
            String tunnelId = resp.getTunnelId();
            System.out.println("Create Tunnel, Id: " + tunnelId);
            // 3. 用户自定义数据消费Callback, 开始自动化的数据消费。
            // TunnelWorkerConfig里面还有更多的高级参数,这里不做展开,会有专门的文档介绍。
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
            TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    }