本文为您介绍云消息队列 RocketMQ 版连接器。
鉴于云消息队列 RocketMQ 版 4.x标准版实例共享API调用弹性上限为每秒5000次,使用该版本的消息中间件在与实时计算Flink版对接时,若超过上限会触发限流机制,可能会导致Flink作业运行不稳定。因此,在选择消息中间件时,如果您正在或计划通过标准版RocketMQ与Flink对接,请您谨慎评估。如果业务场景允许,请考虑使用Kafka、日志服务(SLS)或DataHub等其他中间件进行替代。如果您确实需要使用云消息队列 RocketMQ 版 4.x标准版处理大规模的消息,也请同时通过提交工单与RocketMQ产品取得联系申请提升限速上限。
背景信息
云消息队列 RocketMQ 版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。其既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。
RocketMQ连接器支持的信息如下。
|
类别 |
详情 |
|
支持类型 |
源表和结果表 |
|
运行模式 |
仅支持流模式 |
|
数据格式 |
CSV和二进制格式 |
|
特有监控指标 |
|
|
API种类 |
Datastream(仅支持RocketMQ 4.x)和SQL |
|
是否支持更新或删除结果表数据 |
不支持更新和删除结果表数据,只支持插入数据。 |
特色功能
RocketMQ源表和结果表支持属性字段,具体如下。
-
源表属性字段
字段名
字段类型
说明
topic
VARCHAR METADATA VIRTUAL
消息Topic。
queue-id
INT METADATA VIRTUAL
消息队列ID。
queue-offset
BIGINT METADATA VIRTUAL
消息队列的消费位点。
msg-id
VARCHAR METADATA VIRTUAL
消息ID。
store-timestamp
TIMESTAMP(3) METADATA VIRTUAL
消息存储时间。
born-timestamp
TIMESTAMP(3) METADATA VIRTUAL
消息生成时间。
keys
VARCHAR METADATA VIRTUAL
消息Keys。
tags
VARCHAR METADATA VIRTUAL
消息Tags。
-
结果表属性字段
字段名
字段类型
说明
keys
VARCHAR METADATA
消息Keys。
tags
VARCHAR METADATA
消息Tags。
前提条件
已创建了RocketMQ资源,详情请参见创建资源。
使用限制
-
仅Flink实时计算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。
-
RocketMQ连接器使用Pull Consumer消费,所有的子任务分担消费。
语法结构
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);
WITH参数
通用
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
connector |
connector类型。 |
String |
是 |
无 |
|
|
endPoint |
EndPoint地址 |
String |
是 |
无 |
云消息队列 RocketMQ 版接入地址支持以下两种类型:
重要
由于阿里云网络安全策略动态变化,实时计算连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。
|
|
topic |
topic名称。 |
String |
是 |
无 |
无。 |
|
accessId |
|
String |
|
无 |
重要
为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。
|
|
accessKey |
|
String |
|
无 |
|
|
tag |
订阅或写入的标签 |
String |
否 |
无 |
说明
当作为结果表时,仅支持RocketMQ 4.x。RocketMQ 5.x请使用结果表属性字段来指定写出消息的 tag。 |
|
encoding |
编码格式。 |
String |
否 |
UTF-8 |
无。 |
|
instanceID |
RocketMQ实例ID。 |
String |
否 |
无 |
说明
仅RocketMQ 4.x支持该参数。 |
源表独有
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
consumerGroup |
Consumer组名。 |
String |
是 |
无 |
无。 |
|
pullIntervalMs |
上游没有数据可供消费时,source的休眠时间。 |
Int |
是 |
无 |
单位为毫秒。 目前没有限流机制,无法设置读取RocketMQ的速率。 说明
仅RocketMQ 4.x支持该参数。 |
|
timeZone |
时区。 |
String |
否 |
无 |
例如,Asia/Shanghai。 |
|
startTimeMs |
启动时间点。 |
Long |
否 |
无 |
时间戳,单位为毫秒。 |
|
startMessageOffset |
消息开始的偏移量。 |
Int |
否 |
无 |
如果填写该参数,则优先以 |
|
lineDelimiter |
解析Block时,行分隔符。 |
String |
否 |
\n |
无。 |
|
fieldDelimiter |
字段分隔符。 |
String |
否 |
\u0001 |
根据MQ终端的模式,分隔符分别为:
|
|
lengthCheck |
单行字段条数检查策略。 |
Int |
否 |
NONE |
取值如下:
|
|
columnErrorDebug |
是否打开调试开关。 |
Boolean |
否 |
false |
如果设置为true,则打印解析异常的Log。 |
|
pullBatchSize |
每次拉取消息的最大数量。 |
Int |
否 |
64 |
仅实时计算引擎VVR 8.0.7及以上版本支持该参数。 |
结果表独有
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
producerGroup |
写入的群组。 |
String |
是 |
无 |
无。 |
|
retryTimes |
写入的重试次数。 |
Int |
否 |
10 |
无。 |
|
sleepTimeMs |
重试间隔时间。 |
Long |
否 |
5000 |
无。 |
|
partitionField |
指定字段名,将该字段作为分区列。 |
String |
否 |
无 |
如果 说明
仅实时计算引擎VVR 8.0.5及以上版本支持该参数。 |
|
deliveryTimestampMode |
指定延迟消息的模式,该模式与 |
String |
否 |
无 |
取值如下:
说明
仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
|
deliveryTimestampType |
指定延迟消息的时间基准类型。 |
String |
否 |
processing_time |
取值如下:
说明
仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
|
deliveryTimestampValue |
延迟消息的投递时间。 |
Long |
否 |
无 |
根据
说明
仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
|
deliveryTimestampField |
指定用作延迟消息投递时间的字段。字段类型必须为 |
String |
否 |
无 |
说明
仅实时计算引擎VVR 11.1及以上版本支持该参数。 |
类型映射
|
Flink字段类型 |
云消息队列RocketMQ字段类型 |
|
BOOLEAN |
STRING |
|
VARBINARY |
|
|
VARCHAR |
|
|
TINYINT |
|
|
INTEGER |
|
|
BIGINT |
|
|
FLOAT |
|
|
DOUBLE |
|
|
DECIMAL |
代码示例
源表示例
-
CSV格式
假设您的一条CSV格式消息记录如下。
1,name,male 2,name,female说明一条RocketMQ消息可以包括零条到多条数据记录,记录之间使用
\n分隔。Flink作业中,声明RocketMQ数据源表的DDL如下。
-
RocketMQ 5.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq5', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );-
RocketMQ 4.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' ); -
-
二进制格式
-
RocketMQ 5.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq5', 'endpoint' = '<yourEndpoint>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table; -
RocketMQ 4.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
-
结果表示例
-
创建结果表
-
RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' ); -
RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );说明如果您的MQ消息为二进制格式,则DDL中只能定义一个字段,且字段类型必须为VARBINARY。
-
-
创建将
keys和tags字段指定为RocketMQ消息的key和tag的结果表-
RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' ); -
RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
-
DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。
实时计算引擎VVR提供MetaQSource,用于读取RocketMQ;提供OutputFormat的实现类MetaQOutputFormat,用于写入RocketMQ。读取RocketMQ和写入RocketMQ的示例如下:
RocketMQ 5.x
在RocketMQ 5.x 中,访问密钥对应的是实例中配置的用户名和密码。若通过内网访问RocketMQ实例,且实例未开启 ACL 认证,则可不填写AK/SK参数。
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQ5DataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// 以下两个配置仅本地调试时使用,需要在作业打包上传到阿里云实时计算Flink版之前删除
conf.setString("pipeline.classpaths", "file://" + "uber jar绝对路径");
conf.setString(
"classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
final DataStreamSource<String> ds =
env.fromSource(
RocketMQSource.<String>builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopic(SOURCE_TOPIC)
.setConsumerGroup(CONSUMER_GROUP)
.setDeserializationSchema(new MyDeserializer())
.setStartOffset(1)
.build(),
WatermarkStrategy.noWatermarks(),
"source");
ds.map(new ToMessage())
.addSink(
new OutputFormatSinkFunction<>(
new RocketMQOutputFormat.Builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.build()));
env.execute();
}
private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
@Override
public void deserialize(List<MessageExt> record, Collector<String> out) {
for (MessageExt messageExt : record) {
out.collect(new String(messageExt.getBody()));
}
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
private static class ToMessage implements MapFunction<String, List<MessageExt>> {
public ToMessage() {
}
@Override
public List<MessageExt> map(String s) {
final MessageExt message = new MessageExt();
message.setBody(s.getBytes());
message.setWaitStoreMsgOK(true);
return Collections.singletonList(message);
}
}
}
RocketMQ 4.x
import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQDataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String INSTANCE_ID = "<instanceID>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// 以下两个配置仅本地调试时使用,需要在作业打包上传到阿里云实时计算Flink版之前删除
conf.setString("pipeline.classpaths", "file://" + "uber jar绝对路径");
conf.setString("classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// Creates and adds RocketMQ source.
env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
// Converts message body to upper case.
.map(RocketMQDataStreamDemo2::convertMessages)
// Creates and adds RocketMQ sink.
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
.name(RocketMQDataStreamDemo2.class.getSimpleName());
// Compiles and submits job.
env.execute("RocketMQ connector end-to-end DataStream demo");
}
private static MetaQSource<MessageExt> createRocketMQSource() {
Properties mqProperties = createMQProperties();
return new MetaQSource<>(SOURCE_TOPIC,
CONSUMER_GROUP,
null, // always null
null, // tag of the messages to consumer
Long.MAX_VALUE, // stop timestamp in milliseconds
-1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
0, // Start offset.
300_000, // Partition discover interval.
mqProperties,
Boundedness.CONTINUOUS_UNBOUNDED,
new MyDeserializationSchema());
}
private static MetaQOutputFormat createRocketMQOutputFormat() {
return new MetaQOutputFormat.Builder()
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.setMqProperties(createMQProperties())
.build();
}
private static Properties createMQProperties() {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, ENDPOINT);
properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
return properties;
}
private static List<MessageExt> convertMessages(MessageExt messages) {
return Collections.singletonList(messages);
}
public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
@Override
public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
for (MessageExt messageExt : list) {
collector.collect(messageExt);
}
}
@Override
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
}
}
}
}
XML
-
MQ 4.x:MQ DataStream连接器。
-
MQ 5.x:MQ DataStream连接器。
<!--MQ5-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq5</artifactId>
<version>${vvr-version}</version>
<scope>provided</scope>
</dependency>
<!--MQ4-->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>
RocketMQ接入点Endpoint配置详情请参见关于4.x系列实例的非Region化接入点下线及升级公告。