使用SDK消费订阅数据

更新时间:2025-03-20 08:49:28
重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

在完成数据订阅通道的配置(创建好订阅任务和消费组)后,您可以使用DTS提供的SDK来消费订阅到的数据,本文介绍示例代码的使用方法。

说明

前提条件

注意事项

  • 在消费订阅数据时,您需要调用DefaultUserRecordcommit方法以提交位点信息,否则会导致数据重复消费。

  • 不同的消费之间是相互独立的。

操作步骤

  1. 下载SDK示例代码文件,然后解压该文件。

  2. 确认SDK代码的版本。

    1. 定位至SDK示例代码解压的目录。

    2. 使用文本编辑工具打开目录中的pom.xml文件。

    3. 将数据订阅SDK的版本(version)修改为最新版本。

      说明

      您可以在dts-new-subscribe-sdk页面查看最新Maven依赖。

      SDK版本参数的位置(单击展开)

      <name>dts-new-subscribe-sdk</name>
      <url>https://www.aliyun.com/product/dts</url>
      <description>The Aliyun new Subscribe SDK for Java used for accessing Data Transmission Service</description>
      <packaging>jar</packaging>
      <groupId>com.aliyun.dts</groupId>
      <artifactId>dts-new-subscribe-sdk</artifactId>
      <version>2.1.4</version>
  3. 编辑SDK代码。

    1. 使用编码软件打开解压后的文件。

    2. 根据SDK客户端的使用模式,打开对应模式的Java文件。

      说明

      Java文件的路径为aliyun-dts-subscribe-sdk-java-master/src/test/java/com/aliyun/dts/subscribe/clients/

      使用模式

      Java文件

      说明

      适用场景

      使用模式

      Java文件

      说明

      适用场景

      ASSIGN模式

      DTSConsumerAssignDemo.java

      DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为ASSIGN模式时,建议只启动一个SDK客户端。

      同一个消费组下仅有一个SDK客户端消费订阅数据。

      SUBSCRIBE模式

      DTSConsumerSubscribeDemo.java

      DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为SUBSCRIBE模式时,您可以在一个消费组下同时启动多个SDK客户端,以实现灾备。实现原理是当消费组下的正常消费数据的客户端发生故障后,其他的SDK客户端将随机且自动地分配到partition 0,继续消费。

      同一个消费组下同时有多个SDK客户端消费订阅数据,即数据灾备场景。

    3. 设置Java代码中的参数。

      示例代码

      ******        
          public static void main(String[] args) {
              // kafka broker url
              String brokerUrl = "dts-cn-***.com:18001";
              // topic to consume, partition is 0
              String topic = "cn_***_version2";
              // user password and sid for auth
              String sid = "dts***";
              String userName = "dts***";
              String password = "DTS***";
              // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
              String initCheckpoint = "1740472***";
              // when use subscribe mode, group config is required. kafka consumer group is enabled
              ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE;
        
              DTSConsumerSubscribeDemo consumerDemo = new DTSConsumerSubscribeDemo(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
              consumerDemo.start();
          }
      ******

      参数

      说明

      获取方式

      brokerUrl

      数据订阅通道的网络地址及端口号信息。

      说明
      • 如果您部署SDK客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟较小。

      • 不建议使用公网地址。

      DTS控制台单击目标订阅实例ID,在基本信息页面的网络区域,获取网络地址及端口号信息。

      topic

      数据订阅通道的订阅Topic。

      DTS控制台单击目标订阅实例ID,在基本信息页面的基本信息区域,获取到订阅Topic

      sid

      消费组ID。

      DTS控制台单击目标订阅实例ID,在数据消费页面获取消费组ID/名称和消费组的账号信息。

      userName

      消费组的账号。

      警告

      如您未使用本文提供的客户端,请按照<消费组的账号>-<消费组ID>的格式设置用户名(例如:dtstest-dtsae******bpv),否则无法正常连接。

      password

      该账号的密码。

      在新建消费组时设置的消费组账号密码。

      initCheckpoint

      消费位点,即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳,例如1620962769。

      说明

      消费位点信息可用于:

      • 当业务程序中断后,传入已消费位点继续消费数据,防止数据丢失。

      • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。

      消费位点必须在订阅实例的数据范围之内,并需转换为Unix时间戳。

      说明

      您可以在订阅任务列表的数据范围列,查看目标订阅实例的数据范围。

      subscribeMode

      SDK客户端的使用模式,无需修改。

      • ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式。

      • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式。

  4. 打开编码软件的项目结构,确保此项目的OpenJDK版本为1.8。

  5. 运行该客户端代码。

    说明

    代码首次运行时,编码软件需要一定时间自动加载相关插件和依赖项。

    运行结果示例(单击展开)

    正常运行结果
    正常订阅结果
    异常运行结果

    若运行结果如下所示,则表示该客户端正常运行,可以正常订阅源库的数据变更信息。

    ******
    [2025-02-25 18:47:22.991] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.consumer.KafkaConsumer:1587] - [Consumer clientId=consumer-dtsl5vy2ao5250****-1, groupId=dtsl5vy2ao5250****] Seeking to offset 8200 for partition cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0
    [2025-02-25 18:47:22.993] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ConsumerWrap:116] - RecordFetcher consumer:  subscribe for [cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0] with checkpoint [Checkpoint[ topicPartition: cn_hangzhou_vpc_rm_bp15uddebh4a1****_dts****_version2-0timestamp: 174048****, offset: 8200, info: 174048****]] start
    [2025-02-25 18:47:23.011] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174048044****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8200]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    
    [2025-02-25 18:47:23.226] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8201]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [HEARTBEAT]
    ******

    若运行结果如下所示,表示该客户端正常订阅到源库的数据变更(UPDATE操作)。

    ******
    [2025-02-25 18:48:24.905] [INFO ] [com.aliyun.dts.subscribe.clients.recordprocessor.EtlRecordProcessor] [com.aliyun.dts.subscribe.clients.recordprocessor.DefaultRecordPrintListener:49] - 
    RecordID [8413]
    RecordTimestamp [174048****] 
    Source [{"sourceType": "MySQL", "version": "8.0.36"}]
    RecordType [UPDATE]
    Schema info [{, 
    recordFields= [{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}, {fieldName='name', rawDataTypeNum=253, isPrimaryKey=false, isUniqueKey=false, fieldPosition=1}], 
    databaseName='dtsdb', 
    tableName='person', 
    primaryIndexInfo [[indexType=PrimaryKey, indexFields=[{fieldName='id', rawDataTypeNum=8, isPrimaryKey=true, isUniqueKey=false, fieldPosition=0}], cardinality=0, nullable=true, isFirstUniqueIndex=false, name=null]], 
    uniqueIndexInfo [[]], 
    partitionFields = null}]
    Before image {[Field [id] [3]
    Field [name] [test1]
    ]}
    After image {[Field [id] [3]
    Field [name] [test2]
    ]}
    ******

    若运行结果如下所示,表示该客户端无法正常连接源库。

    ******
    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:22.002] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****-1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:22.509] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    [2025-02-25 18:22:23.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":1740478943160,"DefaultUserRecordQueue":0.0}
    [2025-02-25 18:22:27.192] [WARN ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [org.apache.kafka.clients.NetworkClient:780] - [Consumer clientId=consumer-dtsnd7u2n0625m****1, groupId=dtsnd7u2n0625m****] Connection to node 1 (47.118.XXX.XXX/47.118.XXX.XXX:18001) could not be established. Broker may not be available.
    [2025-02-25 18:22:27.618] [INFO ] [com.aliyun.dts.subscribe.clients.recordfetcher.KafkaRecordFetcher] [com.aliyun.dts.subscribe.clients.recordfetcher.ClusterSwitchListener:44] - Cluster not changed on update:5aPLLlDtTHqP8sKq-DZVfg
    ******

    SDK客户端每隔一定时间会统计并显示消费数据的信息,包括数据发送和接收的总数、数据总量、每秒请求数接收RPS等参数。

    [2025-02-25 18:22:18.160] [INFO ] [subscribe-logMetricsReporter-1-thread-1] [log.metrics:184] - {"outCounts":0.0,"outBytes":0.0,"outRps":0.0,"outBps":0.0,"count":11.0,"inBytes":0.0,"DStoreRecordQueue":0.0,"inCounts":0.0,"inRps":0.0,"inBps":0.0,"__dt":174047893****,"DefaultUserRecordQueue":0.0}

    参数

    说明

    参数

    说明

    outCounts

    SDK客户端所消费的数据总数。

    outBytes

    SDK客户端所消费的数据总量,单位为Byte。

    outRps

    SDK客户端消费数据时的每秒请求数。

    outBps

    SDK客户端消费数据时每秒传送的比特数。

    count

    SDK客户端消费数据信息(metrics)中的参数总数。

    说明

    不包含count本身。

    inBytes

    DTS服务器发送的数据总量,单位为Byte。

    DStoreRecordQueue

    DTS服务器发送数据时,当前数据缓存队列的大小。

    inCounts

    DTS服务器发送数据总数。

    inBps

    DTS服务器发送数据时每秒传送的比特数。

    inRps

    DTS服务器发送数据时的每秒请求数。

    __dt

    SDK客户端接收到数据的当前时间戳,单位为毫秒。

    DefaultUserRecordQueue

    序列化后,当前数据缓存队列的大小。

  6. 根据您的业务需求,自行编辑代码以消费订阅数据。

    在消费订阅数据时,您需要管理消费位点,以确保数据不丢失,且尽量不重复,实现按需消费。

常见问题

  • 无法连接订阅实例,如何处理?

    请根据报错提示进行排查,详情请参见异常排查

  • 持久化后的消费位点是什么格式的数据?

    消费位点在持久化处理后,将返回JSON格式的数据。其中,持久化后的消费位点的格式为Unix时间戳,您可以直接将其传回SDK进行使用。如下返回数据中,"timestamp"后的1700709977即为持久化后的消费位点。

    {"groupID":"dtsglg11d48230***","streamCheckpoint":[{"partition":0,"offset":577989,"topic":"ap_southeast_1_vpc_rm_t4n22s21iysr6****_root_version2","timestamp":170070****,"info":""}]}
  • 订阅任务是否支持多个客户端并行消费?

    不支持。SUBSCRIBE模式允许多个客户端并行,但只有一个客户端可以消费到数据。

  • SDK代码中封装的是哪个版本的Kafka客户端?

    2.0.0及以上版本的dts-new-subscribe-sdk封装的是2.7.0版本的Kafka客户端(kafka-clients),2.0.0以下版本封装的是1.0.0版本的Kafka客户端。

附录

管理消费位点

SDK客户端首次启动、重启或者发生内部重试时,您需要查询并传入消费位点(即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳)开始或重新消费数据。

若您需要重置客户端的消费位点,可以根据订阅的模式(SDK使用模式)参考下表查询消费位点并进行修改。

场景

SDK使用模式

位点管理方式

场景

SDK使用模式

位点管理方式

查询消费位点

ASSIGN模式、SUBSCRIBE模式

  • 由于SDK客户端每5秒保存一次消息位点,并提交至DTS服务器,如需查询最近一次消费位点,您可通过以下路径查询:

    • SDK客户端所在服务器的localCheckpointStore文件。

    • 订阅通道的数据消费界面。

  • 如您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置了外部的持久化共享存储介质(如数据库),该存储介质每5秒会保存一次消息位点,供您查询。

首次启动SDK客户端,需传入消费位点,以便消费数据。

ASSIGN模式、SUBSCRIBE模式

根据SDK客户端的使用模式,选择Java文件DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.java,并配置消费位点initCheckpoint)进行消费。

SDK客户端因内部重试,需重新传入上一个记录的消费位点,以继续消费数据。

ASSIGN模式

按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息:

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存储介质。

  2. SDK客户端所在服务器的localCheckpointStore文件。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint传入的开始时间戳(start timestamp)。

SUBSCRIBE模式

按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息:

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存储介质。

  2. DTS Server(增量数据采集模块)保存的位点。

    说明

    SDK客户端调用commit方法更新消费位点后,此位点才会更新。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint传入的开始时间戳(start timestamp)。

  4. 使用DTS Server(新建增量数据采集模块)的起始位点。

    重要

    如果增量数据采集模块发生了切换,新建的增量数据采集模块将无法保存客户端上次的消费位点信息,可能会导致从一个较旧的位点开始消费订阅数据,建议您在客户端持久化存储消费位点

已重启SDK客户端,需重新传入上一个记录的消费位点,以继续消费数据。

ASSIGN模式

根据consumerContext.java文件中setForceUseCheckpoint配置情况,查询消费位点,找到即可返回位点信息:

  • 配置为true时,每次重启SDK客户端,都会强制使用传入的initCheckpoint作为消费位点。

  • 配置为false或者没有配置时,请按如下顺序,查找上一个记录的消费位点:

    1. SDK客户端所在服务器的localCheckpointStore文件。

    2. DTS Server(增量数据采集模块)保存的位点。

      说明

      SDK客户端调用commit方法更新消费位点后,此位点才会更新。

    3. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存储介质。

SUBSCRIBE模式

该模式下consumerContext.java文件中setForceUseCheckpoint配置不生效,请按如下顺序,查找上一个记录的消费位点:

  1. 您在consumerContext.java文件中setUserRegisteredStore(new UserMetaStore())配置的外部存储介质。

  2. DTS Server(增量数据采集模块)保存的位点。

    说明

    SDK客户端调用commit方法更新消费位点后,此位点才会更新。

  3. 您在DTSConsumerSubscribeDemo.java文件中initCheckpoint传入的开始时间戳(start timestamp)。

  4. 使用DTS Server(新建增量数据采集模块)的起始位点。

持久化存储消费位点

如果增量数据采集模块触发容灾机制(特别是SUBSCRIBE模式),新建的增量数据采集模块将无法保存客户端上次的消费位点信息,可能会导致客户端从一个较旧的位点开始消费订阅数据,从而造成历史数据的重复消费。例如:增量数据服务切换前,老的增量数据采集模块位点范围为20231111日 08:00:00~ 20231112日 08:00:00,客户端的消费位点为20231112日 08:00:00;增量数据服务切换后,新的增量数据采集模块位点范围为20231108日 10:00:00~ 20231112日 08:01:00,那么客户端会从新的增量数据采集模块的起始位点20231108日 10:00:00开始消费,造成重复消费历史数据。

为了规避这种切换场景对历史数据的重复消费,建议您在客户端配置一个在客户端保存的消费位点持久化存储方式。示例方法如下,您可以根据实际情况进行修改。

  1. 创建一个UserMetaStore()方法,继承实现AbstractUserMetaStore()方法。

    例如使用MySQL数据库存储位点信息,Java示例代码如下:

    public class UserMetaStore extends AbstractUserMetaStore {
    
        @Override
        protected void saveData(String groupID, String toStoreJson) {
            Connection con = getConnection();
    			  String sql = "insert into dts_checkpoint(group_id, checkpoint) values(?, ?)";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
                pres.setString(2, toStoreJson);
                pres.execute();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    
        @Override
        protected String getData(String groupID) {
            Connection con = getConnection();
    			  String sql = "select checkpoint from dts_checkpoint where group_id = ?";
    
            PreparedStatement pres = null;
            ResultSet rs = null;
    
            try {
                pres = con.prepareStatement(sql);
                pres.setString(1, groupID);
    						ResultSet rs = pres.executeQuery()
                              
                String checkpoint = rs.getString("checkpoint");
              
                return checkpoint;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                close(rs, pres, con);
            }
        }
    }
    
  2. consumerContext.java文件中的setUserRegisteredStore(new UserMetaStore())方法,配置外部存储介质。

异常排查

异常

报错提示

原因

解决方法

异常

报错提示

原因

解决方法

无法连接

ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

brokerUrl填写错误。

填入正确的brokerUrluserNamepassword。查询方式,请参见参数说明

telnet real node *** failed, please check the network

无法通过broker地址连接真实的IP地址。

ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)

用户名和密码错误。

com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed

consumerContext.java文件中setUseCheckpoint配置为true,但消费位点不在订阅实例的数据范围之内。

传入在订阅实例的数据范围之内的消费位点。查询方式,请参见参数说明

消费订阅速度变慢

  • 通过查询统计信息中的参数DStoreRecordQueueDefaultUserRecordQueue队列的大小,分析消费数据变慢的原因。

    • 若参数DStoreRecordQueue保持为0,则表示DTS服务器拉取数据速度变慢。

    • 若参数DefaultUserRecordQueue保持为默认值512,则表示SDK客户端消费数据的速度变慢。

  • 根据实际情况,修改代码中的消费位点(initCheckpoint)以重置位点。

  • 本页导读 (1)
  • 前提条件
  • 注意事项
  • 操作步骤
  • 常见问题
  • 附录
  • 管理消费位点
  • 持久化存储消费位点
  • 异常排查