文档

快速入门

更新时间:

在表格存储管理控制台快速体验通道服务功能。

创建数据通道

  1. 登录表格存储控制台
  2. 概览页面,单击实例名称或在操作列单击实例管理

  3. 实例详情页签的数据表列表区域,单击数据表名称后选择实时消费通道页签或单击fig_001后选择实时消费通道

  4. 实时消费通道页签,单击创建通道

  5. 创建通道对话框,输入通道名,并选择通道类型

    通道服务提供了增量全量全量加增量三种类型的分布式数据实时消费通道。本文以全量加增量类型为例介绍。

    创建成功后,在操作列单击展示通道分区列表,可以查看通道中的数据内容、消费延迟监控以及通道分区下的消费数据行数统计。fig_tunnel_service

预览通道中的数据格式

创建通道后,通过模拟数据消费可以预览通道中的数据格式。

  1. 写入或删除数据,详情请参见控制台读写数据

  2. 预览通道中的数据格式。

    1. 概览页页面,单击实例名称或在操作列单击实例管理

    2. 实例详情页签的数据表列表区域,单击数据表名称后选择实时消费通道页签或单击fig_001后选择实时消费通道

    3. 实时消费通道页签,单击通道操作列的展示通道分区列表

    4. 在通道分区的右侧单击模拟消费

    5. 模拟消费对话框,单击开始消费

      消费的数据信息显示在对话框中,如下图所示。fig_consume_001

开启通道的数据消费

  1. 在通道列表中复制通道ID。

  2. 使用任一种语言的通道SDK,开启通道的数据消费。

    此处以Java SDK为例开启通道的数据消费。

    import com.alicloud.openservices.tablestore.TunnelClient;
    import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
    import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
    import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
    
    public class TunnelQuickStart {
    
        /**
         * 用户自定义数据消费Callback,即实现IChannelProcessor接口(process和shutdown)。
         */
        private static class SimpleProcessor implements IChannelProcessor {
            @Override
            public void process(ProcessRecordsInput input) {
                System.out.println("Default record processor, would print records count");
                System.out.println(
                        String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
                try {
                    //模拟消费处理。
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void shutdown() {
                System.out.println("Mock shutdown");
            }
        }
    
        public static void main(String[] args) {
            // 1. 初始化Tunnel Client。
            TunnelClient tunnelClient = new TunnelClient("<ENDPOINT>", "<ACCESS_ID>", "<ACCESS_KEY>", "<INSTANCE_NAME>");
    
            // 2. 用户自定义数据消费Callback, 开始自动化的数据消费。
            //强烈建议共用一个TunnelWorkerConfig,TunnelWorkerConfig中包括更多的高级参数。
            TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
            //配置TunnelWorker,并启动自动化的数据处理任务。
            //TUNNEL_ID,通道id。
            TunnelWorker worker = new TunnelWorker("<TUNNEL_ID>", tunnelClient, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                tunnelClient.shutdown();
            }
        }
    }

查看数据消费日志

数据消费后,可以查看增量数据消费日志,例如消费统计、增量通道分区最新同步时间等。在控制台或者使用describeTunnel接口也可以查看消费延迟、通道分区下的消费数据行数更新。