本文为您介绍云消息队列 RocketMQ 版连接器。
鉴于云消息队列 RocketMQ 版 4.x标准版实例共享API调用弹性上限为每秒5000次,使用该版本的消息中间件在与实时计算Flink版对接时,若超过上限会触发限流机制,可能会导致Flink作业运行不稳定。因此,在选择消息中间件时,如果您正在或计划通过标准版RocketMQ与Flink对接,请您谨慎评估。如果业务场景允许,请考虑使用Kafka、日志服务(SLS)或DataHub等其他中间件进行替代。如果您确实需要使用云消息队列 RocketMQ 版 4.x标准版处理大规模的消息,也请同时通过提交工单与RocketMQ产品取得联系申请提升限速上限。
背景信息
云消息队列 RocketMQ 版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。其既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。
RocketMQ连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表和结果表 |
运行模式 | 仅支持流模式 |
数据格式 | CSV和二进制格式 |
特有监控指标 | |
API种类 | Datastream(仅支持RocketMQ 4.x)和SQL |
是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 |
特色功能
RocketMQ源表和结果表支持属性字段,具体如下。
源表属性字段
字段名
字段类型
说明
topic
VARCHAR METADATA VIRTUAL
消息Topic。
queue-id
INT METADATA VIRTUAL
消息队列ID。
queue-offset
BIGINT METADATA VIRTUAL
消息队列的消费位点。
msg-id
VARCHAR METADATA VIRTUAL
消息ID。
store-timestamp
TIMESTAMP(3) METADATA VIRTUAL
消息存储时间。
born-timestamp
TIMESTAMP(3) METADATA VIRTUAL
消息生成时间。
keys
VARCHAR METADATA VIRTUAL
消息Keys。
tags
VARCHAR METADATA VIRTUAL
消息Tags。
结果表属性字段
字段名
字段类型
说明
keys
VARCHAR METADATA
消息Keys。
tags
VARCHAR METADATA
消息Tags。
前提条件
已创建了RocketMQ资源,详情请参见创建资源。
使用限制
仅Flink实时计算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。
RocketMQ连接器使用Pull Consumer消费,所有的子任务分担消费。
语法结构
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);
WITH参数
通用
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | connector类型。 | String | 是 | 无 |
|
endPoint | EndPoint地址 | String | 是 | 无 | 云消息队列 RocketMQ 版接入地址支持以下两种类型:
重要 由于阿里云网络安全策略动态变化,实时计算连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。
|
topic | topic名称。 | String | 是 | 无 | 无。 |
accessId |
| String |
| 无 |
重要 为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。
|
accessKey |
| String |
| 无 | |
tag | 订阅或写入的标签 | String | 否 | 无 |
说明 当作为结果表时,仅支持RocketMQ 4.x。RocketMQ 5.x请使用结果表属性字段来指定写出消息的 tag。 |
encoding | 编码格式。 | String | 否 | UTF-8 | 无。 |
instanceID | RocketMQ实例ID。 | String | 否 | 无 |
说明 仅RocketMQ 4.x支持该参数。 |
源表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
consumerGroup | Consumer组名。 | String | 是 | 无 | 无。 |
pullIntervalMs | 上游没有数据可供消费时,source的休眠时间。 | Int | 是 | 无 | 单位为毫秒。 目前没有限流机制,无法设置读取RocketMQ的速率。 说明 仅RocketMQ 4.x支持该参数。 |
timeZone | 时区。 | String | 否 | 无 | 例如,Asia/Shanghai。 |
startTimeMs | 启动时间点。 | Long | 否 | 无 | 时间戳,单位为毫秒。 |
startMessageOffset | 消息开始的偏移量。 | Int | 否 | 无 | 如果填写该参数,则优先以 |
lineDelimiter | 解析Block时,行分隔符。 | String | 否 | \n | 无。 |
fieldDelimiter | 字段分隔符。 | String | 否 | \u0001 | 根据MQ终端的模式,分隔符分别为:
|
lengthCheck | 单行字段条数检查策略。 | Int | 否 | NONE | 取值如下:
|
columnErrorDebug | 是否打开调试开关。 | Boolean | 否 | false | 如果设置为true,则打印解析异常的Log。 |
pullBatchSize | 每次拉取消息的最大数量。 | Int | 否 | 64 | 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。 |
结果表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
producerGroup | 写入的群组。 | String | 是 | 无 | 无。 |
retryTimes | 写入的重试次数。 | Int | 否 | 10 | 无。 |
sleepTimeMs | 重试间隔时间。 | Long | 否 | 5000 | 无。 |
partitionField | 指定字段名,将该字段作为分区列。 | String | 否 | 无 | 如果 说明 仅实时计算引擎VVR 8.0.5及以上版本支持该参数。 |
deliveryTimestampMode | 指定延迟消息的模式,该模式与 | String | 否 | 无 | 取值如下:
说明 仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
deliveryTimestampType | 指定延迟消息的时间基准类型。 | String | 否 | processing_time | 取值如下:
说明 仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
deliveryTimestampValue | 延迟消息的投递时间。 | Long | 否 | 无 | 根据
说明 仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
deliveryTimestampField | 指定用作延迟消息投递时间的字段。字段类型必须为 | String | 否 | 无 |
说明 仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
类型映射
Flink字段类型 | 云消息队列RocketMQ字段类型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
代码示例
源表示例
CSV格式
假设您的一条CSV格式消息记录如下。
1,name,male 2,name,female
说明一条RocketMQ消息可以包括零条到多条数据记录,记录之间使用
\n
分隔。Flink作业中,声明RocketMQ数据源表的DDL如下。
RocketMQ 5.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq5', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );
RocketMQ 4.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );
二进制格式
RocketMQ 5.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq5', 'endpoint' = '<yourEndpoint>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
RocketMQ 4.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
结果表示例
创建结果表
RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
说明如果您的MQ消息为二进制格式,则DDL中只能定义一个字段,且字段类型必须为VARBINARY。
创建将
keys
和tags
字段指定为RocketMQ消息的key和tag的结果表RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );