完成数据订阅任务后,您可以使用DTS提供的SDK示例代码来订阅数据变更信息。本文介绍通过SDK示例代码消费分布式订阅数据,支持的数据源包括PolarDB-X 1.0和DMS LogicDB。

前提条件

  • 已安装JDK 1.8版本。
  • 已安装IntelliJ IDEA软件。

注意事项

如果使用子账号(RAM用户)来订阅数据,该账号需具备AliyunDTSFullAccess权限,以及订阅对象的访问权限,授权方法请参见通过系统策略授权子账号管理DTS为RAM用户授权

操作步骤

本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何运行SDK示例代码来消费PolarDB-X 1.0的订阅数据。

  1. 创建新版数据订阅通道,具体操作请参见创建RDS MySQL数据订阅任务
  2. 创建一个或多个消费组,具体操作请参见新增消费组
  3. 下载并解压SDK示例代码文件,下载地址为SDK示例代码
  4. 在IntelliJ IDEA软件中打开目标项目。
    1. 打开IntelliJ IDEA软件,然后单击Open or Import
      打开工程
    2. 在弹出的对话框中,选择SDK示例代码解压的目录,依次展开文件夹,双击项目对象模型文件:pom.xml
      双击模型文件
    3. 在弹出对话框中,选择Open as Projec
  5. 在IntelliJ IDEA软件界面,依次展开文件夹,并根据 SDK客户端的使用模式,选择并双击打开对应的Java文件:DistributedDTSConsumerDemo
    找到目标文件
  6. 设置Java文件代码中的必填参数。
    public static void main(String[] args) throws ClientException {
            //分布式类型数据源的订阅配置方式,例如PolarDBX10(原DRDS)。配置AccessKey、实例Id、主任务id,订阅消费组等相关信息。
            String accessKeyId = "LTA***********99reZ";
            String accessKeySecret = "****************";
            String regionId = "cn-hangzhou";
            String dtsInstanceId = "dtse5212sed162****";
            String jobId = "l791216x16d****";
            String sid = "dtsip412t13160****";
            String userName = "xftest";
            String password = "******";
            String proxyUrl = "dts-cn-****.com:18001";
            // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019))
            String checkpoint = "1639620090";
    
            // Convert physical database/table name to logical database/table name
            boolean mapping = true;
            // if force use config checkpoint when start. for checkpoint reset, only assign mode works
            boolean isForceUseInitCheckpoint = false;
    
            ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
            DistributedDTSConsumerDemo demo = new DistributedDTSConsumerDemo(userName, password, regionId,
                    jobId, sid, dtsInstanceId, accessKeyId, accessKeySecret, subscribeMode, proxyUrl,
                    checkpoint, isForceUseInitCheckpoint, mapping);
            demo.start();
        }
    参数说明获取方法
    accessKeyId访问密钥ID。获取方法请参见获取AccessKey
    accessKeySecret访问密钥ID的密码。
    regionId数据订阅任务的地域ID。在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取地域信息,例如:地域为华东1(杭州),代码中需要填写为cn-hangzhou,详情请参见地域列表
    dtsInstanceId数据订阅任务实例ID。在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取数据订阅实例的任务实例ID
    jobId数据订阅任务ID。您可以调用DescribeDtsJobs接口获取数据订阅任务ID(DtsJobId)。
    sid消费组ID。在DTS控制台单击目标订阅实例ID,然后单击左侧导航栏的数据消费,您可以获取到消费组ID/名称和消费组的账号信息。
    说明 消费组账号的密码已在您新建消费组时指定。
    userName消费组的账号。
    password消费组账号的密码。
    proxyUrl数据订阅通道的网络地址及端口号信息。
    说明 如果您部署SDK客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
    在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取到网络信息。
    checkpoint消费位点,即SDK客户端消费第一条数据的时间戳,格式为Unix时间戳,使用秒级时间戳。
    说明 消费位点信息可用于:
    • 当业务程序中断后,传入已消费位点继续消费数据,防止数据丢失。
    • 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
    消费位点必须在订阅实例的数据范围之内,并需转化为Unix时间戳。
    说明
    • 您可以在订阅任务列表的数据范围列,查看订阅实例的数据范围。
    • Unix时间戳转换工具可用搜索引擎获取。
  7. 在IntelliJ IDEA软件界面的顶部,选择Run > Run运行该客户端。
    说明 首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。
    • 运行结果中显示该客户端可正常订阅到源库的数据变更信息。
    • SDK客户端每隔一定时间会统计并显示消费数据的信息,包括数据发送和接受时数据总数、数据总量、每秒请求数接收RPS等。
      表 1. 消费数据的统计信息
      参数说明
      outCountsSDK客户端所消费的数据总数。
      outBytesSDK客户端所消费的数据总量,单位为Byte。
      outRpsSDK客户端消费数据时的每秒请求数。
      outBpsSDK客户端消费数据时每秒传送的比特数。
      count暂无。
      inBytesDTS服务器发送的数据总量,单位为Byte。
      DStoreRecordQueueDTS服务器发送数据时,当前数据缓存队列的大小。
      inCountsDTS服务器发送数据总数。
      inRpsDTS服务器发送数据时的每秒请求数。
      inBpsDTS服务器发送数据时每秒传送的比特数。
      __dtSDK客户端接收到数据的当前时间戳,单位为毫秒。
      DefaultUserRecordQueue序列化后,当前数据缓存队列的大小。
  8. 可选:如果您需要修改订阅数据的数据类型,可以在buildRecordListener()方法中进行修改或者自定义类。
    public static Map<String, RecordListener> buildRecordListener() {
            // user can impl their own listener
            RecordListener mysqlRecordPrintListener = new RecordListener() {
                @Override
                public void consume(DefaultUserRecord record) {
    
                    OperationType operationType = record.getOperationType();
    
                    if (operationType.equals(OperationType.INSERT)
                            || operationType.equals(OperationType.UPDATE)
                            || operationType.equals(OperationType.DELETE)
                            || operationType.equals(OperationType.HEARTBEAT)) {
    
                        // consume record
                        RecordListener recordPrintListener = new DefaultRecordPrintListener(DbType.MySQL);
    
                        recordPrintListener.consume(record);
    
                        //commit method push the checkpoint update
                        record.commit("");
                    }
                }
            };
            return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
        }