云消息队列 RocketMQ 版

更新时间:
复制为 MD 格式

本文介绍如何在事件总线EventBridge控制台添加云消息队列 RocketMQ 版作为事件流中的事件提供方。

前提条件

操作步骤

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流

  2. 在顶部菜单栏,选择地域,然后单击创建事件流

  3. 创建事件流面板,设置任务名称描述,配置以下参数,然后单击保存

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 RocketMQ 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择消息队列RocketMQ源实例所在的地域。

        华东1(杭州)

        版本

        选择RocketMQ实例版本。支持以下两个版本:

        • RocketMQ 4.x

        • RocketMQ 5.x

        RocketMQ 5.x

        RocketMQ 实例

        选择生产消息的源云消息队列 RocketMQ 版实例。

        rmq-cn-jte3w5i****

        Topic

        选择生产消息队列RocketMQ消息的Topic。

        topic

        Tag

        配置源实例中用于过滤消息的Tag。

        test

        Group ID

        RocketMQ消费组名称。

        • 快速创建:推荐方案。如果您选择自动创建,系统将自动创建以 GID_EVENTBRIDGE_xxx 命名的 Group ID。

        • 使用已有:如果您选择使用已创建的Group,请选择独立的 Group ID,不要和已有的业务混用,以免影响已有的消息收发。

        快速创建

        消费位点

        选择开始消费消息的位点。

        • 最新位点:从最新位点开始消费。

        • 最早位点:从最初位点开始消费。

        • 指定时间戳:从指定时间开始消费。

        指定时间戳

        消费时间点

        选择消费时间点。仅当消费点位配置为指定时间戳时需配置此参数。

        2025-03-10 17:27:01

        VPC

        RocketMQ实例的VPC ID,系统会自动选择,无需您手动选择。

        vpc-****

        交换机

        RocketMQ实例的交换机ID,系统会自动选择,无需您手动选择。

        vsw-****

        安全组

        支持以下两种方案。

        • 快速创建:推荐方案,如果您选择此种方案,会自动创建以eb-毫秒时间戳-security_group命名的安全组。

        • 使用已有:如果您选择此种方案,需要选择该VPC下的安全组,推荐使用默认安全组策略。

        快速创建

        数据格式(Body)

        数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。

        • Json(默认Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)

        • Text(文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)

        • Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)

        Json

        批量推送条数

        一次调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发送给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)Transform(转换)Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗

    • 任务属性

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信

  4. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

事件示例

{
    "specversion":"1.0",
    "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
    "source":"acs:mq",
    "type":"mq:Topic:SendMessage",
    "subject":"acs:mq:cn-hangzhou:123456789098****:MQ_INST_123456789098****_BXhFHryi%TopicName",
    "datacontenttype":"application/json; charset=utf-8",
    "time":"2021-04-08T06:01:20.766Z",
    "aliyunpublishtime":"2021-04-08T06:01:20.725Z",
    "aliyuneventbusname":"BusName",
    "data":{
        "topic":"TopicName",
        "systemProperties":{
            "MIN_OFFSET":"0",
            "TRACE_ON":"true",
            "MAX_OFFSET":"8",
            "MSG_REGION":"cn-hangzhou",
            "KEYS":"systemProperties.KEYS",
            "CONSUME_START_TIME":1628577790396,
            "UNIQ_KEY":"AC14C305069E1B28CDFA3181CDA2****",
            "TAGS":"systemProperties.TAGS",
            "INSTANCE_ID":"MQ_INST_123456789098****_BXhFHryi"
        },
        "userProperties":{
        },
        "body":"TEST"
    }
}

CloudEvents规范中定义的参数解释,请参见事件概述

data字段包含的参数解释如下表所示。

参数

类型

示例

描述

topic

String

TopicName

Topic名称。

systemProperties

Map

系统属性。

MIN_OFFSET

Int

0

最低位点。

TRACE_ON

Boolean

true

是否有消息轨迹。取值说明如下:

  • true:有消息轨迹。

  • false:无消息轨迹。

MAX_OFFSET

Int

8

最高位点。

MSG_REGION

String

cn-hangzhou

发送消息的地域。

KEYS

String

systemProperties.KEYS

过滤属性。

CONSUME_START_TIME

Long

1628577790396

开始消费时间。单位:毫秒。

UNIQ_KEY

String

AC14C305069E1B28CDFA3181CDA2****

消息唯一键。

TAGS

String

systemProperties.TAGS

过滤属性。

INSTANCE_ID

String

MQ_INST_123456789098****_BXhFHryi

实例ID。

userProperties

Map

用户属性。

body

String

TEST

消息内容。