本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
完成数据订阅通道的配置后,您可以使用flink-dts-connector文件消费通道中的数据,用于Flink客户端消费。本文介绍如何使用flink-dts-connector文件。
注意事项
- 仅支持Flink客户端使用DataStream API、Table API和SQL。 
- 如您的Flink客户端使用Table API和SQL,则单次配置时仅支持消费单张表的数据,如需消费多张表的数据,您需进行多次配置独立的任务。 
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何使用flink-dts-connector文件来消费订阅通道中的数据。
- 创建新版数据订阅通道,详情请参见订阅方案概览中的相关配置文档。
- 创建一个或多个消费组,详情请参见新增消费组。
- 下载flink-dts-connector文件并解压。 
- 运行IntelliJ IDEA工具,然后单击Open or Import。  
- 在弹出的对话框中,定位至flink-dts-connector文件所在目录,依次展开文件夹,找到项目对象模型文件:pom.xml。  
- 在弹出对话框中,选择Open as Project。
- 在pom.xml文件中添加如下依赖: - <dependency> <groupId>com.alibaba.flink</groupId> <artifactId>flink-dts-connector</artifactId> <version>1.1.1-SNAPSHOT</version> <classifier>jar-with-dependencies</classifier> </dependency>
- 在IntelliJ IDEA软件界面,依次展开文件夹,并根据您所使用的Flink Connector的程序类型,选择对应的Java文件。 - 如Flink客户端类型为DataStream API,您需双击打开flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java文件,并执行如下操作: - 在IntelliJ IDEA软件界面的顶部,单击如下图标。  
- 在弹跳框中单击。  
- 在弹跳框的Program arguments中,按如下示例输入参数及对应的值,并单击下方的Run,启动flink-dts-connector。 说明- 具体参数说明及查询方式,请参见参数说明。 - --broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
- 运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。  说明 说明- 如需查询数据变更的具体记录,您可登录Flink客户端的Task Manager界面进行查看。 
 
- 如Flink客户端类型为Table API和SQL,您需双击打开flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java文件,并执行如下操作: 说明- 单个 - DtsTableISelectTCaseTest.java文件,仅支持配置并消费单张表的订阅数据。如需配置并消费多张表中的数据,您需要重复配置,并运行多个独立任务。- 在如下位置添加前导字符 - //,注释该行代码信息。 
- 设置所需消费的单张表的信息,支持使用SQL语句。 
- 设置订阅通道参数,具体参数说明及查询方式,请参见参数说明。  
- 在IntelliJ IDEA软件界面的顶部,单击Run'DtsTableISelectTCaseTest',启动flink-dts-connector。 
- 运行结果如下图所示,该客户端可正常订阅到源库的数据变更信息。  说明 说明- 如需查询数据变更的具体记录,您可登录Flink客户端的Task Manager界面进行查看。 
 
 
参数说明
| DstExample文件中的参数 | DtsTableISelectTCaseTest文件中的参数 | 说明 | 查询方式 | 
| 
 | 
 | 数据订阅通道的网络地址及端口号信息。 说明  
 | 在DTS控制台单击目标订阅实例ID,在基本信息页面,您可以获取到订阅Topic和网络信息。 | 
| 
 | 
 | 数据订阅通道的订阅Topic。 | |
| 
 | 
 | 消费组ID。 | 在DTS控制台单击目标订阅实例ID,然后单击左侧导航栏的数据消费,您可以获取到消费组ID/名称和消费组的账号信息。 说明  消费组账号的密码已在您新建消费组时指定。 | 
| 
 | 
 | 消费组的账号。 警告  如您未使用本文提供的flink-dts-connector文件,请按照 | |
| 
 | 
 | 该账号的密码。 | |
| 
 | 
 | 消费位点,即flink-dts-connector消费第一条数据的时间戳,格式为Unix时间戳,例如1624440043。 说明  消费位点信息可用于: 
 | 消费的数据时间点必须在订阅实例的数据范围之内,并需转化为Unix时间戳。 说明  
 | 
| 无 | 
 | 订阅对象。仅支持传入单张表,且格式要求如下: 
 | 在DTS控制台单击目标订阅实例ID,在基本信息或任务管理页面的上方,单击查看订阅对象,查询订阅对象所属数据库和表。 | 
常见问题
| 报错提示 | 可能的原因 | 解决方式 | 
|  | DTS用于读取增量数据的模块DStore发生切换,导致Flink客户端的消费位点丢失。 | 您无需重启客户端,仅需查询客户端的消费位点,并在DtsExample.java和DtsTableISelectTCaseTest.java文件中重新传入消费位点 |