云消息队列 RocketMQ 版

本文介绍如何在事件总线EventBridge控制台添加云消息队列 RocketMQ 版作为事件流中的事件提供方。

前提条件

操作步骤

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流面板,设置任务名称描述,配置以下参数,然后单击保存

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 RocketMQ 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择消息队列RocketMQ版源实例所在的地域。

        华东1(杭州)

        版本

        选择RocketMQ实例版本。

        RocketMQ 4.x

        RocketMQ 实例

        选择生产消息队列RocketMQ版消息的源实例。

        MQ_INST_115964845466****_ByBeUp3p

        Topic

        选择生产消息队列RocketMQ版消息的Topic。

        topic

        Tag

        配置源实例中用于过滤消息的Tag。

        test

        Group ID

        选择源实例中的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。

        GID_http_1

        消费位点

        选择开始消费消息的位点。

        最新位点

        批量推送

        批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。

        例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。

        开启

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)Transform(转换)Sink(目标)配置向导,设置事件过滤、转换规则及事件目标。事件转换的配置说明,请参见使用函数计算实现消息数据清洗

    • 任务属性

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信

  4. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

事件示例

{
    "specversion":"1.0",
    "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
    "source":"acs:mq",
    "type":"mq:Topic:SendMessage",
    "subject":"acs:mq:cn-hangzhou:123456789098****:MQ_INST_123456789098****_BXhFHryi%TopicName",
    "datacontenttype":"application/json; charset=utf-8",
    "time":"2021-04-08T06:01:20.766Z",
    "aliyunpublishtime":"2021-04-08T06:01:20.725Z",
    "aliyuneventbusname":"BusName",
    "data":{
        "topic":"TopicName",
        "systemProperties":{
            "MIN_OFFSET":"0",
            "TRACE_ON":"true",
            "MAX_OFFSET":"8",
            "MSG_REGION":"cn-hangzhou",
            "KEYS":"systemProperties.KEYS",
            "CONSUME_START_TIME":1628577790396,
            "UNIQ_KEY":"AC14C305069E1B28CDFA3181CDA2****",
            "TAGS":"systemProperties.TAGS",
            "INSTANCE_ID":"MQ_INST_123456789098****_BXhFHryi"
        },
        "userProperties":{
        },
        "body":"TEST"
    }
}

CloudEvents规范中定义的参数解释,请参见事件概述

data字段包含的参数解释如下表所示。

参数

类型

示例

描述

topic

String

TopicName

Topic名称。

systemProperties

Map

系统属性。

MIN_OFFSET

Int

0

最低位点。

TRACE_ON

Boolean

true

是否有消息轨迹。取值说明如下:

  • true:有消息轨迹。

  • false:无消息轨迹。

MAX_OFFSET

Int

8

最高位点。

MSG_REGION

String

cn-hangzhou

发送消息的地域。

KEYS

String

systemProperties.KEYS

过滤属性。

CONSUME_START_TIME

Long

1628577790396

开始消费时间。单位:毫秒。

UNIQ_KEY

String

AC14C305069E1B28CDFA3181CDA2****

消息唯一键。

TAGS

String

systemProperties.TAGS

过滤属性。

INSTANCE_ID

String

MQ_INST_123456789098****_BXhFHryi

实例ID。

userProperties

Map

用户属性。

body

String

TEST

消息内容。