本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
本文介绍如何通过 SDK 快速体验通道服务。在使用通道服务前,您需要了解使用通道服务的注意事项、接口等信息。
注意事项
TunnelWorkerConfig 中默认会启动读数据和处理数据的线程池。如果使用的是单台机器,当需要启动多个 TunnelWorker 时,建议共用一个 TunnelWorkerConfig。
TunnelWorker 的初始化需要预热时间,该值受 TunnelWorkerConfig 中的 heartbeatIntervalInSec 参数影响,可以通过 TunnelWorkerConfig 中的 setHeartbeatIntervalInSec 方法配置,默认为 30 s。
当客户端(TunnelWorker)没有被正常 shutdown 时(例如异常退出或者手动结束),TunnelWorker 会自动进行资源的回收,包括释放线程池,自动调用用户在 Channel 上注册的 shutdown 方法,关闭 Tunnel 连接等。
Tunnel 的增量日志保留时间,其数值与数据表中 Stream 的日志过期时长(最长时长为 7 天)保持一致,因此 Tunnel 的增量日志最多保留 7 天。
增量或者全量加增量类型 Tunnel 消费数据时,可能会出现以下情况:
当 Tunnel 处于全量阶段时,如果全量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,将会触发
OTSTunnelExpired
错误,从而导致无法继续消费后续数据。如果您预计全量数据无法在指定时间内完成消费,请及时联系表格存储技术支持或者加入钉钉群 23307953(表格存储技术交流群-2)进行咨询。
当 Tunnel 处于增量阶段时,如果增量数据在增量日志保留时间内(最多保留 7 天)未能完成消费,Tunnel 将可能从最近可消费的数据处开始消费,因此存在漏消费数据的风险。
Tunnel 过期后,表格存储可能会禁用该 Tunnel。如果禁用状态持续超过 30 天,则该 Tunnel 将被彻底删除,删除后将无法恢复。
接口
接口 | 说明 |
CreateTunnel | 创建一个通道。 |
ListTunnel | 列举某个数据表内通道的具体信息。 |
DescribeTunnel | 描述某个通道里的具体 Channel 信息。 |
DeleteTunnel | 删除一个通道。 |
使用
您可以使用如下语言的SDK实现通道服务。
前提条件
在访问控制 RAM 服务侧完成如下操作:
已创建 RAM 用户并为 RAM 用户授予管理表格存储权限
AliyunOTSFullAccess
。具体操作,请参见创建 RAM 用户和为 RAM 用户授权。说明在实际业务环境中,建议您遵循最小化授权原则,避免权限过大带来的安全风险。
已为 RAM 用户创建 AccessKey。具体操作,请参见创建 AccessKey。
警告阿里云账号 AccessKey 泄露会威胁您所有资源的安全。建议您使用 RAM 用户 AccessKey 进行操作,这可以有效降低 AccessKey 泄露的风险。
在表格存储服务侧完成如下操作:
已创建数据表。具体操作,请参见使用控制台创建数据表、使用命令行工具创建数据表或使用SDK创建数据表。
已获取实例域名地址(Endpoint)。具体操作,请参见获取实例Endpoint。
已配置访问凭证。具体操作,请参见配置访问凭证。
体验通道服务
使用 Java SDK 最小化的体验通道服务。
初始化 Tunnel Client。
说明在运行本代码示例之前,请确保已设置环境变量
TABLESTORE_ACCESS_KEY_ID
和TABLESTORE_ACCESS_KEY_SECRET
,这两个变量分别对应阿里云账号或 RAM 用户的 AccessKey ID 和 AccessKey Secret。//endPoint为表格存储实例的endPoint,例如https://instance.cn-hangzhou.ots.aliyuncs.com。 //accessKeyId和accessKeySecret分别为访问表格存储服务的AccessKey的Id和Secret。 //instanceName为实例名称。 final String endPoint = ""; final String accessKeyId = System.getenv("TABLESTORE_ACCESS_KEY_ID"); final String accessKeySecret = System.getenv("TABLESTORE_ACCESS_KEY_SECRET"); final String instanceName = ""; TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
创建新通道。
请提前创建一张测试表或者使用已有的一张数据表。如果需要新建测试表,可以使用 SyncClient 中的 createTable 方法或者使用官网控制台等方式创建。
重要创建增量或者全量加增量类型的通道时,时间戳的配置规则如下:
如果不指定增量数据的起始时间戳,则起始时间戳为创建通道的时间。
如果指定增量数据的起始时间戳(startTime)和结束时间戳(endTime),其取值范围为
[当前系统时间-Stream过期时间+5分钟 , 当前系统时间]
,单位为毫秒。Stream 过期时间为增量日志过期时长的毫秒单位时间戳,最大值为 7 天。您可以在为数据表开启 Stream 功能时设置,过期时长一经设置不能修改。
结束时间戳的取值必须大于起始时间戳。
//支持创建TunnelType.BaseData(全量)、TunnelType.Stream(增量)、TunnelType.BaseAndStream(全量加增量)三种类型的Tunnel。 //如下示例为创建全量加增量类型的Tunnel,如果需创建其它类型的Tunnel,则将CreateTunnelRequest中的TunnelType设置为相应的类型。 final String tableName = "testTable"; final String tunnelName = "testTunnel"; CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream); CreateTunnelResponse resp = tunnelClient.createTunnel(request); //tunnelId用于后续TunnelWorker的初始化,该值也可以通过ListTunnel或者DescribeTunnel获取。 String tunnelId = resp.getTunnelId(); System.out.println("Create Tunnel, Id: " + tunnelId);
用户自定义数据消费 Callback,开始自动化的数据消费。
//用户自定义数据消费Callback,即实现IChannelProcessor接口(process和shutdown)。 private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { //ProcessRecordsInput中包含有拉取到的数据。 System.out.println("Default record processor, would print records count"); System.out.println( //NextToken用于Tunnel Client的翻页。 String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { //模拟消费处理。 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); } } //TunnelWorkerConfig默认会启动读数据和处理数据的线程池。 //如果使用的是单台机器,当需要启动多个TunnelWorker时,建议共用一个TunnelWorkerConfig。TunnelWorkerConfig中包括更多的高级参数。 TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); //配置TunnelWorker,并启动自动化的数据处理任务。 TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); config.shutdown(); worker.shutdown(); tunnelClient.shutdown(); }
配置 TunnelWorkerConfig
TunnelWorkerConfig 提供 Tunnel Client 的自定义配置,可根据实际需要配置参数,Java SDK 中的参数说明请参见下表。
配置 | 参数 | 说明 |
Heartbeat 的间隔和超时时间 | heartbeatTimeoutInSec | Heartbeat 的超时间隔。默认值为 300 s。 当 Heartbeat 发生超时,Tunnel 服务端会认为当前 TunnelClient 不可用(失活),客户端需要重新进行 ConnectTunnel。 |
heartbeatIntervalInSec | 进行 Heartbeat 的间隔。 默认值为 30 s,最小支持配置到 5 s,单位为 s。 Heartbeat 用于活跃 Channel 的探测、Channel 状态的更新、(自动化)数据拉取任务的初始化等。 | |
记录消费位点的时间间隔 | checkpointIntervalInMillis | 用户消费完数据后,向 Tunnel 服务端进行记录消费位点操作(checkpoint)的时间间隔。 默认值为 5000 ms,单位为 ms。 说明
|
客户端的自定义标识 | clientTag | 客户端的自定义标识,可以生成 Tunnel Client ID,用于区分 TunnelWorker。 |
数据处理的自定义 Callback | channelProcessor | 用户注册的处理数据的 Callback,包括 process 和 shutdown 方法。 |
数据读取和数据处理的线程池资源配置 | readRecordsExecutor | 用于数据读取的线程池资源。无特殊需求,建议使用默认的配置。 |
processRecordsExecutor | 用于处理数据的线程池资源。无特殊需求,建议使用默认的配置。 说明
| |
内存控制 | maxChannelParallel | 读取和处理数据的最大 Channel 并行度,可用于内存控制。 默认值为 -1,表示不限制最大并行度。 说明 仅 Java SDK 5.10.0 及以上版本支持此功能。 |
最大退避时间配置 | maxRetryIntervalInMillis | Tunnel 的最大退避时间基准值,最大退避时间在此基准值附近随机变化,具体范围为 默认值为 2000 ms,最小值为 200 ms。 说明
|
CLOSING 分区状态检测 | enableClosingChannelDetect | 是否开启 CLOSING 分区实时检测。默认值为 false,表示不开启 CLOSING 分区实时检测。 说明
|
附录:完整代码
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelRequest;
import com.alicloud.openservices.tablestore.model.tunnel.CreateTunnelResponse;
import com.alicloud.openservices.tablestore.model.tunnel.TunnelType;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
public class TunnelQuickStart {
private static class SimpleProcessor implements IChannelProcessor {
@Override
public void process(ProcessRecordsInput input) {
System.out.println("Default record processor, would print records count");
System.out.println(
//NextToken用于Tunnel Client的翻页。
String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken()));
try {
//模拟消费处理。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void shutdown() {
System.out.println("Mock shutdown");
}
}
public static void main(String[] args) throws Exception {
//1.初始化Tunnel Client。
final String endPoint = "";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "";
TunnelClient tunnelClient = new TunnelClient(endPoint, accessKeyId, accessKeySecret, instanceName);
//2.创建新通道(此步骤需要提前创建一张测试表,可以使用SyncClient的createTable或者使用官网控制台等方式创建)。
final String tableName = "testTable";
final String tunnelName = "testTunnel";
CreateTunnelRequest request = new CreateTunnelRequest(tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = tunnelClient.createTunnel(request);
//tunnelId用于后续TunnelWorker的初始化,该值也可以通过ListTunnel或者DescribeTunnel获取。
String tunnelId = resp.getTunnelId();
System.out.println("Create Tunnel, Id: " + tunnelId);
//3.用户自定义数据消费Callback,开始自动化的数据消费。
//TunnelWorkerConfig中有更多的高级参数。
TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());
TunnelWorker worker = new TunnelWorker(tunnelId, tunnelClient, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
config.shutdown();
worker.shutdown();
tunnelClient.shutdown();
}
}
}