完成数据订阅通道的配置后,您可以使用DTS提供的SDK示例代码来订阅数据变更信息,本文介绍该示例代码的使用说明。

操作步骤

注意 如果使用子账号(RAM用户)来订阅数据,该账号需具备AliyunDTSFullAccess权限,以及订阅对象的访问权限,授权方法请参见通过系统策略授权子账号管理DTS为RAM用户授权

本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何运行SDK示例代码来消费订阅数据。

  1. 创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道创建PolarDB MySQL数据订阅通道创建Oracle数据订阅通道
  2. 创建一个或多个消费组,详情请参见新增消费组
  3. 下载SDK示例代码文件,然后解压该文件。
  4. 定位至SDK示例代码解压的目录,使用文本编辑工具打开pom.xml文件,将数据订阅SDK的版本修改为最新版本。
    设置SDK版本
    注意 您可以在Maven网站中获取最新的数据订阅SDK版本,详情请参见数据订阅SDK的Maven页面
  5. 打开IntelliJ IDEA软件,然后单击Open or Import
    打开工程
  6. 在弹出的对话框中,定位至SDK示例代码解压的目录,依次展开文件夹,找到项目对象模型文件:pom.xml
    找到项目对象模型文件
  7. 在弹出对话框中,选择Open as Project
  8. 在IntelliJ IDEA软件界面,依次展开文件夹,并根据 SDK客户端的使用模式,选择并双击打开对应的Java文件:DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.java
    java客户端文件
    说明 DTS支持以下两种SDK客户端的使用模式:
    • ASSIGN模式:DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为ASSIGN模式时,建议只启动一个SDK客户端。
    • SUBSCRIBE模式:DTS为了保证消息的全局有序,每个订阅Topic只有一个partition,且固定分配至partition 0中。当SDK客户端的使用模式为SUBSCRIBE模式时,您可以在一个消费组下同时启动多个SDK客户端,以实现灾备。实现原理是当消费组下的正常消费数据的客户端发生故障后,其他的SDK客户端将随机且自动地分配到partition 0,继续消费。
  9. 设置Java文件代码中的必填参数。
    assigndemo
    表 1. 必填参数说明
    参数 说明 获取方式
    brokerUrl 数据订阅通道的网络地址及端口号信息。
    说明 如果您部署SDK客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
    在DTS控制台单击目标订阅实例ID,在任务管理页面,您可以获取网络地址及端口号信息。网络地址
    topic 数据订阅通道的订阅Topic。 在DTS控制台单击目标订阅实例ID,在任务管理页面,您可以获取到订阅Topic订阅配置
    sid 消费组ID。 在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。
    说明 消费组账号的密码已在您新建消费组时指定。
    数据消费
    userName 消费组的账号。
    警告 如您未使用本文提供的客户端,请按照<消费组的账号>-<消费组ID>的格式设置用户名(例如:dtstest-dtsae******bpv),否则无法正常连接。
    password 该账号的密码。
    initCheckpoint 消费位点,即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳,例如1620962769。
    说明 消费位点信息可用于:
    • 当业务程序中断后,传入已消费位点继续消费数据,防止数据丢失。
    • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
    消费位点必须在订阅实例的数据范围(如图示)之内,并需转化为Unix时间戳。数据范围
    说明 Unix时间戳转换工具可用搜索引擎获取。
    ConsumerContext.ConsumerSubscribeMode subscribeMode SDK客户端的使用模式,取值为
    • ConsumerContext.ConsumerSubscribeMode.ASSIGN:ASSIGN模式,即一个消费组下仅支持一个SDK客户端消费订阅数据,详细说明,请参见SDK使用模式说明
    • ConsumerContext.ConsumerSubscribeMode.SUBSCRIBE:SUBSCRIBE模式,即支持在同一个消费组下同时启动多个SDK客户端实现灾备,详细说明,请参见SDK使用模式说明
  10. 在IntelliJ IDEA软件界面的顶部,选择Run > Run运行该客户端。
    说明 首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。
    • 运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。消费数据
    • SDK客户端每隔一定时间会统计并显示消费数据的信息,包括数据发送和接受时数据总数、数据总量、每秒请求数接收RPS等。统计信息
      表 2. 消费数据的统计信息
      参数 说明
      outCounts SDK客户端所消费的数据总数。
      outBytes SDK客户端所消费的数据总量,单位为Byte。
      outRps SDK客户端消费数据时的每秒请求数。
      outBps SDK客户端消费数据时每秒传送的比特数。
      inBytes DTS服务器发送的数据总量,单位为Byte。
      DStoreRecordQueue DTS服务器发送数据时,当前数据缓存队列的大小。
      inCounts DTS服务器发送数据总数。
      inRps DTS服务器发送数据时的每秒请求数。
      __dt SDK客户端接收到数据的当前时间戳,单位为毫秒。
      DefaultUserRecordQueue 序列化后,当前数据缓存队列的大小。

保存和查询消费位点

当SDK客户端首次启动、重启或者发生内部重试时,您需要查询并传入 消费位点,开始或重新消费数据。下文将介绍在不同情况下如何管理和查询消费位点,以确保数据不丢失,且尽量不重复,实现按需消费。
场景 SDK使用模式 查询方法
查询消费位点 ASSIGN模式、SUBSCRIBE模式
  • 由于SDK客户端每5秒保存一次消息位点,并提交至DTS服务器,如需查询最近一次消费位点,您可通过以下路径查询:
    • SDK客户端所在服务器的localCheckpointStore文件。
    • 订阅通道的数据消费界面。
  • 如您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置了外部的持久化共享存储介质(如数据库),该存储介质每5秒会保存一次消息位点,供您查询。
首次启动SDK客户端,需传入消费位点,来消费数据。 ASSIGN模式、SUBSCRIBE模式 根据SDK客户端使用模式,选择Java文件DTSConsumerAssignDemo.javaDTSConsumerSubscribeDemo.java,并配置消费位点initCheckpoint进行消费,配置方式,请参见步骤八步骤九
SDK客户端因内部重试,需重新传入上一个记录的消费位点,以继续消费数据。 ASSIGN模式 请按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息:
  1. SDK客户端所在服务器的localCheckpointStore文件。
  2. IntelliJ IDEA软件中最新消费数据的位点。
  3. 您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存储介质。
SUBSCRIBE模式 请按如下顺序,查找上一个记录的消费位点,找到即可返回位点信息:
  1. 目标订阅实例ID的数据消费界面中显示的消费位点。
  2. 您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存储介质。
已重启SDK客户端,需重新传入上一个记录的消费位点,以继续消费数据。 ASSIGN模式 根据consumerContext.java文件中setForceUseCheckpoint配置情况,查询消费位点,找到即可返回位点信息:
  • 配置为true时,每次重启SDK客户端,都会强制使用传入的initCheckpoint作为消费位点。
  • 配置为false或者没有配置时,请按如下顺序,查找上一个记录的消费位点:
    1. 目标订阅实例ID的数据消费界面中显示的消费位点。
    2. 您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存储介质。
    3. IntelliJ IDEA软件中最新消费数据的位点。
SUBSCRIBE模式 该模式下consumerContext.java文件中setForceUseCheckpoint配置不生效,请按如下顺序,查找上一个记录的消费位点:
  1. 目标订阅实例ID的数据消费界面中显示的消费位点。
  2. 您在consumerContext.java文件中setUserRegisteredStore(newUserMetaStore())配置的外部存储介质。
  3. IntelliJ IDEA软件中最新消费数据的位点。

问题排查

问题 报错提示 原因 解决方法
无法连接
ERROR
CheckResult{isOk=false, errMsg='telnet dts-cn-hangzhou.aliyuncs.com:18009
failed, please check the network and if the brokerUrl is correct'}
(com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)
brokerUrl填写错误。 填入正确的brokerUrluserNamepassword,查询方式,请参见表 1
telnet real node *** failed, please check the network
无法通过broker地址连接真实的IP地址。
ERROR CheckResult{isOk=false, errMsg='build kafka consumer failed, error: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata, probably the user name or password is wrong'} (com.aliyun.dts.subscribe.clients.DefaultDTSConsumer)
用户名和密码错误。
com.aliyun.dts.subscribe.clients.exception.TimestampSeekException: RecordGenerator:seek timestamp for topic [cn_hangzhou_rm_bp11tv2923n87081s_rdsdt_dtsacct-0] with timestamp [1610249501] failed
consumerContext.java文件中 setUseCheckpoint配置为true,但消费位点不在订阅实例的数据范围(如图示)之内。 传入在订阅实例的数据范围(如图示)之内的消费位点,查询方式,请参见表 1
消费订阅速度变慢。 可通过查询统计信息中的参数DStoreRecordQueueDefaultUserRecordQueue队列的大小,分析消费数据变慢的原因。查询方式,请参见表 2
  • 如参数DStoreRecordQueue保持为0,则表示DTS服务器拉取数据速度变慢。
  • 如参数DefaultUserRecordQueue保持为默认值512,则表示SDK客户端消费数据的速度变慢。