本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
新版数据订阅支持使用0.11版本至2.7版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。
注意事项
使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。
说明如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。
数据以Avro序列化存储,详细格式请参见Record.avsc文档。
警告如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析(DTS Avro的反序列化示例)时,可能出现解析的数据有误,您需要自行验证数据的正确性。
关于
offsetForTimes接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。由于数据订阅服务端会因容灾等原因导致网络闪断,若您未使用本文提供的Kafka客户端,您使用的Kafka客户端需具备网络重试能力。
若您使用原生的Kafka客户端消费订阅数据,则可能会在DTS发生增量数据采集模块切换行为,从而使subscribe模式下订阅客户端保存在服务端的消费位点被清除,您需要手动调整订阅的消费位点以实现按需消费数据。若您需要使用subscribe模式建议使用DTS提供的订阅SDK消费订阅数据,或者自行管理位点,详情请参见使用SDK消费订阅数据和管理位点。
Kafka客户端运行流程说明
请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。
单击
,然后选择Download ZIP下载文件。如需使用Kafka客户端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。

表 1. 运行流程说明
步骤  | 相关目录或文件  | 
1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。  | subscribe_example-master/javaimpl/src/main/java/recordgenerator/  | 
2、将获取的增量数据镜像执行反序列化,并从中获取 前镜像 、 后镜像 和其他属性。 警告  
  | subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java  | 
3、将反序列化后的数据中的dataTypeNumber字段转换为对应数据库的字段类型。 说明  关于对应关系的详情,请参见字段类型与dataTypeNumber数值的对应关系。  | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/  | 
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。
创建新版数据订阅通道,详情请参见订阅方案概览中的相关配置文档。
创建一个或多个消费组,详情请参见新增消费组。
下载Kafka客户端Demo代码,然后解压该文件。
说明单击
,然后选择Download ZIP下载文件。打开IntelliJ IDEA软件,然后单击Open。

在弹出的对话框中,定位至Kafka客户端Demo代码下载的目录,参照下图依次展开文件夹,找到项目对象模型文件:pom.xml。

在弹出对话框中,选择Open as Project。
在IntelliJ IDEA软件界面,依次展开文件夹,找到并双击打开Kafka客户端Demo文件:NotifyDemoDB.java。
设置NotifyDemoDB.java文件中的各参数对应的值。

参数
说明
获取方式
USER_NAME
消费组的账号。
警告如您未使用本文提供的客户端,请按照
<消费组的账号>-<消费组ID>的格式设置用户名(例如:dtstest-dtsae******bpv),否则无法正常连接。在DTS控制台单击目标订阅实例ID,然后单击左侧导航栏的数据消费,您可以获取到消费组ID/名称和消费组的账号信息。
说明消费组账号的密码已在您新建消费组时指定。
PASSWORD_NAME
该账号的密码。
SID_NAME
消费组ID。
GROUP_NAME
消费组名称,需保持和消费组ID相同(即本参数也填入消费组ID)。
KAFKA_TOPIC
数据订阅通道的订阅Topic。
在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取到订阅Topic和网络信息。
KAFKA_BROKER_URL_NAME
数据订阅通道的网络地址信息。
说明如果您部署Kafka客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
鉴于网络稳定性因素,不建议使用公网地址。
INITIAL_CHECKPOINT_NAME
消费的数据时间点,格式为Unix时间戳,例如1592269238。
说明您需要自行保存时间点信息,以便:
当业务程序中断后,传入已消费的数据时间点继续消费数据,防止数据丢失。
在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。
若SUBSCRIBE_MODE_NAME为subscribe时,传入的INITIAL_CHECKPOINT_NAME仅在订阅客户端首次启动时生效。
消费的数据时间点必须在订阅实例的数据范围之内,并需转化为Unix时间戳。
说明您可以在订阅任务列表的数据范围列,查看订阅实例的数据范围。
Unix时间戳转换工具可用搜索引擎获取。
USE_CONFIG_CHECKPOINT_NAME
默认取值为true,即强制使用指定的数据时间点来消费数据,避免丢失已接收到的但未处理的数据。
无
SUBSCRIBE_MODE_NAME
一个消费组下支持同时启动两个及以上Kafka客户端,如需实现该功能,请将所有客户端该参数的值设置为subscribe。
默认值为assign,即不使用该功能,建议只部署一个客户端。
无
在IntelliJ IDEA软件界面的顶部,选择运行该客户端。
说明首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。
执行结果
运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。

您也可以去除NotifyDemoDB.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);中的//),然后再次运行该客户端即可查看详细的数据变更信息。

常见问题
Q:为什么需要自行记录客户端的消费位点?
A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。
管理位点
配置订阅客户端监听DTS的数据采集模块的切换行为。
通过配置订阅客户端的Consumer的properties,监听DTS的数据采集模块的切换行为,实现方法的主要内容如下所示:
properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());ClusterSwitchListener的实现方法如下所示:
public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor { private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class); private ClusterResource originClusterResource = null; private ClusterResource currentClusterResource = null; public ConsumerRecords onConsume(ConsumerRecords records) { return records; } public void close() { } public void onCommit(Map offsets) { } public void onUpdate(ClusterResource clusterResource) { synchronized (this) { originClusterResource = currentClusterResource; currentClusterResource = clusterResource; if (null == originClusterResource) { LOG.info("Cluster updated to " + currentClusterResource.clusterId()); } else { if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { LOG.info("Cluster not changed on update:" + clusterResource.clusterId()); } else { LOG.error("Cluster changed"); throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId() + ", consumer require restart"); } } } } public boolean isClusterResourceChanged() { if (null == originClusterResource) { return false; } if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { return false; } return true; } public void configure(Map<String, ?> configs) { } public static class ClusterSwitchException extends KafkaException { public ClusterSwitchException(String message, Throwable cause) { super(message, cause); } public ClusterSwitchException(String message) { super(message); } public ClusterSwitchException(Throwable cause) { super(cause); } public ClusterSwitchException() { super(); } }处理捕获到DTS数据采集模块的切换行为。
将实际消费的最后一条订阅数据的时间位点(timestamp),设置为下一次订阅的初始位点,实现方法的主要内容如下所示:
try{ //do some action } catch (ClusterSwitchListener.ClusterSwitchException e) { reset(); } //重置位点 public reset() { long offset = kafkaConsumer.offsetsForTimes(timestamp); kafkaConsumer.seek(tp,offset); }说明实现方法示例,请参见KafkaRecordFetcher。
字段类型与dataTypeNumber数值的对应关系
MySQL字段类型与dataTypeNumber数值的对应关系
MySQL字段类型  | 对应dataTypeNumber数值  | 
MYSQL_TYPE_DECIMAL  | 0  | 
MYSQL_TYPE_INT8  | 1  | 
MYSQL_TYPE_INT16  | 2  | 
MYSQL_TYPE_INT32  | 3  | 
MYSQL_TYPE_FLOAT  | 4  | 
MYSQL_TYPE_DOUBLE  | 5  | 
MYSQL_TYPE_NULL  | 6  | 
MYSQL_TYPE_TIMESTAMP  | 7  | 
MYSQL_TYPE_INT64  | 8  | 
MYSQL_TYPE_INT24  | 9  | 
MYSQL_TYPE_DATE  | 10  | 
MYSQL_TYPE_TIME  | 11  | 
MYSQL_TYPE_DATETIME  | 12  | 
MYSQL_TYPE_YEAR  | 13  | 
MYSQL_TYPE_DATE_NEW  | 14  | 
MYSQL_TYPE_VARCHAR  | 15  | 
MYSQL_TYPE_BIT  | 16  | 
MYSQL_TYPE_TIMESTAMP_NEW  | 17  | 
MYSQL_TYPE_DATETIME_NEW  | 18  | 
MYSQL_TYPE_TIME_NEW  | 19  | 
MYSQL_TYPE_JSON  | 245  | 
MYSQL_TYPE_DECIMAL_NEW  | 246  | 
MYSQL_TYPE_ENUM  | 247  | 
MYSQL_TYPE_SET  | 248  | 
MYSQL_TYPE_TINY_BLOB  | 249  | 
MYSQL_TYPE_MEDIUM_BLOB  | 250  | 
MYSQL_TYPE_LONG_BLOB  | 251  | 
MYSQL_TYPE_BLOB  | 252  | 
MYSQL_TYPE_VAR_STRING  | 253  | 
MYSQL_TYPE_STRING  | 254  | 
MYSQL_TYPE_GEOMETRY  | 255  | 
Oracle字段类型与dataTypeNumber数值的对应关系
Oracle字段类型  | 对应dataTypeNumber数值  | 
VARCHAR2/NVARCHAR2  | 1  | 
NUMBER/FLOAT  | 2  | 
LONG  | 8  | 
DATE  | 12  | 
RAW  | 23  | 
LONG_RAW  | 24  | 
UNDEFINED  | 29  | 
XMLTYPE  | 58  | 
ROWID  | 69  | 
CHAR、NCHAR  | 96  | 
BINARY_FLOAT  | 100  | 
BINARY_DOUBLE  | 101  | 
CLOB/NCLOB  | 112  | 
BLOB  | 113  | 
BFILE  | 114  | 
TIMESTAMP  | 180  | 
TIMESTAMP_WITH_TIME_ZONE  | 181  | 
INTERVAL_YEAR_TO_MONTH  | 182  | 
INTERVAL_DAY_TO_SECOND  | 183  | 
UROWID  | 208  | 
TIMESTAMP_WITH_LOCAL_TIME_ZONE  | 231  | 
PostgreSQL字段类型与dataTypeNumber数值的对应关系
PostgreSQL字段类型  | 对应dataTypeNumber数值  | 
INT2/SMALLINT  | 21  | 
INT4/INTEGER/SERIAL  | 23  | 
INT8/BIGINT  | 20  | 
CHARACTER  | 18  | 
CHARACTER VARYING  | 1043  | 
REAL  | 700  | 
DOUBLE PRECISION  | 701  | 
NUMERIC  | 1700  | 
MONEY  | 790  | 
DATE  | 1082  | 
TIME/TIME WITHOUT TIME ZONE  | 1083  | 
TIME WITH TIME ZONE  | 1266  | 
TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE  | 1114  | 
TIMESTAMP WITH TIME ZONE  | 1184  | 
BYTEA  | 17  | 
TEXT  | 25  | 
JSON  | 114  | 
JSONB  | 3082  | 
XML  | 142  | 
UUID  | 2950  | 
POINT  | 600  | 
LSEG  | 601  | 
PATH  | 602  | 
BOX  | 603  | 
POLYGON  | 604  | 
LINE  | 628  | 
CIDR  | 650  | 
CIRCLE  | 718  | 
MACADDR  | 829  | 
INET  | 869  | 
INTERVAL  | 1186  | 
TXID_SNAPSHOT  | 2970  | 
PG_LSN  | 3220  | 
TSVECTOR  | 3614  | 
TSQUERY  | 3615  |