云消息队列 RocketMQ 版

本文为您介绍云消息队列 RocketMQ 版连接器。

重要

鉴于云消息队列 RocketMQ 版 4.x标准版实例共享API调用弹性上限为每秒5000次,使用该版本的消息中间件在与实时计算Flink版对接时,若超过上限会触发限流机制,可能会导致Flink作业运行不稳定。因此,在选择消息中间件时,如果您正在或计划通过标准版RocketMQ与Flink对接,请您谨慎评估。如果业务场景允许,请考虑使用Kafka、日志服务(SLS)或DataHub等其他中间件进行替代。如果您确实需要使用云消息队列 RocketMQ 版 4.x标准版处理大规模的消息,也请同时通过提交工单与RocketMQ产品取得联系申请提升限速上限。

背景信息

云消息队列 RocketMQ 版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。其既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。

RocketMQ连接器支持的信息如下。

类别

详情

支持类型

源表和结果表

运行模式

仅支持流模式

数据格式

CSV和二进制格式

特有监控指标

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • 结果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

API种类

Datastream(仅支持RocketMQ 4.x)和SQL

是否支持更新或删除结果表数据

不支持删除结果表数据,只支持插入和更新数据。

特色功能

RocketMQ源表和结果表支持属性字段,具体如下。

  • 源表属性字段

    说明

    仅在VVR 3.0.1及以上版本支持获取以下RocketMQ属性字段。

    字段名

    字段类型

    说明

    topic

    VARCHAR METADATA VIRTUAL

    消息Topic。

    queue-id

    INT METADATA VIRTUAL

    消息队列ID。

    queue-offset

    BIGINT METADATA VIRTUAL

    消息队列的消费位点。

    msg-id

    VARCHAR METADATA VIRTUAL

    消息ID。

    store-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    消息存储时间。

    born-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    消息生成时间。

    keys

    VARCHAR METADATA VIRTUAL

    消息Keys。

    tags

    VARCHAR METADATA VIRTUAL

    消息Tags。

  • 结果表属性字段

    说明

    仅实时计算引擎VVR 4.0.0及以上版本支持以下RocketMQ属性字段。

    字段名

    字段类型

    说明

    keys

    VARCHAR METADATA

    消息Keys。

    tags

    VARCHAR METADATA

    消息Tags。

前提条件

已创建了RocketMQ资源,详情请参见创建资源

使用限制

  • 仅Flink实时计算引擎VVR 2.0.0及以上版本支持RocketMQ连接器。

  • 仅Flink实时计算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。

  • 在Flink实时计算引擎VVR 6.0.2以下版本,源表的并发度必须小于等于RocketMQ topic的分区数,在实时计算引擎VVR 6.0.2及以上版本解除该限制。您可以提前设置大于分区数的并发度,不需要因RocketMQ的缩容而手动调整作业并发度。

  • RocketMQ连接器使用Pull Consumer消费,所有的子任务分担消费。

语法结构

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    connector类型。

    String

    • RocketMQ 4.x固定值为mq

    • RocketMQ 5.x固定值为mq5

    endPoint

    EndPoint地址

    String

    云消息队列 RocketMQ 版接入地址支持以下两种类型:

    • VVR 3.0.1及以上版本的作业:需要使用TCP协议客户端接入点,详情请参见

      • 内网服务MQ(阿里云经典网络/VPC)接入地址:在MQ控制台目标实例详情中,选择接入点 > TCP协议客户端接入点 > 内网访问,获取对应的EndPoint。

      • 公网服务MQ接入地址:在MQ控制台目标实例详情中,选择接入点 > TCP协议 > 客户端接入点 > 公网访问,获取对应的EndPoint。

      重要

      由于阿里云网络安全策略动态变化,实时计算连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。

      • 内网服务无法跨域访问。例如,您所购买的实时计算服务的地域为华东1,但是购买的RocketMQ服务的地域为华东2(上海),则无法访问。

      • 通过公网方式访问RocketMQ,需要配置NAT,详情请参见创建和管理公网NAT网关实例

    • VVR 3.0.1以下版本的作业:RocketMQ旧的接入点已不可用,您需要适配升级实时计算作业。

      重要

      如果您已使用了VVR 3.0.1以下版本的RocketMQ连接器,则您需要将您的实时计算作业升级至VVR 3.0.1及以上版本,并将作业中EndPoint参数取值更改为新的RocketMQ接入点,旧的接入点存在稳定性风险或不可用的问题,详情请参见实时计算Flink版产品公告

    topic

    topic名称。

    String

    无。

    accessId

    • 4.x:阿里云账号的AccessKey ID。

    • 5.x:

      RocketMQ实例用户名

    String

    • RocketMQ 4.x:是

    • RocketMQ 5.x:否

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理

    • RocketMQ 5.x:如果是使用公网接入点访问,需配置为RocketMQ控制台实例用户名。如果是在阿里云ECS内网访问,无需填写该配置。

    accessKey

    • 4.x: 阿里云账号的AccessKey Secret。

    • 5.x:实例密码

    String

    • RocketMQ 4.x:是

    • RocketMQ 5.x:否

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量管理

    • RocketMQ 5.x:如果是使用公网接入点访问,需配置为RocketMQ控制台实例密码。如果是在阿里云ECS内网访问,无需填写该配置。

    tag

    订阅或写入的标签

    String

    • RocketMQ作为源表时,只能读取单个tag。

    • RocketMQ作为结果表时,支持设置多个tag,以逗号(,)进行分隔。

    说明

    当作为结果表时,仅支持RocketMQ 4.x。RocketMQ 5.x请使用结果表属性字段来指定写出消息的 tag。

    nameServerSubgroup

    NameServer组。

    String

    • 内网服务(阿里云经典网络或VPC):必须配置'nameServerSubgroup' = 'nsaddr4client-internal'

    • 公网服务:无需配置nameServerSubgroup

    说明

    仅VVR 2.1.1-VVR 3.0.0版本支持该参数,VVR 3.0.1及以后版本不支持该参数。

    encoding

    编码格式。

    String

    UTF-8

    无。

    instanceID

    RocketMQ实例ID。

    String

    • 如果RocketMQ实例无独立命名空间,则不可以使用instanceID参数。

    • 如果RocketMQ实例有独立命名空间,则instanceID参数必选。

    说明

    仅RocketMQ 4.x支持该参数,RocketMQ 5.x不需要配置该参数。

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    consumerGroup

    Consumer组名。

    String

    无。

    pullIntervalMs

    上游没有数据可供消费时,source的休眠时间。

    Int

    单位为毫秒。

    目前没有限流机制,无法设置读取RocketMQ的速率。

    说明

    仅RocketMQ 4.x支持该参数,RocketMQ 5.x不需要配置该参数。

    timeZone

    时区。

    String

    例如,Asia/Shanghai。

    startTimeMs

    启动时间点。

    Long

    时间戳,单位为毫秒。

    startMessageOffset

    消息开始的偏移量。

    Int

    如果填写该参数,则优先以startMessageOffset的位点开始加载数据。

    lineDelimiter

    解析Block时,行分隔符。

    String

    \n

    无。

    fieldDelimiter

    字段分隔符。

    String

    \u0001

    根据MQ终端的模式,分隔符分别为:

    • 在只读模式下(默认模式),分隔符为\u0001。该模式下,分隔符不可见。

    • 在编辑模式下,分隔符为^A

    lengthCheck

    单行字段条数检查策略。

    Int

    NONE

    取值如下:

    • NONE:默认值。

      • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。

      • 解析出的字段数小于定义字段数时,跳过这行数据。

    • SKIP:解析出的字段数和定义字段数不同时跳过数据。

    • EXCEPTION:解析出的字段数和定义字段数不同时提示异常。

    • PAD:按从左到右顺序填充。

      • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。

      • 解析出的字段数小于定义字段数时,在行尾用Null填充缺少的字段。

    说明

    SKIP、EXCEPTION和PAD为可选值。

    columnErrorDebug

    是否打开调试开关。

    Boolean

    false

    如果设置为true,则打印解析异常的Log。

    pullBatchSize

    每次拉取消息的最大数量。

    Int

    64

    仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    producerGroup

    写入的群组。

    String

    无。

    retryTimes

    写入的重试次数。

    Int

    10

    无。

    sleepTimeMs

    重试间隔时间。

    Long

    5000

    无。

    partitionField

    指定字段名,将该字段作为分区列。

    String

    如果modepartition,则该参数必填。

    说明

    仅实时计算引擎VVR 8.0.5及以上版本支持该参数。

类型映射

Flink字段类型

云消息队列RocketMQ字段类型

VARCHAR

STRING

代码示例

  • 源表示例

    • CSV格式

      假设您的一条CSV格式消息记录如下。

      1,name,male 
      2,name,female
      说明

      一条RocketMQ消息可以包括零条到多条数据记录,记录之间使用\n分隔。

      Flink作业中,声明RocketMQ数据源表的DDL如下。

      • RocketMQ 5.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq5',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
      • RocketMQ 4.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '1000',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
    • 二进制格式

      • RocketMQ 5.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq5',
          'endpoint' = '<yourEndpoint>',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;

      • RocketMQ 4.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq',
          'endpoint' = '<yourEndpoint>',
          'pullIntervalMs' = '500',
          'accessId' = '${secret_values.ak_id}',
          'accessKey' = '${secret_values.ak_secret}',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;
  • 结果表示例

    • 创建结果表

      • RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
        说明

        如果您的MQ消息为二进制格式,则DDL中只能定义一个字段,且字段类型必须为VARBINARY。

    • 创建将keystags字段指定为RocketMQ消息的key和tag的结果表

      • RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );

DataStream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法

实时计算引擎VVR提供MetaQSource,用于读取RocketMQ;提供OutputFormat的实现类MetaQOutputFormat,用于写入RocketMQ。读取RocketMQ和写入RocketMQ的示例如下:

RocketMQ 4.x

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下两个配置仅本地调试时使用,需要在作业打包上传到阿里云实时计算Flink版之前删除
        conf.setString("pipeline.classpaths", "file://" + "uber jar绝对路径");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Converts message body to upper case.
                .map(RocketMQDataStreamDemo2::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consumer
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
    }
}

RocketMQ 5.x

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下两个配置仅本地调试时使用,需要在作业打包上传到阿里云实时计算Flink版之前删除
        conf.setString("pipeline.classpaths", "file://" + "uber jar绝对路径");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

XML

Maven中央库中已经放置了MQ DataStream连接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
说明

RocketMQ接入点Endpoint配置详情请参见关于TCP内网接入点设置的公告

常见问题

RocketMQ Topic扩容时,RocketMQ如何感知Topic分区数变化?