本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
在完成数据订阅通道的配置(创建好订阅任务和消费组)后,您可以使用DTS提供的SDK来消费订阅到的数据,本文介绍示例代码的使用方法。
- 如果数据源是PolarDB-X 1.0或DMS LogicDB,消费订阅数据的操作步骤,请参见使用SDK消费PolarDB-X 1.0订阅数据。 
- 本操作为Java语言的SDK客户端示例,Python和Go语言的示例代码,请参见dts-subscribe-demo。 
前提条件
- 已创建订阅实例,且实例的运行状态为正常。 说明- 您可以通过订阅方案概览,查看创建订阅实例的操作文档。 
- 已为订阅实例创建消费组。 
- 若使用子账号(RAM用户)来消费订阅到的数据,该账号需具备AliyunDTSFullAccess权限,以及订阅对象的访问权限。授权方法,请参见通过系统策略授权子账号管理DTS和为RAM用户授权。 
注意事项
- 在消费订阅数据时,您需要调用DefaultUserRecord的commit方法以提交位点信息,否则会导致数据重复消费。 
- 不同的消费之间是相互独立的。 
- 控制台中的当前位点表示订阅任务当前订阅到的位点,而非客户端所提交的位点。 
操作步骤
- 下载SDK示例代码文件,然后解压该文件。 
- 确认SDK代码的版本。 - 定位至SDK示例代码解压的目录。 
- 使用文本编辑工具打开目录中的pom.xml文件。 
- 将数据订阅SDK的版本(version)修改为最新版本。 说明- 您可以在dts-new-subscribe-sdk页面查看最新Maven依赖。 
 
- 编辑SDK代码。 - 使用编码软件打开解压后的文件。 
- 根据SDK客户端的使用模式,打开对应模式的Java文件。 说明- Java文件的路径为 - aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/。- 使用模式 - Java文件 - 说明 - 适用场景 - ASSIGN模式 - DTSConsumerAssignDemo.java - DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为ASSIGN模式时,建议只启动一个SDK客户端。 - 同一个消费组下仅有一个SDK客户端消费订阅数据。 - SUBSCRIBE模式 - DTSConsumerSubscribeDemo.java - DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为SUBSCRIBE模式时,您可以在一个消费组下同时启动多个SDK客户端,以实现灾备。实现原理是当消费组下的正常消费数据的客户端发生故障后,其他的SDK客户端将随机且自动地分配到partition 0,继续消费。 - 同一个消费组下同时有多个SDK客户端消费订阅数据,即数据灾备场景。 
- 设置Java代码中的参数。 - 参数 - 说明 - 获取方式 - brokerUrl- 数据订阅通道的网络地址及端口号信息。 说明- 如果您部署SDK客户端的服务器(如ECS实例)与数据订阅实例属于同一专有网络,建议通过VPC网络地址进行数据消费,以减少网络延迟。 
- 鉴于网络稳定性因素,不建议使用公网地址。 
 - 在DTS控制台单击目标订阅实例ID,在基本信息页面的网络区域,获取网络地址及端口号信息。 - topic- 数据订阅通道的订阅Topic。 - 在DTS控制台单击目标订阅实例ID,在基本信息页面的基本信息区域,获取到订阅Topic。 - sid- 消费组ID。 - 在DTS控制台单击目标订阅实例ID,在数据消费页面获取消费组ID/名称和消费组的账号信息。 - userName- 消费组的账号。 警告- 如您未使用本文提供的客户端,请按照 - <消费组的账号>-<消费组ID>的格式设置用户名(例如:- dtstest-dtsae******bpv),否则无法正常连接。- password- 该账号的密码。 - 在新建消费组时设置的消费组账号密码。 - initCheckpoint- 消费位点,即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳,例如1620962769。 说明- 消费位点信息可用于: - 当业务程序中断后,传入已消费位点继续消费数据,防止数据丢失。 
- 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。 
 - 消费位点必须在订阅实例的数据范围之内,并需转换为Unix时间戳。 说明- 您可以在订阅任务列表的数据范围列,查看目标订阅实例的数据范围。 - subscribeMode- SDK客户端的使用模式,无需修改。 - ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式。
- ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式。
 - 无 
 
- 打开编码软件的项目结构,确保此项目的OpenJDK版本为1.8。 
- 运行该客户端代码。 说明- 代码首次运行时,编码软件需要一定时间自动加载相关插件和依赖项。 - SDK客户端每隔一定时间会统计并显示消费数据的信息,包括数据发送和接收的总数、数据总量、每秒请求数接收RPS等参数。 - [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}- 参数 - 说明 - outCounts- SDK客户端所消费的数据总数。 - outBytes- SDK客户端所消费的数据总量,单位为Byte。 - outRps- SDK客户端消费数据时的每秒请求数。 - outBps- SDK客户端消费数据时每秒传送的比特数。 - count- SDK客户端消费数据信息(metrics)中的参数总数。 说明- 不包含 - count本身。- inBytes- DTS服务器发送的数据总量,单位为Byte。 - DStoreRecordQueue- DTS服务器发送数据时,当前数据缓存队列的大小。 - inCounts- DTS服务器发送数据总数。 - inBps- DTS服务器发送数据时每秒传送的比特数。 - inRps- DTS服务器发送数据时的每秒请求数。 - __dt- SDK客户端接收到数据的当前时间戳,单位为毫秒。 - DefaultUserRecordQueue- 序列化后,当前数据缓存队列的大小。 
- 根据您的业务需求,自行编辑代码以消费订阅数据。 - 在消费订阅数据时,您需要管理消费位点,以确保数据不丢失,且尽量不重复,实现按需消费。 
常见问题
- 无法连接订阅实例,如何处理? - 请根据报错提示进行排查,详情请参见异常排查。 
- 持久化后的消费位点是什么格式的数据? - 消费位点在持久化处理后,将返回JSON格式的数据。其中,持久化后的消费位点的格式为Unix时间戳,您可以直接将其传回SDK进行使用。如下返回数据中, - "timestamp"后的- 1700709977即为持久化后的消费位点。- {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
- 订阅任务是否支持多个客户端并行消费? - 不支持。SUBSCRIBE模式允许多个客户端并行,但只有一个客户端可以消费到数据。 
- SDK代码中封装的是哪个版本的Kafka客户端? - 2.0.0及以上版本的dts-new-subscribe-sdk封装的是2.7.0版本的Kafka客户端(kafka-clients),2.0.0以下版本封装的是1.0.0版本的Kafka客户端。 
附录
管理消费位点
当SDK客户端首次启动、重启或者发生内部重试时,您需要查询并传入消费位点(即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳)开始或重新消费数据。
若您需要重置客户端的消费位点,可以根据订阅的模式(SDK使用模式)参考下表查询消费位点并进行修改。
| 场景 | SDK使用模式 | 位点管理方式 | 
| 查询消费位点 | ASSIGN模式、SUBSCRIBE模式 | 
 | 
| 首次启动SDK客户端,需传入消费位点,以便消费数据。 | ASSIGN模式、SUBSCRIBE模式 | 根据SDK客户端的使用模式,选择Java文件DTSConsumerAssignDemo.java或DTSConsumerSubscribeDemo.java,并配置消费位点( | 
| SDK客户端因内部重试,需重新传入上一个记录的消费位点,以继续消费数据。 | ASSIGN模式 | 按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息: 
 | 
| SUBSCRIBE模式 | 按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息: 
 | |
| 已重启SDK客户端,需重新传入上一个记录的消费位点,以继续消费数据。 | ASSIGN模式 | 根据consumerContext.java文件中 
 | 
| SUBSCRIBE模式 | 该模式下consumerContext.java文件中 
 | 
持久化存储消费位点
如果增量数据采集模块触发容灾机制(特别是SUBSCRIBE模式),新建的增量数据采集模块将无法保存客户端上次的消费位点信息,可能会导致客户端从一个较旧的位点开始消费订阅数据,从而造成历史数据的重复消费。例如:增量数据服务切换前,老的增量数据采集模块位点范围为2023年11月11日 08:00:00~ 2023年11月12日 08:00:00,客户端的消费位点为2023年11月12日 08:00:00;增量数据服务切换后,新的增量数据采集模块位点范围为2023年11月08日 10:00:00~ 2023年11月12日 08:01:00,那么客户端会从新的增量数据采集模块的起始位点2023年11月08日 10:00:00开始消费,造成重复消费历史数据。
为了规避这种切换场景对历史数据的重复消费,建议您在客户端配置一个在客户端保存的消费位点持久化存储方式。示例方法如下,您可以根据实际情况进行修改。
- 创建一个 - UserMetaStore()方法,继承实现- AbstractUserMetaStore()方法。- 例如使用MySQL数据库存储位点信息,Java示例代码如下: - public class UserMetaStore extends AbstractUserMetaStore { @Override protected void saveData(String groupID, String toStoreJson) { Connection con = getConnection(); String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); pres.setString(2, toStoreJson); pres.execute(); } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } @Override protected String getData(String groupID) { Connection con = getConnection(); String sql = "select checkpoint from dts_checkpoint where group_id = ?"; PreparedStatement pres = null; ResultSet rs = null; try { pres = con.prepareStatement(sql); pres.setString(1, groupID); ResultSet rs = pres.executeQuery() String checkpoint = rs.getString("checkpoint"); return checkpoint; } catch (Exception e) { e.printStackTrace(); } finally { close(rs, pres, con); } } }
- 在consumerContext.java文件中的 - setUserRegisteredStore(new UserMetaStore())方法,配置外部存储介质。
异常排查
| 异常 | 报错提示 | 原因 | 解决方法 | 
| 无法连接 |  | 
 | 填入正确的 | 
|  | 无法通过broker地址连接真实的IP地址。 | ||
|  | 用户名和密码错误。 | ||
|  | consumerContext.java文件中 | 传入在订阅实例的数据范围之内的消费位点。查询方式,请参见参数说明。 | |
| 消费订阅速度变慢 | 无 | 
 
 | |