通过SDK使用通道服务

本文介绍如何通过 SDK 快速体验通道服务。在使用通道服务前,您需要了解使用通道服务的注意事项、接口等信息。

注意事项

  • TunnelWorkerConfig 中默认会启动读数据和处理数据的线程池。如果使用的是单台机器,当需要启动多个 TunnelWorker 时,建议共用一个 TunnelWorkerConfig。

  • TunnelWorker 的初始化需要预热时间,该值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 参数影响,可以通过 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,默认为 30 s。

  • 当客户端(TunnelWorker)没有被正常 shutdown 时(例如异常退出或者手动结束),TunnelWorker 会自动进行资源的回收,包括释放线程池,自动调用用户在 Channel 上注册的 shutdown 方法,关闭 Tunnel 连接等。

  • Tunnel 的增量日志保留时间,其数值与数据表中 Stream 的日志过期时长(最长时长为 7 天)保持一致,因此 Tunnel 的增量日志最多保留 7 天。

  • 增量或者全量加增量类型 Tunnel 消费数据时,可能会出现以下情况:

    • 当 Tunnel 处于全量阶段时,如果全量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,将会触发 OTSTunnelExpired 错误,从而导致无法继续消费后续数据。

      如果您预计全量数据无法在指定时间内完成消费,请及时联系表格存储技术支持或者加入钉钉群 23307953(表格存储技术交流群-2)进行咨询。

    • 当 Tunnel 处于增量阶段时,如果增量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,Tunnel 将可能从最近可消费的数据处开始消费,因此存在漏消费数据的风险。

  • Tunnel 过期后,表格存储可能会禁用该 Tunnel。如果禁用状态持续超过 30 天,则该 Tunnel 将被彻底删除,删除后将无法恢复。

接口

接口

说明

CreateTunnel

创建一个通道。

ListTunnel

列举某个数据表内通道的具体信息。

DescribeTunnel

描述某个通道里的具体 Channel 信息。

DeleteTunnel

删除一个通道。

使用

您可以使用如下语言的SDK实现通道服务。

前提条件

体验通道服务

使用 Java SDK 最小化的体验通道服务。

  1. 初始化 Tunnel Client。

    说明

    在运行本代码示例之前,请确保已设置环境变量TABLESTORE_ACCESS_KEY_IDTABLESTORE_ACCESS_KEY_SECRET,这两个变量分别对应阿里云账号或 RAM 用户的 AccessKey ID 和 AccessKey Secret。

    //instanceName为实例名称。
    //endPoint为表格存储实例的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。
    //accessKeyIdaccessKeySecret分别为访问表格存储服务的AccessKeyIdSecret。
    final String instanceName = "yourInstanceName";
    final String endPoint = "yourEndpoint";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. 创建新通道。

    请提前创建一张测试表或者使用已有的一张数据表。如果需要新建测试表,可以使用 SyncClient 中的 createTable 方法或者使用官网控制台等方式创建。

    重要

    创建增量或者全量加增量类型的通道时,时间戳的配置规则如下:

    • 如果不指定增量数据的起始时间戳,则起始时间戳为创建通道的时间。

    • 如果指定增量数据的起始时间戳(startTime)和结束时间戳(endTime),其取值范围为[当前系统时间-Stream过期时间+5分钟 , 当前系统时间],单位为毫秒。

      • Stream 过期时间为增量日志过期时长的毫秒单位时间戳,最大值为 7 天。您可以在为数据表开启 Stream 功能时设置,过期时长一经设置不能修改。

      • 结束时间戳的取值必须大于起始时间戳。

    //支持创建TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三种类型的Tunnel。
    //如下示例为创建全量加增量类型的Tunnel,如果需创建其它类型的Tunnel,则将CreateTunnelRequest中的TunnelType设置为相应的类型。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    //tunnelId用于后续TunnelWorker的初始化,该值也可以通过ListTunnel或者DescribeTunnel获取。
    String tunnelId = resp.getTunnelId(); 
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. 用户自定义数据消费 Callback,开始自动化的数据消费。

    //用户自定义数据消费Callback,即实现IChannelProcessor接口(processshutdown)。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            //ProcessRecordsInput中包含有拉取到的数据。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用于Tunnel Client的翻页。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //模拟消费处理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    //TunnelWorkerConfig默认会启动读数据和处理数据的线程池。
    //如果使用的是单台机器,当需要启动多个TunnelWorker时,建议共用一个TunnelWorkerConfig。TunnelWorkerConfig中包括更多的高级参数。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    //配置TunnelWorker,并启动自动化的数据处理任务。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

配置 TunnelWorkerConfig

TunnelWorkerConfig 提供 Tunnel Client 的自定义配置,可根据实际需要配置参数,Java SDK 中的参数说明请参见下表。

配置

参数

说明

Heartbeat 的间隔和超时时间

heartbeatTimeoutInSec

Heartbeat 的超时间隔。默认值为 300 s。

当 Heartbeat 发生超时,Tunnel 服务端会认为当前 TunnelClient 不可用(失活),客户端需要重新进行 ConnectTunnel。

heartbeatIntervalInSec

进行 Heartbeat 的间隔。 默认值为 30 s,最小支持配置到 5 s,单位为 s。

Heartbeat 用于活跃 Channel 的探测、Channel 状态的更新、(自动化)数据拉取任务的初始化等。

记录消费位点的时间间隔

checkpointIntervalInMillis

用户消费完数据后,向 Tunnel 服务端进行记录消费位点操作(checkpoint)的时间间隔。

默认值为 5000 ms,单位为 ms。

说明
  • 因为读取任务所在机器不同,进程可能会遇到各种类型的错误。例如因为环境因素重启,需要定期对处理完的数据做记录(checkpoint)。当任务重启后,会接着上次的 checkpoint 继续往后做。在极端情况下,通道服务不保证传给您的记录只有一次,只保证数据至少传一次,且记录的顺序不变。如果出现局部数据重复发送的情况,需要您注意业务的处理逻辑。

  • 如果希望减少在出错情况下数据的重复处理,可以增加做 checkpoint 的频率。但是过于频繁的 checkpoint 会降低系统的吞吐量,请根据自身业务特点决定 checkpoint 的操作频率。

客户端的自定义标识

clientTag

客户端的自定义标识,可以生成 Tunnel Client ID,用于区分 TunnelWorker。

数据处理的自定义 Callback

channelProcessor

用户注册的处理数据的 Callback,包括 process 和 shutdown 方法。

数据读取和数据处理的线程池资源配置

readRecordsExecutor

用于数据读取的线程池资源。无特殊需求,建议使用默认的配置。

processRecordsExecutor

用于处理数据的线程池资源。无特殊需求,建议使用默认的配置。

说明
  • 自定义上述线程池时,线程池中的线程数要和 Tunnel 中的 Channel 数尽可能一致,此时可以保障每个 Channel 都能很快地分配到计算资源(CPU)。

  • 在默认线程池配置中,为了保证吞吐量,表格存储进行了如下操作:

    • 默认预先分配 32 个核心线程,以保障数据较小时(Channel数较少时)的实时吞吐量。

    • 工作队列的大小适当调小,当用户数据量比较大(Channel数较多)时,可以更快触发线程池新建线程的策略,及时弹出更多的计算资源。

    • 设置了默认的线程保活时间(默认为 60 s),当数据量下降后,可以及时回收线程资源。

内存控制

maxChannelParallel

读取和处理数据的最大 Channel 并行度,可用于内存控制。

默认值为 -1,表示不限制最大并行度。

说明

仅 Java SDK 5.10.0 及以上版本支持此功能。

最大退避时间配置

maxRetryIntervalInMillis

Tunnel 的最大退避时间基准值,最大退避时间在此基准值附近随机变化,具体范围为 0.75*maxRetryIntervalInMillis~1.25*maxRetryIntervalInMillis

默认值为 2000 ms,最小值为 200 ms。

说明
  • 仅 Java SDK 5.4.0 及以上版本支持此功能。

  • Tunnel 对于数据量较小的情况(单次拉取小于 900 KB 或 500 条)会进行一定时间的指数退避,直至达到最大退避时间。

CLOSING 分区状态检测

enableClosingChannelDetect

是否开启 CLOSING 分区实时检测。默认值为 false,表示不开启 CLOSING 分区实时检测。

说明
  • 仅 Java SDK 5.13.13 及以上版本支持此功能。

  • 未开启此功能时,在某些极端场景(包括但不限于通道分区数较多但客户端资源较低等)下,会出现分区卡住不消费的情况。

  • CLOSING 分区指调度中的分区,表示该分区正在切换 Tunnel Client,会调度到其他 Tunnel Client。

附录:完整代码

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用于Tunnel Client的翻页。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //模拟消费处理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1.初始化Tunnel Client。
        // yourInstanceName 填写您的实例名称
        final String instanceName = "yourInstanceName";
        // yourEndpoint 填写您的实例访问地址
        final String endPoint = "yourEndpoint";
         // 获取环境变量里的 AccessKey ID 和 AccessKey Secret
        final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
        final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2.创建新通道(此步骤需要提前创建一张测试表,可以使用SyncClient的createTable或者使用官网控制台等方式创建)。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        //tunnelId用于后续TunnelWorker的初始化,该值也可以通过ListTunnel或者DescribeTunnel获取。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3.用户自定义数据消费Callback,开始自动化的数据消费。
        //TunnelWorkerConfig中有更多的高级参数。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}