新版数据订阅支持使用0.11版本至2.0版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。
注意事项
- 使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。
说明 如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。
- 数据以Avro序列化存储,详细格式请参见Record.avsc文档。
警告 如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析时,可能出现解析的数据有误,您需要自行验证数据的正确性。
- 关于
offsetForTimes
接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。
Kafka客户端运行流程说明
请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。
说明
- 单击
,然后选择Download ZIP下载文件。
- 如需使用Kafka客户端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。

步骤 | 相关目录或文件 |
---|---|
1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。 | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ |
2、将获取的增量数据镜像执行反序列化,并从中获取
前镜像
、
后镜像
和其他属性。
警告
|
subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java |
3、将反序列化后的数据中的dataTypeNumber字段转换为对应数据库的字段类型。 | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ |
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。
执行结果
运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。

您也可以去除NotifyDemoDB.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);
中的//
),然后再次运行该客户端即可查看详细的数据变更信息。

常见问题
- Q:为什么需要自行记录客户端的消费位点?
A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。
MySQL字段类型与dataTypeNumber数值的对应关系
MySQL字段类型 | 对应dataTypeNumber数值 |
---|---|
MYSQL_TYPE_DECIMAL | 0 |
MYSQL_TYPE_INT8 | 1 |
MYSQL_TYPE_INT16 | 2 |
MYSQL_TYPE_INT32 | 3 |
MYSQL_TYPE_FLOAT | 4 |
MYSQL_TYPE_DOUBLE | 5 |
MYSQL_TYPE_NULL | 6 |
MYSQL_TYPE_TIMESTAMP | 7 |
MYSQL_TYPE_INT64 | 8 |
MYSQL_TYPE_INT24 | 9 |
MYSQL_TYPE_DATE | 10 |
MYSQL_TYPE_TIME | 11 |
MYSQL_TYPE_DATETIME | 12 |
MYSQL_TYPE_YEAR | 13 |
MYSQL_TYPE_DATE_NEW | 14 |
MYSQL_TYPE_VARCHAR | 15 |
MYSQL_TYPE_BIT | 16 |
MYSQL_TYPE_TIMESTAMP_NEW | 17 |
MYSQL_TYPE_DATETIME_NEW | 18 |
MYSQL_TYPE_TIME_NEW | 19 |
MYSQL_TYPE_JSON | 245 |
MYSQL_TYPE_DECIMAL_NEW | 246 |
MYSQL_TYPE_ENUM | 247 |
MYSQL_TYPE_SET | 248 |
MYSQL_TYPE_TINY_BLOB | 249 |
MYSQL_TYPE_MEDIUM_BLOB | 250 |
MYSQL_TYPE_LONG_BLOB | 251 |
MYSQL_TYPE_BLOB | 252 |
MYSQL_TYPE_VAR_STRING | 253 |
MYSQL_TYPE_STRING | 254 |
MYSQL_TYPE_GEOMETRY | 255 |
Oracle字段类型与dataTypeNumber数值的对应关系
Oracle字段类型 | 对应dataTypeNumber数值 |
---|---|
VARCHAR2/NVARCHAR2 | 1 |
NUMBER/FLOAT | 2 |
LONG | 8 |
DATE | 12 |
RAW | 23 |
LONG_RAW | 24 |
UNDEFINED | 29 |
XMLTYPE | 58 |
ROWID | 69 |
CHAR、NCHAR | 96 |
BINARY_FLOAT | 100 |
BINARY_DOUBLE | 101 |
CLOB/NCLOB | 112 |
BLOB | 113 |
BFILE | 114 |
TIMESTAMP | 180 |
TIMESTAMP_WITH_TIME_ZONE | 181 |
INTERVAL_YEAR_TO_MONTH | 182 |
INTERVAL_DAY_TO_SECOND | 183 |
UROWID | 208 |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 |
PostgreSQL字段类型与dataTypeNumber数值的对应关系
PostgreSQL字段类型 | 对应dataTypeNumber数值 |
---|---|
INT2/SMALLINT | 21 |
INT4/INTEGER/SERIAL | 23 |
INT8/BIGINT | 20 |
CHARACTER | 18 |
CHARACTER VARYING | 1043 |
REAL | 700 |
DOUBLE PRECISION | 701 |
NUMERIC | 1700 |
MONEY | 790 |
DATE | 1082 |
TIME/TIME WITHOUT TIME ZONE | 1083 |
TIME WITH TIME ZONE | 1266 |
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114 |
TIMESTAMP WITH TIME ZONE | 1184 |
BYTEA | 17 |
TEXT | 25 |
JSON | 114 |
JSONB | 3082 |
XML | 142 |
UUID | 2950 |
POINT | 600 |
LSEG | 601 |
PATH | 602 |
BOX | 603 |
POLYGON | 604 |
LINE | 628 |
CIDR | 650 |
CIRCLE | 718 |
MACADDR | 829 |
INET | 869 |
INTERVAL | 1186 |
TXID_SNAPSHOT | 2970 |
PG_LSN | 3220 |
TSVECTOR | 3614 |
TSQUERY | 3615 |