本文为您介绍消息队列RocketMQ版源表DDL定义、WITH参数、类型映射、属性字段和代码示例。
说明 RocketMQ Connector可以作为Stream作业和Batch作业的源表使用。
什么是消息队列RocketMQ版
消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。
前提条件
已创建了RocketMQ资源,详情请参见创建资源。
使用限制
- 仅支持消费4.X版本的RocketMQ数据。
- 仅Flink计算引擎VVR 2.0.0及以上版本支持消息队列RocketMQ Connector。
- 在实时计算引擎VVR 6.0.2之前,消息队列RocketMQ版源表的并发度必须小于等于RocketMQ topic的partition数。该限制在实时计算引擎VVR 6.0.2及以后解除。用户可以提前设置大于分区数的并发度,不需要因RocketMQ的缩容而手动调整作业并发度。
DDL定义
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'pullIntervalMs' = '1000',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'startMessageOffset' = '1000',
'consumerGroup' = '<yourConsumerGroup>',
'fieldDelimiter' = '|'
);
说明 RocketMQ是非结构化存储格式的消息中间件,对于数据的Schema不提供强制定义,完全由业务层指定。Flink仅支持CSV和二进制格式的RocketMQ消息。
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 源表类型。 | 是 | 固定值为mq 。
|
topic | topic名称。 | 是 | 无。 |
endPoint | EndPoint地址。 | 是 | 阿里云消息队列RocketMQ版接入地址支持以下两种类型:
|
accessId | AccessKey ID。 | 是 | 无。 |
accessKey | AccessKey Secret。 | 是 | 无。 |
consumerGroup | Consumer组名。 | 是 | 无。 |
pullIntervalMs | 拉取数据的时间间隔。 | 是 | 单位为毫秒。 |
nameServerSubgroup | NameServer组。 | 否 |
说明 仅VVR 2.1.1 ~ VVR 3.0.0版本支持该参数,VVR 3.0.1及以后版本不支持该参数。
|
timeZone | 时区。 | 否 | 例如,Asia/Shanghai。 |
startTimeMs | 启动时间点。 | 否 | 无。 |
startMessageOffset | 消息开始的偏移量。 | 否 | 如果填写该参数,则优先以startMessageOffset的位点开始加载数据。 |
tag | 订阅的标签。 | 否 | MQ作为数据源时,只能读取单个tag。 |
lineDelimiter | 解析Block时,行分隔符。 | 否 | 默认值为\n 。
|
fieldDelimiter | 字段分隔符。 | 否 | 根据MQ终端的模式,分隔符分别为:
|
encoding | 编码格式。 | 否 | 默认值为utf-8 。
|
lengthCheck | 单行字段条数检查策略。 | 否 | 默认值为NONE,表示:
|
columnErrorDebug | 是否打开调试开关。 | 否 | 默认值为FALSE。如果设置为TRUE,则打印解析异常的Log。 |
instanceID | 实例ID。 | 否 | 根据RocketMQ实例是否有独立命名空间,执行如下操作:
|
类型映射
RocketMQ字段类型 | Flink字段类型 |
---|---|
STRING | VARCHAR |
属性字段
字段名 | 字段类型 | 说明 |
---|---|---|
topic | VARCHAR METADATA VIRTUAL | RocketMQ消息Topic。 |
queue-id | INT METADATA VIRTUAL | RocketMQ消息队列ID。 |
queue-offset | BIGINT METADATA VIRTUAL | RocketMQ消息队列的消费位点。 |
msg-id | VARCHAR METADATA VIRTUAL | RocketMQ消息ID。 |
store-timestamp | TIMESTAMP(3) METADATA VIRTUAL | RocketMQ消息存储时间。 |
born-timestamp | TIMESTAMP(3) METADATA VIRTUAL | RocketMQ消息生成时间。 |
keys | VARCHAR METADATA VIRTUAL | RocketMQ消息Keys。 |
tags | VARCHAR METADATA VIRTUAL | RocketMQ消息Tags。 |
说明 仅在VVR 3.0.1及以上版本支持获取以上RocketMQ属性字段。
代码示例
- CSV格式
假设您的1条CSV格式消息记录如下。
1,name,male 2,name,female
说明 1条RocketMQ消息可以包括0条到多条数据记录,记录之间使用\n
分隔。Flink作业中,声明RocketMQ数据源表的DDL如下。CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessSecret>', 'startMessageOffset' = '1000', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = '|' );
- 二进制格式
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessSecret>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE table out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;