本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
新版数据订阅支持使用0.11版本至2.7版本的Kafka客户端消费订阅数据,DTS为您提供了Kafka客户端Demo,本文将介绍该客户端的使用说明。
注意事项
- 使用本文提供的Demo消费数据时,如果采用auto commit(自动提交),可能会因为数据还没被消费完就执行了提交操作,从而丢失部分数据,建议采用手动提交的方式以避免该问题。 说明- 如果发生故障没有提交成功,重启客户端后会从上一个记录的位点进行数据消费,期间会有部分重复数据,您需要手动过滤。 
- 数据以Avro序列化存储,详细格式请参见Record.avsc文档。 警告- 如果您使用的不是本文提供的Kafka客户端,在进行反序列化解析(DTS Avro的反序列化示例)时,可能出现解析的数据有误,您需要自行验证数据的正确性。 
- 关于 - offsetForTimes接口,DTS的搜索单位为秒,原生Kafka的搜索单位为毫秒。
- 由于数据订阅服务端会因容灾等原因导致网络闪断,若您未使用本文提供的Kafka客户端,您使用的Kafka客户端需具备网络重试能力。 
- 若您使用原生的Kafka客户端消费订阅数据,则可能会在DTS发生增量数据采集模块切换行为,从而使subscribe模式下订阅客户端保存在服务端的消费位点被清除,您需要手动调整订阅的消费位点以实现按需消费数据。若您需要使用subscribe模式建议使用DTS提供的订阅SDK消费订阅数据,或者自行管理位点,详情请参见使用SDK消费订阅数据和管理位点。 
Kafka客户端运行流程说明
请下载Kafka客户端Demo代码。更多关于代码使用的详细介绍,请参见Demo中的Readme文档。
- 单击  ,然后选择Download ZIP下载文件。 ,然后选择Download ZIP下载文件。
- 如需使用Kafka客户端2.0版本,您需要修改subscribe_example-master/javaimpl/pom.xml文件,将kafka客户端的版本号修改成2.0.0。 

表 1. 运行流程说明
| 步骤 | 相关目录或文件 | 
| 1、使用原生的Kafka consumer从数据订阅通道中获取增量数据。 | subscribe_example-master/javaimpl/src/main/java/recordgenerator/ | 
| 2、将获取的增量数据镜像执行反序列化,并从中获取 前镜像 、 后镜像 和其他属性。 警告  
 | subscribe_example-master/javaimpl/src/main/java/boot/RecordPrinter.java | 
| 3、将反序列化后的数据中的dataTypeNumber字段转换为对应数据库的字段类型。 说明  关于对应关系的详情,请参见字段类型与dataTypeNumber数值的对应关系。 | subscribe_example-master/javaimpl/src/main/java/recordprocessor/mysql/ | 
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2018.1.4 Windows版本)为例,介绍如何运行该客户端消费订阅通道中的数据。
- 创建新版数据订阅通道,详情请参见订阅方案概览中的相关配置文档。 
- 创建一个或多个消费组,详情请参见新增消费组。 
- 下载Kafka客户端Demo代码,然后解压该文件。 说明- 单击  ,然后选择Download ZIP下载文件。 ,然后选择Download ZIP下载文件。
- 打开IntelliJ IDEA软件,然后单击Open。  
- 在弹出的对话框中,定位至Kafka客户端Demo代码下载的目录,参照下图依次展开文件夹,找到项目对象模型文件:pom.xml。  
- 在弹出对话框中,选择Open as Project。 
- 在IntelliJ IDEA软件界面,依次展开文件夹,找到并双击打开Kafka客户端Demo文件:NotifyDemoDB.java。 
- 设置NotifyDemoDB.java文件中的各参数对应的值。  - 参数 - 说明 - 获取方式 - USER_NAME - 消费组的账号。 警告- 如您未使用本文提供的客户端,请按照 - <消费组的账号>-<消费组ID>的格式设置用户名(例如:- dtstest-dtsae******bpv),否则无法正常连接。- 在DTS控制台单击目标订阅实例ID,然后单击左侧导航栏的数据消费,您可以获取到消费组ID/名称和消费组的账号信息。 说明- 消费组账号的密码已在您新建消费组时指定。 - PASSWORD_NAME - 该账号的密码。 - SID_NAME - 消费组ID。 - GROUP_NAME - 消费组名称,需保持和消费组ID相同(即本参数也填入消费组ID)。 - KAFKA_TOPIC - 数据订阅通道的订阅Topic。 - 在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取到订阅Topic和网络信息。 - KAFKA_BROKER_URL_NAME - 数据订阅通道的网络地址信息。 说明- 如果您部署Kafka客户端所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。 
- 鉴于网络稳定性因素,不建议使用公网地址。 
 - INITIAL_CHECKPOINT_NAME - 消费的数据时间点,格式为Unix时间戳,例如1592269238。 说明- 您需要自行保存时间点信息,以便: - 当业务程序中断后,传入已消费的数据时间点继续消费数据,防止数据丢失。 
- 在订阅客户端启动时,传入所需的消费位点,调整订阅位点,实现按需消费数据。 
 
- 若SUBSCRIBE_MODE_NAME为subscribe时,传入的INITIAL_CHECKPOINT_NAME仅在订阅客户端首次启动时生效。 
 - 消费的数据时间点必须在订阅实例的数据范围之内,并需转化为Unix时间戳。 说明- 您可以在订阅任务列表的数据范围列,查看订阅实例的数据范围。 
- Unix时间戳转换工具可用搜索引擎获取。 
 - USE_CONFIG_CHECKPOINT_NAME - 默认取值为true,即强制使用指定的数据时间点来消费数据,避免丢失已接收到的但未处理的数据。 - 无 - SUBSCRIBE_MODE_NAME - 一个消费组下支持同时启动两个及以上Kafka客户端,如需实现该功能,请将所有客户端该参数的值设置为subscribe。 - 默认值为assign,即不使用该功能,建议只部署一个客户端。 - 无 
- 在IntelliJ IDEA软件界面的顶部,选择运行该客户端。 说明- 首次运行时,软件需要一定时间自动加载相关依赖包并完成安装。 
执行结果
运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。

您也可以去除NotifyDemoDB.java文件中的打印日志详情的注释(即删除第25行//log.info(ret);中的//),然后再次运行该客户端即可查看详细的数据变更信息。

常见问题
- Q:为什么需要自行记录客户端的消费位点? - A:由于DTS记录的消费位点是接收到Kafka消费客户端执行commit操作的时间点,可能与当前实际消费到的时间点存在一定的时间差。当业务程序或Kafka消费客户端异常中断后,您可以传入自行记录的消费位点以继续消费,避免消费到重复的数据或缺失部分数据。 
管理位点
- 配置订阅客户端监听DTS的数据采集模块的切换行为。 - 通过配置订阅客户端的Consumer的properties,监听DTS的数据采集模块的切换行为,实现方法的主要内容如下所示: - properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ClusterSwitchListener.class.getName());- ClusterSwitchListener的实现方法如下所示: - public class ClusterSwitchListener implements ClusterResourceListener, ConsumerInterceptor { private final static Logger LOG = LoggerFactory.getLogger(ClusterSwitchListener.class); private ClusterResource originClusterResource = null; private ClusterResource currentClusterResource = null; public ConsumerRecords onConsume(ConsumerRecords records) { return records; } public void close() { } public void onCommit(Map offsets) { } public void onUpdate(ClusterResource clusterResource) { synchronized (this) { originClusterResource = currentClusterResource; currentClusterResource = clusterResource; if (null == originClusterResource) { LOG.info("Cluster updated to " + currentClusterResource.clusterId()); } else { if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { LOG.info("Cluster not changed on update:" + clusterResource.clusterId()); } else { LOG.error("Cluster changed"); throw new ClusterSwitchException("Cluster changed from " + originClusterResource.clusterId() + " to " + currentClusterResource.clusterId() + ", consumer require restart"); } } } } public boolean isClusterResourceChanged() { if (null == originClusterResource) { return false; } if (originClusterResource.clusterId().equals(currentClusterResource.clusterId())) { return false; } return true; } public void configure(Map<String, ?> configs) { } public static class ClusterSwitchException extends KafkaException { public ClusterSwitchException(String message, Throwable cause) { super(message, cause); } public ClusterSwitchException(String message) { super(message); } public ClusterSwitchException(Throwable cause) { super(cause); } public ClusterSwitchException() { super(); } }
- 处理捕获到DTS数据采集模块的切换行为。 - 将实际消费的最后一条订阅数据的时间位点(timestamp),设置为下一次订阅的初始位点,实现方法的主要内容如下所示: - try{ //do some action } catch (ClusterSwitchListener.ClusterSwitchException e) { reset(); } //重置位点 public reset() { long offset = kafkaConsumer.offsetsForTimes(timestamp); kafkaConsumer.seek(tp,offset); }说明- 实现方法示例,请参见KafkaRecordFetcher。 
字段类型与dataTypeNumber数值的对应关系
MySQL字段类型与dataTypeNumber数值的对应关系
| MySQL字段类型 | 对应dataTypeNumber数值 | 
| MYSQL_TYPE_DECIMAL | 0 | 
| MYSQL_TYPE_INT8 | 1 | 
| MYSQL_TYPE_INT16 | 2 | 
| MYSQL_TYPE_INT32 | 3 | 
| MYSQL_TYPE_FLOAT | 4 | 
| MYSQL_TYPE_DOUBLE | 5 | 
| MYSQL_TYPE_NULL | 6 | 
| MYSQL_TYPE_TIMESTAMP | 7 | 
| MYSQL_TYPE_INT64 | 8 | 
| MYSQL_TYPE_INT24 | 9 | 
| MYSQL_TYPE_DATE | 10 | 
| MYSQL_TYPE_TIME | 11 | 
| MYSQL_TYPE_DATETIME | 12 | 
| MYSQL_TYPE_YEAR | 13 | 
| MYSQL_TYPE_DATE_NEW | 14 | 
| MYSQL_TYPE_VARCHAR | 15 | 
| MYSQL_TYPE_BIT | 16 | 
| MYSQL_TYPE_TIMESTAMP_NEW | 17 | 
| MYSQL_TYPE_DATETIME_NEW | 18 | 
| MYSQL_TYPE_TIME_NEW | 19 | 
| MYSQL_TYPE_JSON | 245 | 
| MYSQL_TYPE_DECIMAL_NEW | 246 | 
| MYSQL_TYPE_ENUM | 247 | 
| MYSQL_TYPE_SET | 248 | 
| MYSQL_TYPE_TINY_BLOB | 249 | 
| MYSQL_TYPE_MEDIUM_BLOB | 250 | 
| MYSQL_TYPE_LONG_BLOB | 251 | 
| MYSQL_TYPE_BLOB | 252 | 
| MYSQL_TYPE_VAR_STRING | 253 | 
| MYSQL_TYPE_STRING | 254 | 
| MYSQL_TYPE_GEOMETRY | 255 | 
Oracle字段类型与dataTypeNumber数值的对应关系
| Oracle字段类型 | 对应dataTypeNumber数值 | 
| VARCHAR2/NVARCHAR2 | 1 | 
| NUMBER/FLOAT | 2 | 
| LONG | 8 | 
| DATE | 12 | 
| RAW | 23 | 
| LONG_RAW | 24 | 
| UNDEFINED | 29 | 
| XMLTYPE | 58 | 
| ROWID | 69 | 
| CHAR、NCHAR | 96 | 
| BINARY_FLOAT | 100 | 
| BINARY_DOUBLE | 101 | 
| CLOB/NCLOB | 112 | 
| BLOB | 113 | 
| BFILE | 114 | 
| TIMESTAMP | 180 | 
| TIMESTAMP_WITH_TIME_ZONE | 181 | 
| INTERVAL_YEAR_TO_MONTH | 182 | 
| INTERVAL_DAY_TO_SECOND | 183 | 
| UROWID | 208 | 
| TIMESTAMP_WITH_LOCAL_TIME_ZONE | 231 | 
PostgreSQL字段类型与dataTypeNumber数值的对应关系
| PostgreSQL字段类型 | 对应dataTypeNumber数值 | 
| INT2/SMALLINT | 21 | 
| INT4/INTEGER/SERIAL | 23 | 
| INT8/BIGINT | 20 | 
| CHARACTER | 18 | 
| CHARACTER VARYING | 1043 | 
| REAL | 700 | 
| DOUBLE PRECISION | 701 | 
| NUMERIC | 1700 | 
| MONEY | 790 | 
| DATE | 1082 | 
| TIME/TIME WITHOUT TIME ZONE | 1083 | 
| TIME WITH TIME ZONE | 1266 | 
| TIMESTAMP/TIMESTAMP WITHOUT TIME ZONE | 1114 | 
| TIMESTAMP WITH TIME ZONE | 1184 | 
| BYTEA | 17 | 
| TEXT | 25 | 
| JSON | 114 | 
| JSONB | 3082 | 
| XML | 142 | 
| UUID | 2950 | 
| POINT | 600 | 
| LSEG | 601 | 
| PATH | 602 | 
| BOX | 603 | 
| POLYGON | 604 | 
| LINE | 628 | 
| CIDR | 650 | 
| CIRCLE | 718 | 
| MACADDR | 829 | 
| INET | 869 | 
| INTERVAL | 1186 | 
| TXID_SNAPSHOT | 2970 | 
| PG_LSN | 3220 | 
| TSVECTOR | 3614 | 
| TSQUERY | 3615 |