新版数据订阅支持使用0.11版本至2.7版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。
注意事项
-
使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。
说明如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。
-
数据以Avro序列化存储,详细格式请参见Record.avsc文档。
警告如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析(DTS Avro的反序列化示例)时,可能出现解析的数据有误,您需要自行验证数据的正确性。
-
关于
offsetForTimes接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。 -
由于数据订阅服务端会因容灾等原因导致网络闪断,若您未使用本文提供的Kafka客户端,您使用的Kafka客户端需具备网络重试能力。
-
若您使用原生的Kafka客户端消费订阅数据,则可能会在DTS发生增量数据采集模块切换行为,从而使subscribe模式下订阅客户端保存在服务端的消费位点被清除,您需要手动调整订阅的消费位点以实现按需消费数据。
Kafka客户端运行流程说明
请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。
-
单击
,然后选择Download ZIP下载文件。 -
如需使用Kafka客户端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
表 1. 运行流程说明
|
步骤 |
相关目录或文件 |
|
1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。 |
subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
|
2、将获取的增量数据镜像执行反序列化,并从中获取前镜像(该条记录在数据发生变更前,各字段对应的值)、 后镜像(该条记录在数据发生变更后,各字段对应的值)和其他属性。 警告
|
subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
|
3、将反序列化后的数据中的dataTypeNumber字段转换为对应数据库的字段类型。 |
subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。
-
创建新版数据订阅通道,详情请参见创建RDS MySQL数据订阅通道、创建PolarDB MySQL版数据订阅通道或创建Oracle数据订阅通道。
-
创建一个或多个消费组,详情请参见新增消费组。
-
下载Kafka客户端Demo代码,然后解压该文件。
说明单击
,然后选择Download ZIP下载文件。 -
打开IntelliJ IDEA软件,然后单击Open。
-
在弹出的对话框中,定位至Kafka客户端Demo代码下载的目录,依次展开文件夹,找到项目对象模型文件:pom.xml。
展开路径为
kafkademo>subscribe_example-master>javaimpl,选中pom.xml后单击 OK。 -
在弹出对话框中,选择Open as Project。
-
在IntelliJ IDEA软件界面,依次展开文件夹,找到并双击打开Kafka客户端Demo文件:NotifyDemoDB.java。
-
设置NotifyDemoDB.java文件中的各参数对应的值。
public static Properties getConfigs() { Properties properties = new Properties(); // user password and sid for auth properties.setProperty(USER_NAME, "dtstest"); properties.setProperty(PASSWORD_NAME, "xxx"); properties.setProperty(SID_NAME, "dtsxxx"); // kafka consumer group general same with sid properties.setProperty(GROUP_NAME, "dtsxxx"); // topic to consume, partition is 0 properties.setProperty(KAFKA_TOPIC, "cn_hangzhou_xxx"); // kafka broker url properties.setProperty(KAFKA_BROKER_URL_NAME, "dts-cn-xxx.com:18001"); // initial checkpoint for first seek(a timestamp to set, eg 1566180200 if you want (Mon Aug 19 10:03:21 CST 2019)) properties.setProperty(INITIAL_CHECKPOINT_NAME, "1583307907"); // if force use config checkpoint when start. for checkpoint reset properties.setProperty(USE_CONFIG_CHECKPOINT_NAME, "true"); // use consumer assign or subscribe interface // when use subscribe mode, group config is required. kafka consumer group is enabled properties.setProperty(SUBSCRIBE_MODE_NAME, "assign"); return properties; }参数
说明
获取方式
USER_NAME
消费组的账号。
警告如您未使用本文提供的客户端,请按照
<消费组的账号>-<消费组ID>的格式设置用户名(例如:dtstest-dtsae******bpv),否则无法正常连接。在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。
说明消费组账号的密码已在您新建消费组时指定。
PASSWORD_NAME
该账号的密码。
SID_NAME
消费组ID。
GROUP_NAME
消费组名称,需保持和消费组ID相同(即本参数也填入消费组ID)。
KAFKA_TOPIC
数据订阅通道的订阅Topic。
在DTS控制台单击目标订阅实例ID,在任务管理页面,您可以获取到订阅Topic、网络地址信息。在DTS数据订阅任务详情页的基本信息区获取订阅Topic值,在网络区获取VPC网络地址(格式示例:
xxx.aliyuncs.com:18003)。KAFKA_BROKER_URL_NAME
数据订阅通道的网络地址信息。
说明-
如果您部署Kafka客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
鉴于网络稳定性因素,不建议使用公网地址。
INITIAL_CHECKPOINT_NAME
消费的数据时间点,格式为Unix时间戳,例如1592269238。
说明-
您需要自行保存时间点信息,以便:
-
当业务程序中断后,传入已消费的数据时间点继续消费数据,防止数据丢失。
-
在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
-
-
若SUBSCRIBE_MODE_NAME为subscribe时,传入的INITIAL_CHECKPOINT_NAME仅在订阅客户端首次启动时生效。
消费的数据时间点必须在订阅实例的数据范围之内,并需转化为Unix时间戳。在 DTS 订阅任务列表中,可查看对应订阅任务的数据范围字段,该字段显示当前可消费数据的起止时间,据此可确定 INITIAL_CHECKPOINT_NAME 的有效取值范围。
说明Unix时间戳转换工具可用搜索引擎获取。
USE_CONFIG_CHECKPOINT_NAME
默认取值为true,即强制使用指定的数据时间点来消费数据,避免丢失已接收到的但未处理的数据。
无
SUBSCRIBE_MODE_NAME
一个消费组下支持同时启动两个及以上Kafka客户端,如需实现该功能,请将所有客户端该参数的值设置为subscribe。
默认值为assign,即不使用该功能,建议只部署一个客户端。
无
-
-
在IntelliJ IDEA软件界面的顶部,选择运行该客户端。
说明首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。
执行结果
运行结果该客户端可正常订阅到源库的数据变更信息。
[2020-03-09 10:41:52,408] INFO [Consumer clientId=consumer-1, groupId=dts_xxx] Discovered coordinator xxx (id: xxx rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-09 10:41:57,203] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721711, offset: 1732521, info: 1583721711] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:41:57,571] INFO EtlRecordProcessor: haven't receive records from generator for 5s (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:02,203] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721721, offset: 1732539, info: 1583721721] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:07,204] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721726, offset: 1732544, info: 1583721726] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:12,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721731, offset: 1732548, info: 1583721731] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:17,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721736, offset: 1732554, info: 1583721736] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:22,205] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721741, offset: 1732559, info: 1583721741] (recordprocessor.EtlRecordProcessor)
[2020-03-09 10:42:27,206] INFO commit record with checkpoint Checkpoint[ topicPartition: cn_hangzhou_rm_xxx_dtstest-0timestamp: 1583721746, offset: 1732569, info: 1583721746] (recordprocessor.EtlRecordProcessor)
您也可以去除NotifyDemoDB.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);中的//),然后再次运行该客户端即可查看详细的数据变更信息。
常见问题
-
Q:为什么需要自行记录客户端的消费位点?
A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。
MySQL字段类型与dataTypeNumber数值的对应关系
|
MySQL字段类型 |
对应dataTypeNumber数值 |
|
MYSQL_TYPE_DECIMAL |
0 |
|
MYSQL_TYPE_INT8 |
1 |
|
MYSQL_TYPE_INT16 |
2 |
|
MYSQL_TYPE_INT32 |
3 |
|
MYSQL_TYPE_FLOAT |
4 |
|
MYSQL_TYPE_DOUBLE |
5 |
|
MYSQL_TYPE_NULL |
6 |
|
MYSQL_TYPE_TIMESTAMP |
7 |
|
MYSQL_TYPE_INT64 |
8 |
|
MYSQL_TYPE_INT24 |
9 |
|
MYSQL_TYPE_DATE |
10 |
|
MYSQL_TYPE_TIME |
11 |
|
MYSQL_TYPE_DATETIME |
12 |
|
MYSQL_TYPE_YEAR |
13 |
|
MYSQL_TYPE_DATE_NEW |
14 |
|
MYSQL_TYPE_VARCHAR |
15 |
|
MYSQL_TYPE_BIT |
16 |
|
MYSQL_TYPE_TIMESTAMP_NEW |
17 |
|
MYSQL_TYPE_DATETIME_NEW |
18 |
|
MYSQL_TYPE_TIME_NEW |
19 |
|
MYSQL_TYPE_JSON |
245 |
|
MYSQL_TYPE_DECIMAL_NEW |
246 |
|
MYSQL_TYPE_ENUM |
247 |
|
MYSQL_TYPE_SET |
248 |
|
MYSQL_TYPE_TINY_BLOB |
249 |
|
MYSQL_TYPE_MEDIUM_BLOB |
250 |
|
MYSQL_TYPE_LONG_BLOB |
251 |
|
MYSQL_TYPE_BLOB |
252 |
|
MYSQL_TYPE_VAR_STRING |
253 |
|
MYSQL_TYPE_STRING |
254 |
|
MYSQL_TYPE_GEOMETRY |
255 |
Oracle字段类型与dataTypeNumber数值的对应关系
|
Oracle字段类型 |
对应dataTypeNumber数值 |
|
VARCHAR2/NVARCHAR2 |
1 |
|
NUMBER/FLOAT |
2 |
|
LONG |
8 |
|
DATE |
12 |
|
RAW |
23 |
|
LONG_RAW |
24 |
|
UNDEFINED |
29 |
|
XMLTYPE |
58 |
|
ROWID |
69 |
|
CHAR、NCHAR |
96 |
|
BINARY_FLOAT |
100 |
|
BINARY_DOUBLE |
101 |
|
CLOB/NCLOB |
112 |
|
BLOB |
113 |
|
BFILE |
114 |
|
TIMESTAMP |
180 |
|
TIMESTAMP_WITH_TIME_ZONE |
181 |
|
INTERVAL_YEAR_TO_MONTH |
182 |
|
INTERVAL_DAY_TO_SECOND |
183 |
|
UROWID |
208 |
|
TIMESTAMP_WITH_LOCAL_TIME_ZONE |
231 |
PostgreSQL字段类型与dataTypeNumber数值的对应关系
|
PostgreSQL字段类型 |
对应dataTypeNumber数值 |
|
INT2/SMALLINT |
21 |
|
INT4/INTEGER/SERIAL |
23 |
|
INT8/BIGINT |
20 |
|
CHARACTER |
18 |
|
CHARACTER VARYING |
1043 |
|
REAL |
700 |
|
DOUBLE PRECISION |
701 |
|
NUMERIC |
1700 |
|
MONEY |
790 |
|
DATE |
1082 |
|
TIME/TIME WITHOUT TIME ZONE |
1083 |
|
TIME WITH TIME ZONE |
1266 |
|
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE |
1114 |
|
TIMESTAMP WITH TIME ZONE |
1184 |
|
BYTEA |
17 |
|
TEXT |
25 |
|
JSON |
114 |
|
JSONB |
3082 |
|
XML |
142 |
|
UUID |
2950 |
|
POINT |
600 |
|
LSEG |
601 |
|
PATH |
602 |
|
BOX |
603 |
|
POLYGON |
604 |
|
LINE |
628 |
|
CIDR |
650 |
|
CIRCLE |
718 |
|
MACADDR |
829 |
|
INET |
869 |
|
INTERVAL |
1186 |
|
TXID_SNAPSHOT |
2970 |
|
PG_LSN |
3220 |
|
TSVECTOR |
3614 |
|
TSQUERY |
3615 |