本文为您介绍云消息队列 RocketMQ 版连接器。
鉴于云消息队列 RocketMQ 版 4.x标准版实例共享API调用弹性上限为每秒5000次,使用该版本的消息中间件在与实时计算Flink版对接时,若超过上限会触发限流机制,可能会导致Flink作业运行不稳定。因此,在选择消息中间件时,如果您正在或计划通过标准版RocketMQ与Flink对接,请您谨慎评估。如果业务场景允许,请考虑使用Kafka、日志服务(SLS)或DataHub等其他中间件进行替代。如果您确实需要使用云消息队列 RocketMQ 版 4.x标准版处理大规模的消息,也请同时通过提交工单与RocketMQ产品取得联系申请提升限速上限。
背景信息
云消息队列 RocketMQ 版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。其既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。
RocketMQ连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 源表和结果表 | 
| 运行模式 | 仅支持流模式 | 
| 数据格式 | CSV和二进制格式 | 
| 特有监控指标 | |
| API种类 | Datastream(仅支持RocketMQ 4.x)和SQL | 
| 是否支持更新或删除结果表数据 | 不支持更新和删除结果表数据,只支持插入数据。 | 
特色功能
RocketMQ源表和结果表支持属性字段,具体如下。
- 源表属性字段 - 字段名 - 字段类型 - 说明 - topic - VARCHAR METADATA VIRTUAL - 消息Topic。 - queue-id - INT METADATA VIRTUAL - 消息队列ID。 - queue-offset - BIGINT METADATA VIRTUAL - 消息队列的消费位点。 - msg-id - VARCHAR METADATA VIRTUAL - 消息ID。 - store-timestamp - TIMESTAMP(3) METADATA VIRTUAL - 消息存储时间。 - born-timestamp - TIMESTAMP(3) METADATA VIRTUAL - 消息生成时间。 - keys - VARCHAR METADATA VIRTUAL - 消息Keys。 - tags - VARCHAR METADATA VIRTUAL - 消息Tags。 
- 结果表属性字段 - 字段名 - 字段类型 - 说明 - keys - VARCHAR METADATA - 消息Keys。 - tags - VARCHAR METADATA - 消息Tags。 
前提条件
已创建了RocketMQ资源,详情请参见创建资源。
使用限制
- 仅Flink实时计算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。 
- RocketMQ连接器使用Pull Consumer消费,所有的子任务分担消费。 
语法结构
CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);WITH参数
通用
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | connector类型。 | String | 是 | 无 | 
 | 
| endPoint | EndPoint地址 | String | 是 | 无 | 云消息队列 RocketMQ 版接入地址支持以下两种类型: 
 重要  由于阿里云网络安全策略动态变化,实时计算连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。 
 | 
| topic | topic名称。 | String | 是 | 无 | 无。 | 
| accessId | 
 | String | 
 | 无 | 
 重要  为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 
 | 
| accessKey | 
 | String | 
 | 无 | |
| tag | 订阅或写入的标签 | String | 否 | 无 | 
 说明  当作为结果表时,仅支持RocketMQ 4.x。RocketMQ 5.x请使用结果表属性字段来指定写出消息的 tag。 | 
| encoding | 编码格式。 | String | 否 | UTF-8 | 无。 | 
| instanceID | RocketMQ实例ID。 | String | 否 | 无 | 
 说明  仅RocketMQ 4.x支持该参数。 | 
源表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| consumerGroup | Consumer组名。 | String | 是 | 无 | 无。 | 
| pullIntervalMs | 上游没有数据可供消费时,source的休眠时间。 | Int | 是 | 无 | 单位为毫秒。 目前没有限流机制,无法设置读取RocketMQ的速率。 说明  仅RocketMQ 4.x支持该参数。 | 
| timeZone | 时区。 | String | 否 | 无 | 例如,Asia/Shanghai。 | 
| startTimeMs | 启动时间点。 | Long | 否 | 无 | 时间戳,单位为毫秒。 | 
| startMessageOffset | 消息开始的偏移量。 | Int | 否 | 无 | 如果填写该参数,则优先以 | 
| lineDelimiter | 解析Block时,行分隔符。 | String | 否 | \n | 无。 | 
| fieldDelimiter | 字段分隔符。 | String | 否 | \u0001 | 根据MQ终端的模式,分隔符分别为: 
 | 
| lengthCheck | 单行字段条数检查策略。 | Int | 否 | NONE | 取值如下: 
 | 
| columnErrorDebug | 是否打开调试开关。 | Boolean | 否 | false | 如果设置为true,则打印解析异常的Log。 | 
| pullBatchSize | 每次拉取消息的最大数量。 | Int | 否 | 64 | 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。 | 
结果表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| producerGroup | 写入的群组。 | String | 是 | 无 | 无。 | 
| retryTimes | 写入的重试次数。 | Int | 否 | 10 | 无。 | 
| sleepTimeMs | 重试间隔时间。 | Long | 否 | 5000 | 无。 | 
| partitionField | 指定字段名,将该字段作为分区列。 | String | 否 | 无 | 如果 说明  仅实时计算引擎VVR 8.0.5及以上版本支持该参数。 | 
| deliveryTimestampMode | 指定延迟消息的模式,该模式与 | String | 否 | 无 | 取值如下: 
 说明  仅实时计算引擎VVR 11.1及以上版本支持该参数。 | 
| deliveryTimestampType | 指定延迟消息的时间基准类型。 | String | 否 | processing_time | 取值如下: 
 说明  仅实时计算引擎VVR 11.1及以上版本支持该参数。 | 
| deliveryTimestampValue | 延迟消息的投递时间。 | Long | 否 | 无 | 根据 
 说明  仅实时计算引擎VVR 11.1及以上版本支持该参数。 | 
| deliveryTimestampField | 指定用作延迟消息投递时间的字段。字段类型必须为 | String | 否 | 无 | 
 说明  仅实时计算引擎VVR 11.1及以上版本支持该参数。 | 
类型映射
| Flink字段类型 | 云消息队列RocketMQ字段类型 | 
| BOOLEAN | STRING | 
| VARBINARY | |
| VARCHAR | |
| TINYINT | |
| INTEGER | |
| BIGINT | |
| FLOAT | |
| DOUBLE | |
| DECIMAL | 
代码示例
源表示例
- CSV格式 - 假设您的一条CSV格式消息记录如下。 - 1,name,male 2,name,female说明- 一条RocketMQ消息可以包括零条到多条数据记录,记录之间使用 - \n分隔。- Flink作业中,声明RocketMQ数据源表的DDL如下。 - RocketMQ 5.x 
 - CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq5', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );- RocketMQ 4.x 
 - CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );
- 二进制格式 - RocketMQ 5.x - CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq5', 'endpoint' = '<yourEndpoint>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
- RocketMQ 4.x - CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
 
结果表示例
- 创建结果表 - RocketMQ 5.x - CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
- RocketMQ 4.x - CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );说明- 如果您的MQ消息为二进制格式,则DDL中只能定义一个字段,且字段类型必须为VARBINARY。 
 
- 创建将 - keys和- tags字段指定为RocketMQ消息的key和tag的结果表- RocketMQ 5.x - CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
- RocketMQ 4.x - CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
 
DataStream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器使用方法。
实时计算引擎VVR提供MetaQSource,用于读取RocketMQ;提供OutputFormat的实现类MetaQOutputFormat,用于写入RocketMQ。读取RocketMQ和写入RocketMQ的示例如下:
RocketMQ 5.x
在RocketMQ 5.x 中,访问密钥对应的是实例中配置的用户名和密码。若通过内网访问RocketMQ实例,且实例未开启 ACL 认证,则可不填写AK/SK参数。
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.mq5.shaded.org.apache.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.mq5.sink.RocketMQOutputFormat;
import com.alibaba.ververica.connectors.mq5.source.RocketMQSource;
import com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Collections;
import java.util.List;
/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {
    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";
    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();
        // 以下两个配置仅本地调试时使用,需要在作业打包上传到阿里云实时计算Flink版之前删除
        conf.setString("pipeline.classpaths", "file://" + "uber jar绝对路径");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");
        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));
        env.execute();
    }
    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }
        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }
    private static class ToMessage implements MapFunction<String, List<MessageExt>> {
        public ToMessage() {
        }
        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}RocketMQ 4.x
import com.alibaba.ververica.connector.mq.shaded.com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.ververica.connectors.common.sink.OutputFormatSinkFunction;
import com.alibaba.ververica.connectors.metaq.sink.MetaQOutputFormat;
import com.alibaba.ververica.connectors.metaq.source.MetaQSource;
import com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import static com.alibaba.ververica.connector.mq.shaded.com.taobao.metaq.client.ExternConst.*;
/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {
    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";
    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();
        // 以下两个配置仅本地调试时使用,需要在作业打包上传到阿里云实时计算Flink版之前删除
        conf.setString("pipeline.classpaths", "file://" + "uber jar绝对路径");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Converts message body to upper case.
                .map(RocketMQDataStreamDemo2::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }
    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();
        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consumer
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }
    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }
    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }
    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }
    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }
        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
    }
}XML
- MQ 4.x:MQ DataStream连接器。 
- MQ 5.x:MQ DataStream连接器。 
<!--MQ5-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq5</artifactId>
    <version>${vvr-version}</version>
    <scope>provided</scope>
</dependency>
<!--MQ4-->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>RocketMQ接入点Endpoint配置详情请参见关于TCP内网接入点设置的公告。