通过SDK使用通道服务

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

本文介绍如何通过 SDK 快速体验通道服务。在使用通道服务前,您需要了解使用通道服务的注意事项、接口等信息。

注意事项

  • TunnelWorkerConfig 中默认会启动读数据和处理数据的线程池。如果使用的是单台机器,当需要启动多个 TunnelWorker 时,建议共用一个 TunnelWorkerConfig。

  • TunnelWorker 的初始化需要预热时间,该值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 参数影响,可以通过 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,默认为 30 s。

  • 当客户端(TunnelWorker)没有被正常 shutdown 时(例如异常退出或者手动结束),TunnelWorker 会自动进行资源的回收,包括释放线程池,自动调用用户在 Channel 上注册的 shutdown 方法,关闭 Tunnel 连接等。

  • Tunnel 的增量日志保留时间,其数值与数据表中 Stream 的日志过期时长(最长时长为 7 天)保持一致,因此 Tunnel 的增量日志最多保留 7 天。

  • 增量或者全量加增量类型 Tunnel 消费数据时,可能会出现以下情况:

    • 当 Tunnel 处于全量阶段时,如果全量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,将会触发 OTSTunnelExpired 错误,从而导致无法继续消费后续数据。

      如果您预计全量数据无法在指定时间内完成消费,请及时联系表格存储技术支持或者加入钉钉群 23307953(表格存储技术交流群-2)进行咨询。

    • 当 Tunnel 处于增量阶段时,如果增量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,Tunnel 将可能从最近可消费的数据处开始消费,因此存在漏消费数据的风险。

  • Tunnel 过期后,表格存储可能会禁用该 Tunnel。如果禁用状态持续超过 30 天,则该 Tunnel 将被彻底删除,删除后将无法恢复。

接口

接口

说明

CreateTunnel

创建一个通道。

ListTunnel

列举某个数据表内通道的具体信息。

DescribeTunnel

描述某个通道里的具体 Channel 信息。

DeleteTunnel

删除一个通道。

使用

您可以使用如下语言的SDK实现通道服务。

前提条件

  • 在访问控制 RAM 服务侧完成如下操作:

    • 已创建 RAM 用户并为 RAM 用户授予管理表格存储权限 AliyunOTSFullAccess。具体操作,请参见创建 RAM 用户为 RAM 用户授权

      说明

      在实际业务环境中,建议您遵循最小化授权原则,避免权限过大带来的安全风险。

    • 已为 RAM 用户创建 AccessKey。具体操作,请参见创建 AccessKey

      警告

      阿里云账号 AccessKey 泄露会威胁您所有资源的安全。建议您使用 RAM 用户 AccessKey 进行操作,这可以有效降低 AccessKey 泄露的风险。

体验通道服务

使用 Java SDK 最小化的体验通道服务。

  1. 初始化 Tunnel Client。

    说明

    在运行本代码示例之前,请确保已设置环境变量TABLESTORE_ACCESS_KEY_IDTABLESTORE_ACCESS_KEY_SECRET,这两个变量分别对应阿里云账号或 RAM 用户的 AccessKey ID 和 AccessKey Secret。

    //endPoint为表格存储实例的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。
    //accessKeyIdaccessKeySecret分别为访问表格存储服务的AccessKeyIdSecret。
    //instanceName为实例名称。
    final String endPoint = "";
    final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID");
    final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET");
    final String instanceName = "";
    TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
  2. 创建新通道。

    请提前创建一张测试表或者使用已有的一张数据表。如果需要新建测试表,可以使用 SyncClient 中的 createTable 方法或者使用官网控制台等方式创建。

    重要

    创建增量或者全量加增量类型的通道时,时间戳的配置规则如下:

    • 如果不指定增量数据的起始时间戳,则起始时间戳为创建通道的时间。

    • 如果指定增量数据的起始时间戳(startTime)和结束时间戳(endTime),其取值范围为[当前系统时间-Stream过期时间+5分钟 , 当前系统时间],单位为毫秒。

      • Stream 过期时间为增量日志过期时长的毫秒单位时间戳,最大值为 7 天。您可以在为数据表开启 Stream 功能时设置,过期时长一经设置不能修改。

      • 结束时间戳的取值必须大于起始时间戳。

    //支持创建TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三种类型的Tunnel。
    //如下示例为创建全量加增量类型的Tunnel,如果需创建其它类型的Tunnel,则将CreateTunnelRequest中的TunnelType设置为相应的类型。
    final String tableName = "testTable";
    final String tunnelName = "testTunnel";
    CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
    CreateTunnelResponse resp = tunnelClient.createTunnel(request);
    //tunnelId用于后续TunnelWorker的初始化,该值也可以通过ListTunnel或者DescribeTunnel获取。
    String tunnelId = resp.getTunnelId(); 
    System.out.println("Create Tunnel, Id: " + tunnelId);
  3. 用户自定义数据消费 Callback,开始自动化的数据消费。

    //用户自定义数据消费Callback,即实现IChannelProcessor接口(processshutdown)。
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            //ProcessRecordsInput中包含有拉取到的数据。
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用于Tunnel Client的翻页。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //模拟消费处理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    
    //TunnelWorkerConfig默认会启动读数据和处理数据的线程池。
    //如果使用的是单台机器,当需要启动多个TunnelWorker时,建议共用一个TunnelWorkerConfig。TunnelWorkerConfig中包括更多的高级参数。
    TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
    //配置TunnelWorker,并启动自动化的数据处理任务。
    TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
    try {
        worker.connectAndWorking();
    } catch (Exception e) {
        e.printStackTrace();
        config.shutdown();
        worker.shutdown();
        tunnelClient.shutdown();
    }

配置 TunnelWorkerConfig

TunnelWorkerConfig 提供 Tunnel Client 的自定义配置,可根据实际需要配置参数,Java SDK 中的参数说明请参见下表。

配置

参数

说明

Heartbeat 的间隔和超时时间

heartbeatTimeoutInSec

Heartbeat 的超时间隔。默认值为 300 s。

当 Heartbeat 发生超时,Tunnel 服务端会认为当前 TunnelClient 不可用(失活),客户端需要重新进行 ConnectTunnel。

heartbeatIntervalInSec

进行 Heartbeat 的间隔。 默认值为 30 s,最小支持配置到 5 s,单位为 s。

Heartbeat 用于活跃 Channel 的探测、Channel 状态的更新、(自动化)数据拉取任务的初始化等。

记录消费位点的时间间隔

checkpointIntervalInMillis

用户消费完数据后,向 Tunnel 服务端进行记录消费位点操作(checkpoint)的时间间隔。

默认值为 5000 ms,单位为 ms。

说明
  • 因为读取任务所在机器不同,进程可能会遇到各种类型的错误。例如因为环境因素重启,需要定期对处理完的数据做记录(checkpoint)。当任务重启后,会接着上次的 checkpoint 继续往后做。在极端情况下,通道服务不保证传给您的记录只有一次,只保证数据至少传一次,且记录的顺序不变。如果出现局部数据重复发送的情况,需要您注意业务的处理逻辑。

  • 如果希望减少在出错情况下数据的重复处理,可以增加做 checkpoint 的频率。但是过于频繁的 checkpoint 会降低系统的吞吐量,请根据自身业务特点决定 checkpoint 的操作频率。

客户端的自定义标识

clientTag

客户端的自定义标识,可以生成 Tunnel Client ID,用于区分 TunnelWorker。

数据处理的自定义 Callback

channelProcessor

用户注册的处理数据的 Callback,包括 process 和 shutdown 方法。

数据读取和数据处理的线程池资源配置

readRecordsExecutor

用于数据读取的线程池资源。无特殊需求,建议使用默认的配置。

processRecordsExecutor

用于处理数据的线程池资源。无特殊需求,建议使用默认的配置。

说明
  • 自定义上述线程池时,线程池中的线程数要和 Tunnel 中的 Channel 数尽可能一致,此时可以保障每个 Channel 都能很快地分配到计算资源(CPU)。

  • 在默认线程池配置中,为了保证吞吐量,表格存储进行了如下操作:

    • 默认预先分配 32 个核心线程,以保障数据较小时(Channel数较少时)的实时吞吐量。

    • 工作队列的大小适当调小,当用户数据量比较大(Channel数较多)时,可以更快触发线程池新建线程的策略,及时弹出更多的计算资源。

    • 设置了默认的线程保活时间(默认为 60 s),当数据量下降后,可以及时回收线程资源。

内存控制

maxChannelParallel

读取和处理数据的最大 Channel 并行度,可用于内存控制。

默认值为 -1,表示不限制最大并行度。

说明

仅 Java SDK 5.10.0 及以上版本支持此功能。

最大退避时间配置

maxRetryIntervalInMillis

Tunnel 的最大退避时间基准值,最大退避时间在此基准值附近随机变化,具体范围为 0.75*maxRetryIntervalInMillis~1.25*maxRetryIntervalInMillis

默认值为 2000 ms,最小值为 200 ms。

说明
  • 仅 Java SDK 5.4.0 及以上版本支持此功能。

  • Tunnel 对于数据量较小的情况(单次拉取小于 900 KB 或 500 条)会进行一定时间的指数退避,直至达到最大退避时间。

CLOSING 分区状态检测

enableClosingChannelDetect

是否开启 CLOSING 分区实时检测。默认值为 false,表示不开启 CLOSING 分区实时检测。

说明
  • 仅 Java SDK 5.13.13 及以上版本支持此功能。

  • 未开启此功能时,在某些极端场景(包括但不限于通道分区数较多但客户端资源较低等)下,会出现分区卡住不消费的情况。

  • CLOSING 分区指调度中的分区,表示该分区正在切换 Tunnel Client,会调度到其他 Tunnel Client。

附录:完整代码

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
    private static class SimpleProcessor implements IChannelProcessor {
        @Override
        public void process(ProcessRecordsInput input) {
            System.out.println("Default record processor, would print records count");
            System.out.println(
                //NextToken用于Tunnel Client的翻页。
                String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
            try {
                //模拟消费处理。
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void shutdown() {
            System.out.println("Mock shutdown");
        }
    }
    public static void main(String[] args) throws Exception {
        //1.初始化Tunnel Client。
        final String endPoint = "";
        final String accessKeyId = System.getenv("OTS_AK_ENV");
        final String accessKeySecret = System.getenv("OTS_SK_ENV");
        final String instanceName = "";
        TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
        //2.创建新通道(此步骤需要提前创建一张测试表,可以使用SyncClient的createTable或者使用官网控制台等方式创建)。
        final String tableName = "testTable";
        final String tunnelName = "testTunnel";
        CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
        CreateTunnelResponse resp = tunnelClient.createTunnel(request);
        //tunnelId用于后续TunnelWorker的初始化,该值也可以通过ListTunnel或者DescribeTunnel获取。
        String tunnelId = resp.getTunnelId();
        System.out.println("Create Tunnel, Id: " + tunnelId);
        //3.用户自定义数据消费Callback,开始自动化的数据消费。
        //TunnelWorkerConfig中有更多的高级参数。
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
        TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
        try {
            worker.connectAndWorking();
        } catch (Exception e) {
            e.printStackTrace();
            config.shutdown();
            worker.shutdown();
            tunnelClient.shutdown();
        }
    }
}