完成数据订阅通道的配置后,您可以使用flink-dts-connector文件消费通道中的数据,用于Flink客户端消费。本文介绍如何flink-dts-connector文件的使用说明。
注意事项
- 仅支持Flink客户端使用DataStream API、Table API和SQL。
- 如您的Flink客户端使用Table API和SQL,则单次配置时仅支持消费单张表的数据,如需消费多张表的数据,您需进行多次配置独立的任务。
操作步骤
本文以IntelliJ IDEA软件(Community Edition 2020.1 Windows版本)为例,介绍如何使用flink-dts-connector文件来消费订阅通道中的数据。
参数说明
DstExample文件中的参数 | DtsTableISelectTCaseTest文件中的参数 | 说明 | 查询方式 |
---|---|---|---|
broker-url |
dts.server |
数据订阅通道的网络地址及端口号信息。
说明 如果您部署的Flink所属的ECS实例与数据订阅通道属于经典网络或同一专有网络,建议通过内网地址进行数据订阅,网络延迟最小。
|
在DTS控制台单击目标订阅实例ID,在订阅配置页面,您可以获取到订阅Topic、网络地址及端口号信息。![]() |
topic |
topic |
数据订阅通道的订阅Topic。 | |
sid |
dts.sid |
消费组ID。 | 在DTS控制台单击目标订阅实例ID,然后单击数据消费,您可以获取到消费组ID和消费组的账号信息。
说明 消费组账号的密码已在您新建消费组时指定。
![]() |
user |
dts.user |
消费组的账号。
警告 如您未使用本文提供的flink-dts-connector文件,请按照
<消费组的账号>-<消费组ID> 的格式设置用户名(例如:dtstest-dtsae******bpv ),否则无法正常连接。
|
|
password |
dts.password |
该账号的密码。 | |
checkpoint |
dts.checkpoint |
消费位点,即flink-dts-connector消费第一条数据的时间戳,格式为Unix时间戳,例如1624440043。
说明 消费位点信息可用于:
|
消费位点必须在订阅实例的数据范围(如图所示)之内,并需转化为Unix时间戳。![]() 说明 Unix时间戳转换工具可用搜索引擎获取。
|
无 | dts.cdc.table.name |
订阅对象。仅支持传入单张表,且格式为<数据库名称>.<表名称> ,例如dtstestdata.order 。
|
在DTS控制台单击目标订阅实例ID,在订阅配置页面,单击右上方的查看订阅对象,查询订阅对象所属数据库和表。 |
常见问题
报错提示 | 可能的原因 | 解决方式 |
---|---|---|
|
DTS用于读取增量数据的模块DStore发生切换,导致Flink客户端的消费位点丢失。 | 您无需重启客户端,仅需查询客户端的消费位点,并在DtsExample.java和DtsTableISelectTCaseTest.java文件中重新传入消费位点checkpoint 或dts.checkpoint ,即可重新消费订阅数据。
|