云消息队列 RocketMQ 版

本文为您介绍云消息队列 RocketMQ 版连接器。

重要

鉴于云消息队列 RocketMQ 版 4.x标准版实例共享API调用弹性上限为每秒5000次,使用该版本的消息中间件在与实时计算Flink版对接时,若超过上限会触发限流机制,可能会导致Flink作业运行不稳定。因此,在选择消息中间件时,如果您正在或计划通过标准版RocketMQFlink对接,请您谨慎评估。如果业务场景允许,请考虑使用Kafka、日志服务(SLS)或DataHub等其他中间件进行替代。如果您确实需要使用云消息队列 RocketMQ 版 4.x标准版处理大规模的消息,也请同时通过提交工单RocketMQ产品取得联系申请提升限速上限。

背景信息

云消息队列 RocketMQ 版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用和高可靠的分布式消息中间件。其既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐和可靠重试等特性。

RocketMQ连接器支持的信息如下。

类别

详情

支持类型

源表和结果表

运行模式

仅支持流模式

数据格式

CSV和二进制格式

特有监控指标

监控指标

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • 结果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

API种类

Datastream(仅支持RocketMQ 4.x)和SQL

是否支持更新或删除结果表数据

不支持更新和删除结果表数据,只支持插入数据。

特色功能

RocketMQ源表和结果表支持属性字段,具体如下。

  • 源表属性字段

    字段名

    字段类型

    说明

    topic

    VARCHAR METADATA VIRTUAL

    消息Topic。

    queue-id

    INT METADATA VIRTUAL

    消息队列ID。

    queue-offset

    BIGINT METADATA VIRTUAL

    消息队列的消费位点。

    msg-id

    VARCHAR METADATA VIRTUAL

    消息ID。

    store-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    消息存储时间。

    born-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    消息生成时间。

    keys

    VARCHAR METADATA VIRTUAL

    消息Keys。

    tags

    VARCHAR METADATA VIRTUAL

    消息Tags。

  • 结果表属性字段

    字段名

    字段类型

    说明

    keys

    VARCHAR METADATA

    消息Keys。

    tags

    VARCHAR METADATA

    消息Tags。

前提条件

已创建了RocketMQ资源,详情请参见创建资源

使用限制

  • Flink实时计算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。

  • RocketMQ连接器使用Pull Consumer消费,所有的子任务分担消费。

语法结构

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH参数

通用

参数

说明

数据类型

是否必填

默认值

备注

connector

connector类型。

String

  • RocketMQ 4.x固定值为mq

  • RocketMQ 5.x固定值为mq5

endPoint

EndPoint地址

String

云消息队列 RocketMQ 版接入地址支持以下两种类型:

  • 内网服务MQ(阿里云经典网络/VPC)接入地址:在MQ控制台目标实例详情中,选择接入点 > TCP协议客户端接入点 > 内网访问,获取对应的EndPoint。

  • 公网服务MQ接入地址:在MQ控制台目标实例详情中,选择接入点 > TCP协议 > 客户端接入点 > 公网访问,获取对应的EndPoint。

重要

由于阿里云网络安全策略动态变化,实时计算连接公网服务MQ时可能会出现网络连接问题,推荐您使用内网服务MQ。

  • 内网服务无法跨域访问。例如,您所购买的实时计算服务的地域为华东1(杭州),但是购买的RocketMQ服务的地域为华东2(上海),则无法访问。

  • 通过公网方式访问RocketMQ,需要开通公网服务,详情请参见网络连接选型

topic

topic名称。

String

无。

accessId

  • 4.x:阿里云账号的AccessKey ID。

  • 5.x:

    RocketMQ实例用户名

String

  • RocketMQ 4.x:是

  • RocketMQ 5.x:否

重要

为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量

  • RocketMQ 5.x:如果是使用公网接入点访问,需配置为RocketMQ控制台实例用户名。如果是在阿里云ECS内网访问,无需填写该配置。

accessKey

  • 4.x: 阿里云账号的AccessKey Secret。

  • 5.x:实例密码

String

  • RocketMQ 4.x:是

  • RocketMQ 5.x:否

tag

订阅或写入的标签

String

  • RocketMQ作为源表时,只能读取单个tag。

  • RocketMQ作为结果表时,支持设置多个tag,以逗号(,)进行分隔。

说明

当作为结果表时,仅支持RocketMQ 4.x。RocketMQ 5.x请使用结果表属性字段来指定写出消息的 tag。

encoding

编码格式。

String

UTF-8

无。

instanceID

RocketMQ实例ID。

String

  • 如果RocketMQ实例无独立命名空间,则不可以使用instanceID参数。

  • 如果RocketMQ实例有独立命名空间,则instanceID参数必选。

说明

RocketMQ 4.x支持该参数。

源表独有

参数

说明

数据类型

是否必填

默认值

备注

consumerGroup

Consumer组名。

String

无。

pullIntervalMs

上游没有数据可供消费时,source的休眠时间。

Int

单位为毫秒。

目前没有限流机制,无法设置读取RocketMQ的速率。

说明

RocketMQ 4.x支持该参数。

timeZone

时区。

String

例如,Asia/Shanghai。

startTimeMs

启动时间点。

Long

时间戳,单位为毫秒。

startMessageOffset

消息开始的偏移量。

Int

如果填写该参数,则优先以startMessageOffset的位点开始加载数据。

lineDelimiter

解析Block时,行分隔符。

String

\n

无。

fieldDelimiter

字段分隔符。

String

\u0001

根据MQ终端的模式,分隔符分别为:

  • 在只读模式下(默认模式),分隔符为\u0001。该模式下,分隔符不可见。

  • 在编辑模式下,分隔符为^A

lengthCheck

单行字段条数检查策略。

Int

NONE

取值如下:

  • NONE:默认值。

    • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。

    • 解析出的字段数小于定义字段数时,跳过这行数据。

  • SKIP:解析出的字段数和定义字段数不同时跳过数据。

  • EXCEPTION:解析出的字段数和定义字段数不同时提示异常。

  • PAD:按从左到右顺序填充。

    • 解析出的字段数大于定义字段数时,按从左到右的顺序,取定义字段数量的数据。

    • 解析出的字段数小于定义字段数时,在行尾用Null填充缺少的字段。

columnErrorDebug

是否打开调试开关。

Boolean

false

如果设置为true,则打印解析异常的Log。

pullBatchSize

每次拉取消息的最大数量。

Int

64

仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

结果表独有

参数

说明

数据类型

是否必填

默认值

备注

producerGroup

写入的群组。

String

无。

retryTimes

写入的重试次数。

Int

10

无。

sleepTimeMs

重试间隔时间。

Long

5000

无。

partitionField

指定字段名,将该字段作为分区列。

String

如果modepartition,则该参数必填。

说明

仅实时计算引擎VVR 8.0.5及以上版本支持该参数。

deliveryTimestampMode

指定延迟消息的模式,该模式与deliveryTimestampValue参数共同决定延迟消息的投递时间。

String

取值如下:

  • fixed:固定时间戳模式。

  • relative:相对延迟时间模式。

  • field:指定字段作为投递时间模式。

说明

仅实时计算引擎VVR 11.1及以上版本支持该参数。

deliveryTimestampType

指定延迟消息的时间基准类型

String

processing_time

取值如下:

  • event_time:事件时间。

  • processing_time:处理时间。

说明

仅实时计算引擎VVR 11.1及以上版本支持该参数。

deliveryTimestampValue

延迟消息的投递时间。

Long

根据deliveryTimestampMode的取值,其含义如下:

  • deliveryTimestampMode=fixed:延迟至指定的时间戳(毫秒级),如果当前时间超过该时间戳,则直接投递消息。

  • deliveryTimestampMode=relative:基于deliveryTimestampType时间类型的延迟时间(默认单位:毫秒)。

  • deliveryTimestampMode=field:参数不生效,延迟时间由deliveryTimestampField指定的字段值决定。

说明

仅实时计算引擎VVR 11.1及以上版本支持该参数。

deliveryTimestampField

指定用作延迟消息投递时间的字段。字段类型必须为BIGINT

String

deliveryTimestampModefield时生效。

说明

仅实时计算引擎VVR 11.1及以上版本支持该参数。

类型映射

Flink字段类型

云消息队列RocketMQ字段类型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

代码示例

源表示例

  • CSV格式

    假设您的一条CSV格式消息记录如下。

    1,name,male 
    2,name,female
    说明

    一条RocketMQ消息可以包括零条到多条数据记录,记录之间使用\n分隔。

    Flink作业中,声明RocketMQ数据源表的DDL如下。

    • RocketMQ 5.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq5',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
    • RocketMQ 4.x

    CREATE TABLE mq_source(
      id varchar,
      name varchar,
      gender varchar,
      topic varchar metadata virtual
    ) WITH (
      'connector' = 'mq',
      'topic' = 'mq-test',
      'endpoint' = '<yourEndpoint>',
      'pullIntervalMs' = '1000',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'consumerGroup' = 'mq-group',
      'fieldDelimiter' = ','
    );
  • 二进制格式

    • RocketMQ 5.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq5',
        'endpoint' = '<yourEndpoint>',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;

    • RocketMQ 4.x

      CREATE TEMPORARY TABLE source_table (
        mess varbinary
      ) WITH (
        'connector' = 'mq',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '500',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'topic' = 'mq-test',
        'consumerGroup' = 'mq-group'
      );
      
      CREATE TEMPORARY TABLE out_table (
        commodity varchar
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO out_table
      select 
        cast(mess as varchar)
      FROM source_table;

结果表示例

  • 创建结果表

    • RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
      说明

      如果您的MQ消息为二进制格式,则DDL中只能定义一个字段,且字段类型必须为VARBINARY。

  • 创建将keystags字段指定为RocketMQ消息的keytag的结果表

    • RocketMQ 5.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq5',
        'endpoint'='<yourEndpoint>',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );
    • RocketMQ 4.x

      CREATE TABLE mq_sink (
        id INTEGER,
        len BIGINT,
        content VARCHAR,
        keys VARCHAR METADATA,
        tags VARCHAR METADATA
      ) WITH (
        'connector'='mq',
        'endpoint'='<yourEndpoint>',
        'accessId'='${secret_values.ak_id}',
        'accessKey'='${secret_values.ak_secret}',
        'topic'='<yourTopicName>',
        'producerGroup'='<yourGroupName>'
      );

常见问题

RocketMQ Topic扩容时,RocketMQ如何感知Topic分区数变化?