本文为您介绍消息队列RocketMQ连接器。
背景信息
消息队列RocketMQ是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。
RocketMQ连接器支持的信息如下。
类别 | 详情 |
---|---|
支持类型 | 源表和结果表 |
运行模式 | 仅支持流模式 |
数据格式 | CSV和二进制格式 |
特有监控指标 |
说明 指标的含义及如何查看监控指标,请参见查看监控指标。 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 |
特色功能
RocketMQ源表和结果表支持属性字段,具体如下。
- 源表属性字段说明 仅在VVR 3.0.1及以上版本支持获取以下RocketMQ属性字段。
字段名 字段类型 说明 topic VARCHAR METADATA VIRTUAL RocketMQ消息Topic。 queue-id INT METADATA VIRTUAL RocketMQ消息队列ID。 queue-offset BIGINT METADATA VIRTUAL RocketMQ消息队列的消费位点。 msg-id VARCHAR METADATA VIRTUAL RocketMQ消息ID。 store-timestamp TIMESTAMP(3) METADATA VIRTUAL RocketMQ消息存储时间。 born-timestamp TIMESTAMP(3) METADATA VIRTUAL RocketMQ消息生成时间。 keys VARCHAR METADATA VIRTUAL RocketMQ消息Keys。 tags VARCHAR METADATA VIRTUAL RocketMQ消息Tags。 - 结果表属性字段说明 仅实时计算引擎VVR 4.0.0及以上版本支持以下RocketMQ属性字段。
字段名 字段类型 说明 keys VARCHAR METADATA RocketMQ消息Keys。 tags VARCHAR METADATA RocketMQ消息Tags。
前提条件
已创建了RocketMQ资源,详情请参见创建资源。
使用限制
- 消息队列RocketMQ连接器仅支持消费4.X版本的RocketMQ数据。
- 仅Flink实时计算引擎VVR 2.0.0及以上版本支持消息队列RocketMQ连接器。
- 在Flink实时计算引擎VVR 6.0.2以下版本,消息队列RocketMQ版源表的并发度必须小于等于RocketMQ topic的分区数,在实时计算引擎VVR 6.0.2及以上版本解除该限制。您可以提前设置大于分区数的并发度,不需要因RocketMQ的缩容而手动调整作业并发度。
- RocketMQ连接器使用RocketMQ Pull Consumer消费,所有的子任务分担消费。
语法结构
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'pullIntervalMs' = '1000',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessSecret>',
'startMessageOffset' = '1000',
'consumerGroup' = '<yourConsumerGroup>',
'fieldDelimiter' = '|'
);
WITH参数
- 通用
参数 说明 数据类型 是否必填 默认值 备注 connector connector类型。 String 是 无 固定值为mq。 endPoint EndPoint地址 String 是 无 阿里云消息队列RocketMQ版接入地址支持以下两种类型: - VVR 3.0.1及以上版本的作业:需要使用TCP协议客户端接入点,详情请参见
- 内网服务MQ(阿里云经典网络/VPC)接入地址:在MQ控制台目标实例详情中,选择 ,获取对应的EndPoint。
- 公网服务MQ接入地址:在MQ控制台目标实例详情中,选择 ,获取对应的EndPoint。
重要 由于阿里云网络安全策略动态变化,实时计算连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。- 内网服务无法跨域访问。例如,您所购买的实时计算服务的地域为华东1,但是购买的消息队列MQ服务的地域为华东2(上海),则无法访问。
- 通过公网方式访问RocketMQ,需要配置NAT,详情请参见创建和管理公网NAT网关实例。
- VVR 3.0.1以下版本的作业:RocketMQ旧的接入点已不可用,您需要适配升级实时计算作业。重要 如果您已使用了VVR 3.0.1以下版本的RocketMQ连接器,则您需要将您的实时计算作业升级至VVR 3.0.1及以上版本,并将作业中EndPoint参数取值更改为新的RocketMQ接入点,旧的RocketMQ接入点存在稳定性风险或不可用的问题,详情请参见2021年11月1日,RocketMQ旧的接入点将不可用,您需要适配升级实时计算作业。
topic topic名称。 String 是 无 无 accessId 阿里云账号的AccessKey ID。 String 是 无 无 accessKey 阿里云账号的AccessKey Secret。 String 是 无 无 tag 订阅或写入的标签 String 否 无 - RocketMQ作为源表时,只能读取单个tag。
- RocketMQ作为结果表时,支持设置多个tag,以逗号(,)进行分隔。
nameServerSubgroup NameServer组。 String 否 无 - 内网服务(阿里云经典网络或VPC):必须配置
'nameServerSubgroup' = 'nsaddr4client-internal'
。 - 公网服务:无需配置
nameServerSubgroup
。
说明 仅VVR 2.1.1-VVR 3.0.0版本支持该参数,VVR 3.0.1及以后版本不支持该参数。encoding 编码格式。 String 否 UTF-8 无 instanceID RocketMQ实例ID。 String 否 无 - 如果RocketMQ实例无独立命名空间,则不可以使用instanceID参数。
- 如果RocketMQ实例有独立命名空间,则instanceID参数必选。
- VVR 3.0.1及以上版本的作业:需要使用TCP协议客户端接入点,详情请参见
- 源表独有
参数 说明 数据类型 是否必填 默认值 备注 consumerGroup Consumer组名。 String 是 无 无。 pullIntervalMs 拉取数据的时间间隔。 Int 是 无 单位为毫秒。 timeZone 时区。 String 否 无 例如,Asia或Shanghai。 startTimeMs 启动时间点。 Long 否 无 时间戳,单位为毫秒。 startMessageOffset 消息开始的偏移量。 Int 否 无 如果填写该参数,则优先以 startMessageOffset
的位点开始加载数据。lineDelimiter 解析Block时,行分隔符。 String 否 \n 无。 fieldDelimiter 字段分隔符。 String 否 \u0001 根据MQ终端的模式,分隔符分别为: - 在只读模式下(默认模式),分隔符为
\u0001
。该模式下,分隔符不可见。 - 在编辑模式下,分隔符为
^A
。
lengthCheck 单行字段条数检查策略。 Int 否 NONE 取值如下: - NONE:默认值。
- 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
- 解析出的字段数小于定义字段数时,跳过这行数据。
- SKIP:解析出的字段数和定义字段数不同时跳过数据。
- EXCEPTION:解析出的字段数和定义字段数不同时提示异常。
- PAD:按从左到右顺序填充。
- 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
- 解析出的字段数小于定义字段数时,在行尾用Null填充缺少的字段。
说明 SKIP、EXCEPTION和PAD为可选值。columnErrorDebug 是否打开调试开关。 Boolean 否 false 如果设置为True,则打印解析异常的Log。 - 在只读模式下(默认模式),分隔符为
- 结果表独有
参数 说明 数据类型 是否必填 默认值 备注 producerGroup 写入的群组。 String 是 无 无。 retryTimes 写入的重试次数。 Int 否 10 无。 sleepTimeMs 重试间隔时间。 Long 否 5000 无。 partitionField 指定字段名,将该字段作为分区列。 String 否 无 如果 mode
为partition
,则该参数必填。
类型映射
Flink字段类型 | 消息队列RocketMQ字段类型 |
---|---|
VARCHAR | STRING |
代码示例
- 源表示例
- CSV格式假设您的一条CSV格式消息记录如下。
1,name,male 2,name,female
说明 一条RocketMQ消息可以包括零条到多条数据记录,记录之间使用\n
分隔。Flink作业中,声明RocketMQ数据源表的DDL如下。CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessSecret>', 'startMessageOffset' = '1000', 'consumerGroup' = 'mq-group', );
- 二进制格式
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '<yourAccessId>', 'accessKey' = '<yourAccessSecret>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE table out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
- CSV格式
- 结果表示例
- 创建RocketMQ结果表
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='<yourAccessId>', 'accessKey'='<yourAccessSecret>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>', 'tag'='<yourTagName>', 'encoding'='utf-8', 'retryTimes'='5', 'sleepTimeMs'='500' );
说明 如果您的MQ消息为二进制格式,则DDL中只能定义一个字段,且字段类型必须为VARBINARY。 - 创建将
keys
和tags
字段指定为RocketMQ消息的key和tag的结果表CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='<yourAccessId>', 'accessKey'='<yourAccessSecret>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>', 'encoding'='utf-8', 'retryTimes'='5', 'sleepTimeMs'='500' );
- 创建RocketMQ结果表
DataStream API
重要 通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。
实时计算引擎VVR提供SourceFunction的实现类MetaQSourceFunction,用于读取RocketMQ;提供OutputFormat的实现类MetaQOutputFormat,用于写入RocketMQ。读取RocketMQ和写入RocketMQ的示例如下。
/**
* A {@link DataStream} demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*
* <pre>
* Arguments
* mqSourceTopic: The consumer topic of the RocketMQ source.
* mqSourceConsumerGroup: The consumer group of the RocketMQ source.
* mqSourceEndpoint: The endpoint address of consumer topic for the RocketMQ source.
* mqSourceAccessId: The access id of consumer topic for the RocketMQ source.
* mqSourceAccessKey: The access key of consumer topic for the RocketMQ source.
* mqSourceInstanceId: The instance id of consumer topic for the RocketMQ source.
* startMessageOffset: The starting offset of message consumption for the RocketMQ source.
* startTime: The starting time of message consumption for the RocketMQ source.
* mqSinkTopic: The producer topic of the RocketMQ sink.
* mqSinkProducerGroup: The producer group of the RocketMQ sink.
* mqSinkEndpoint: The endpoint address of producer topic for the RocketMQ sink.
* mqSinkAccessId: The access id of producer topic for the RocketMQ sink.
* mqSinkAccessKey: The access key of producer topic for the RocketMQ sink.
* mqSinkInstanceId: The instance id of producer topic for the RocketMQ sink.
* </pre>
*/
public class RocketMQDataStreamDemo {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQDataStreamDemo.class);
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final ParameterTool parameters = ParameterTool.fromArgs(args);
// Creates and adds RocketMQ source.
env.addSource(createRocketMQSource(parameters))
// Converts message body to upper case.
.map(RocketMQDataStreamDemo::convertMessages)
// Creates and adds RocketMQ sink.
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat(parameters)))
.name(RocketMQDataStreamDemo.class.getSimpleName());
// Compiles and submits job.
env.execute("RocketMQ connector end-to-end DataStream demo");
}
private static MetaQSourceFunction createRocketMQSource(ParameterTool parameters) {
String sourceTopic = parameters.get("mqSourceTopic");
String consumerGroup = parameters.get("mqSourceConsumerGroup");
Properties mqProperties = createSourceMQProperties(parameters);
int partitionCount = 0;
MetaPullConsumer consumer = null;
try {
consumer = createConsumerInstance(sourceTopic, consumerGroup, mqProperties, null, -1);
Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(sourceTopic);
partitionCount = queues == null ? 0 : queues.size();
} catch (MQClientException e) {
LOGGER.error(
"Fetches RocketMQ partition count for RocketMQ source exception [{}].",
e.getMessage());
} finally {
if (consumer != null) {
try {
MetaQConnect.shutdownConsumer(consumer);
} catch (Exception ignored) {
}
}
}
return new MetaQSourceFunction(
sourceTopic,
consumerGroup,
null,
null,
100,
partitionCount,
Long.MAX_VALUE,
Long.parseLong(parameters.get("startMessageOffset")),
Long.parseLong(parameters.get("startTime")),
mqProperties);
}
private static MetaQOutputFormat createRocketMQOutputFormat(ParameterTool parameters) {
return new MetaQOutputFormat.Builder()
.setTopicName(parameters.get("mqSinkTopic"))
.setProducerGroup(parameters.get("mqSinkProducerGroup"))
.setMqProperties(createSinkMQProperties(parameters))
.build();
}
private static Properties createSourceMQProperties(ParameterTool parameters) {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, parameters.get("mqSourceEndpoint"));
properties.put(PROPERTY_ACCESSKEY, parameters.get("mqSourceAccessId"));
properties.put(PROPERTY_SECRETKEY, parameters.get("mqSourceAccessKey"));
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, parameters.get("mqSourceInstanceId"));
return properties;
}
private static Properties createSinkMQProperties(ParameterTool parameters) {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, parameters.get("mqSinkEndpoint"));
properties.put(PROPERTY_ACCESSKEY, parameters.get("mqSinkAccessId"));
properties.put(PROPERTY_SECRETKEY, parameters.get("mqSinkAccessKey"));
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, parameters.get("mqSinkInstanceId"));
return properties;
}
private static List<MessageExt> convertMessages(List<MessageExt> messages) {
messages.forEach(
message -> message.setBody(new String(message.getBody()).toUpperCase().getBytes()));
return messages;
}
}
说明 RocketMQ接入点Endpoint配置详情请参见关于TCP内网接入点设置的公告。