本文为您介绍消息队列RocketMQ连接器。

背景信息

消息队列RocketMQ是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。

RocketMQ连接器支持的信息如下。
类别详情
支持类型源表和结果表
运行模式仅支持流模式
数据格式CSV和二进制格式
特有监控指标
  • 源表
    • numRecordsIn
    • numRecordsInPerSecond
    • numBytesIn
    • numBytesInPerScond
    • currentEmitEventTimeLag
    • currentFetchEventTimeLag
    • sourceIdleTime
  • 结果表
    • numRecordsOut
    • numRecordsOutPerSecond
    • numBytesOut
    • numBytesOutPerSecond
    • currentSendTime
说明 指标的含义及如何查看监控指标,请参见查看监控指标
API种类Datastream和SQL
是否支持更新或删除结果表数据不支持更新和删除结果表数据,只支持插入数据。

特色功能

RocketMQ源表和结果表支持属性字段,具体如下。
  • 源表属性字段
    说明 仅在VVR 3.0.1及以上版本支持获取以下RocketMQ属性字段。
    字段名字段类型说明
    topicVARCHAR METADATA VIRTUALRocketMQ消息Topic。
    queue-idINT METADATA VIRTUALRocketMQ消息队列ID。
    queue-offsetBIGINT METADATA VIRTUALRocketMQ消息队列的消费位点。
    msg-idVARCHAR METADATA VIRTUALRocketMQ消息ID。
    store-timestampTIMESTAMP(3) METADATA VIRTUALRocketMQ消息存储时间。
    born-timestampTIMESTAMP(3) METADATA VIRTUALRocketMQ消息生成时间。
    keysVARCHAR METADATA VIRTUALRocketMQ消息Keys。
    tagsVARCHAR METADATA VIRTUALRocketMQ消息Tags。
  • 结果表属性字段
    说明 仅实时计算引擎VVR 4.0.0及以上版本支持以下RocketMQ属性字段。
    字段名字段类型说明
    keysVARCHAR METADATARocketMQ消息Keys。
    tagsVARCHAR METADATARocketMQ消息Tags。

前提条件

已创建了RocketMQ资源,详情请参见创建资源

使用限制

  • 消息队列RocketMQ连接器仅支持消费4.X版本的RocketMQ数据。
  • 仅Flink实时计算引擎VVR 2.0.0及以上版本支持消息队列RocketMQ连接器。
  • 在Flink实时计算引擎VVR 6.0.2以下版本,消息队列RocketMQ版源表的并发度必须小于等于RocketMQ topic的分区数,在实时计算引擎VVR 6.0.2及以上版本解除该限制。您可以提前设置大于分区数的并发度,不需要因RocketMQ的缩容而手动调整作业并发度。
  • RocketMQ连接器使用RocketMQ Pull Consumer消费,所有的子任务分担消费。

语法结构

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'pullIntervalMs' = '1000',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessSecret>',
  'startMessageOffset' = '1000',
  'consumerGroup' = '<yourConsumerGroup>',
  'fieldDelimiter' = '|'
);

WITH参数

  • 通用
    参数说明数据类型是否必填默认值备注
    connectorconnector类型。String固定值为mq。
    endPointEndPoint地址String阿里云消息队列RocketMQ版接入地址支持以下两种类型:
    • VVR 3.0.1及以上版本的作业:需要使用TCP协议客户端接入点,详情请参见
      • 内网服务MQ(阿里云经典网络/VPC)接入地址:在MQ控制台目标实例详情中,选择接入点 > TCP协议客户端接入点 > 内网访问,获取对应的EndPoint。
      • 公网服务MQ接入地址:在MQ控制台目标实例详情中,选择接入点 > TCP协议 > 客户端接入点 > 公网访问,获取对应的EndPoint。
      重要 由于阿里云网络安全策略动态变化,实时计算连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。
      • 内网服务无法跨域访问。例如,您所购买的实时计算服务的地域为华东1,但是购买的消息队列MQ服务的地域为华东2(上海),则无法访问。
      • 通过公网方式访问RocketMQ,需要配置NAT,详情请参见创建和管理公网NAT网关实例
    • VVR 3.0.1以下版本的作业:RocketMQ旧的接入点已不可用,您需要适配升级实时计算作业。
      重要 如果您已使用了VVR 3.0.1以下版本的RocketMQ连接器,则您需要将您的实时计算作业升级至VVR 3.0.1及以上版本,并将作业中EndPoint参数取值更改为新的RocketMQ接入点,旧的RocketMQ接入点存在稳定性风险或不可用的问题,详情请参见2021年11月1日,RocketMQ旧的接入点将不可用,您需要适配升级实时计算作业。
    topictopic名称。String
    accessId阿里云账号的AccessKey ID。String
    accessKey阿里云账号的AccessKey Secret。String
    tag订阅或写入的标签String
    • RocketMQ作为源表时,只能读取单个tag。
    • RocketMQ作为结果表时,支持设置多个tag,以逗号(,)进行分隔。
    nameServerSubgroupNameServer组。String
    • 内网服务(阿里云经典网络或VPC):必须配置'nameServerSubgroup' = 'nsaddr4client-internal'
    • 公网服务:无需配置nameServerSubgroup
    说明 仅VVR 2.1.1-VVR 3.0.0版本支持该参数,VVR 3.0.1及以后版本不支持该参数。
    encoding编码格式。StringUTF-8
    instanceIDRocketMQ实例ID。String
    • 如果RocketMQ实例无独立命名空间,则不可以使用instanceID参数。
    • 如果RocketMQ实例有独立命名空间,则instanceID参数必选。
  • 源表独有
    参数说明数据类型是否必填默认值备注
    consumerGroupConsumer组名。String无。
    pullIntervalMs拉取数据的时间间隔。Int单位为毫秒。
    timeZone时区。String例如,Asia或Shanghai。
    startTimeMs启动时间点。Long时间戳,单位为毫秒。
    startMessageOffset消息开始的偏移量。Int如果填写该参数,则优先以startMessageOffset的位点开始加载数据。
    lineDelimiter解析Block时,行分隔符。String\n无。
    fieldDelimiter字段分隔符。String\u0001根据MQ终端的模式,分隔符分别为:
    • 在只读模式下(默认模式),分隔符为\u0001。该模式下,分隔符不可见。
    • 在编辑模式下,分隔符为^A
    lengthCheck单行字段条数检查策略。IntNONE取值如下:
    • NONE:默认值。
      • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
      • 解析出的字段数小于定义字段数时,跳过这行数据。
    • SKIP:解析出的字段数和定义字段数不同时跳过数据。
    • EXCEPTION:解析出的字段数和定义字段数不同时提示异常。
    • PAD:按从左到右顺序填充。
      • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。
      • 解析出的字段数小于定义字段数时,在行尾用Null填充缺少的字段。
    说明 SKIP、EXCEPTION和PAD为可选值。
    columnErrorDebug是否打开调试开关。Booleanfalse如果设置为True,则打印解析异常的Log。
  • 结果表独有
    参数说明数据类型是否必填默认值备注
    producerGroup写入的群组。String无。
    retryTimes写入的重试次数。Int10无。
    sleepTimeMs重试间隔时间。Long5000无。
    partitionField指定字段名,将该字段作为分区列。String如果modepartition,则该参数必填。

类型映射

Flink字段类型消息队列RocketMQ字段类型
VARCHARSTRING

代码示例

  • 源表示例
    • CSV格式
      假设您的一条CSV格式消息记录如下。
      1,name,male 
      2,name,female
      说明 一条RocketMQ消息可以包括零条到多条数据记录,记录之间使用\n分隔。
      Flink作业中,声明RocketMQ数据源表的DDL如下。
      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '1000',
        'accessId' = '<yourAccessId>',
        'accessKey' = '<yourAccessSecret>',
        'startMessageOffset' = '1000',
        'consumerGroup' = 'mq-group',
      );
    • 二进制格式
      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '500',
        'accessId' = '<yourAccessId>',
        'accessKey' = '<yourAccessSecret>',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE table out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;
  • 结果表示例
    • 创建RocketMQ结果表
      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='<yourAccessId>',
        'accessKey'='<yourAccessSecret>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>',
        'tag'='<yourTagName>',
        'encoding'='utf-8',
        'retryTimes'='5',
        'sleepTimeMs'='500'
      );
      说明 如果您的MQ消息为二进制格式,则DDL中只能定义一个字段,且字段类型必须为VARBINARY。
    • 创建将keystags字段指定为RocketMQ消息的key和tag的结果表
      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='<yourAccessId>',
        'accessKey'='<yourAccessSecret>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>',
        'encoding'='utf-8',
        'retryTimes'='5',
        'sleepTimeMs'='500'
      );

DataStream API

重要 通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法
实时计算引擎VVR提供SourceFunction的实现类MetaQSourceFunction,用于读取RocketMQ;提供OutputFormat的实现类MetaQOutputFormat,用于写入RocketMQ。读取RocketMQ和写入RocketMQ的示例如下。
/**
 * A {@link DataStream} demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 *
 * <pre>
 * Arguments
 * mqSourceTopic: The consumer topic of the RocketMQ source.
 * mqSourceConsumerGroup: The consumer group of the RocketMQ source.
 * mqSourceEndpoint: The endpoint address of consumer topic for the RocketMQ source.
 * mqSourceAccessId: The access id of consumer topic for the RocketMQ source.
 * mqSourceAccessKey: The access key of consumer topic for the RocketMQ source.
 * mqSourceInstanceId: The instance id of consumer topic for the RocketMQ source.
 * startMessageOffset: The starting offset of message consumption for the RocketMQ source.
 * startTime: The starting time of message consumption for the RocketMQ source.
 * mqSinkTopic: The producer topic of the RocketMQ sink.
 * mqSinkProducerGroup: The producer group of the RocketMQ sink.
 * mqSinkEndpoint: The endpoint address of producer topic for the RocketMQ sink.
 * mqSinkAccessId: The access id of producer topic for the RocketMQ sink.
 * mqSinkAccessKey: The access key of producer topic for the RocketMQ sink.
 * mqSinkInstanceId: The instance id of producer topic for the RocketMQ sink.
 * </pre>
 */
public class RocketMQDataStreamDemo {

    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQDataStreamDemo.class);

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final ParameterTool parameters = ParameterTool.fromArgs(args);
        // Creates and adds RocketMQ source.
        env.addSource(createRocketMQSource(parameters))
                // Converts message body to upper case.
                .map(RocketMQDataStreamDemo::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat(parameters)))
                .name(RocketMQDataStreamDemo.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSourceFunction createRocketMQSource(ParameterTool parameters) {
        String sourceTopic = parameters.get("mqSourceTopic");
        String consumerGroup = parameters.get("mqSourceConsumerGroup");
        Properties mqProperties = createSourceMQProperties(parameters);
        int partitionCount = 0;
        MetaPullConsumer consumer = null;
        try {
            consumer = createConsumerInstance(sourceTopic, consumerGroup, mqProperties, null, -1);
            Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(sourceTopic);
            partitionCount = queues == null ? 0 : queues.size();
        } catch (MQClientException e) {
            LOGGER.error(
                    "Fetches RocketMQ partition count for RocketMQ source exception [{}].",
                    e.getMessage());
        } finally {
            if (consumer != null) {
                try {
                    MetaQConnect.shutdownConsumer(consumer);
                } catch (Exception ignored) {
                }
            }
        }
        return new MetaQSourceFunction(
                sourceTopic,
                consumerGroup,
                null,
                null,
                100,
                partitionCount,
                Long.MAX_VALUE,
                Long.parseLong(parameters.get("startMessageOffset")),
                Long.parseLong(parameters.get("startTime")),
                mqProperties);
    }

    private static MetaQOutputFormat createRocketMQOutputFormat(ParameterTool parameters) {
        return new MetaQOutputFormat.Builder()
                .setTopicName(parameters.get("mqSinkTopic"))
                .setProducerGroup(parameters.get("mqSinkProducerGroup"))
                .setMqProperties(createSinkMQProperties(parameters))
                .build();
    }

    private static Properties createSourceMQProperties(ParameterTool parameters) {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, parameters.get("mqSourceEndpoint"));
        properties.put(PROPERTY_ACCESSKEY, parameters.get("mqSourceAccessId"));
        properties.put(PROPERTY_SECRETKEY, parameters.get("mqSourceAccessKey"));
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, parameters.get("mqSourceInstanceId"));
        return properties;
    }

    private static Properties createSinkMQProperties(ParameterTool parameters) {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, parameters.get("mqSinkEndpoint"));
        properties.put(PROPERTY_ACCESSKEY, parameters.get("mqSinkAccessId"));
        properties.put(PROPERTY_SECRETKEY, parameters.get("mqSinkAccessKey"));
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, parameters.get("mqSinkInstanceId"));
        return properties;
    }

    private static List<MessageExt> convertMessages(List<MessageExt> messages) {
        messages.forEach(
                message -> message.setBody(new String(message.getBody()).toUpperCase().getBytes()));
        return messages;
    }
}
说明 RocketMQ接入点Endpoint配置详情请参见关于TCP内网接入点设置的公告

常见问题

RocketMQ Topic扩容时,RocketMQ如何感知Topic分区数变化?