文档

基于事件流实现RocketMQ消息路由

更新时间:

本文介绍如何应用事件总线EventBridge的事件流功能实现云消息队列 RocketMQ 版的消息路由。

前提条件

背景信息

事件流作为更轻量、实时端到端的流式事件通道,提供轻量流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端云消息队列 RocketMQ 版生产的消息可以通过事件流这个通道被路由到目标端的云消息队列 RocketMQ 版,无需定义事件总线。更多信息,请参见事件流概述

步骤一:在目标端创建事件流

说明

事件流需要在目标端创建,例如如果需要把华北2(北京)的RocketMQ消息路由到华东1(杭州),那么需要在华东1(杭州)创建事件流任务。

  1. 登录事件总线EventBridge控制台
  2. 在顶部菜单栏,选择地域。
  3. 在左侧导航栏,单击事件流
  4. 事件流页面,单击创建事件流

  5. 创建事件流面板,设置任务名称描述,配置以下参数,然后单击保存

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 RocketMQ 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择消息队列RocketMQ版源实例所在的地域。

        华东1(杭州)

        版本

        选择RocketMQ实例版本。

        RocketMQ 4.x

        RocketMQ 实例

        选择生产消息队列RocketMQ版消息的源实例。

        MQ_INST_115964845466****_ByBehioo

        Topic

        选择生产消息队列RocketMQ版消息的Topic。

        topic

        Tag

        配置源实例中用于过滤消息的Tag。

        test

        Group ID

        选择源实例中的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。

        GID_http_1

        消费位点

        选择开始消费消息的位点。

        最新位点

        批量推送

        批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。

        例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。

        开启

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)Transform(转换)配置向导,设置事件过滤、转换规则,单击下一步。事件转换的配置说明,请参见使用函数计算实现消息数据清洗

      3. Sink(目标)配置向导,选择服务类型消息队列RocketMQ版,配置以下参数,单击保存

        参数

        说明

        示例

        版本

        选择云消息队列 RocketMQ 版实例版本。

        RocketMQ 4.x

        实例ID

        选择已创建的云消息队列 RocketMQ 版实例。

        test

        Topic

        选择已创建的Topic。

        test

        消息体(body)

        事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

        部分事件

        $.data.body

        自定义属性(Properties)

        选择模板。您可以自定义一个模板,定义模板里需要的变量,事件总线EventBridge可以提取事件中的字段,按照模板定义的形式进行转换。

        说明

        如果需要全量传递源端的RocketMQ消息的属性,推荐使用示例中的配置。

        变量

        {
          "userProperties":"$.data.userProperties",
          "msgId":"$.data.systemProperties.UNIQ_KEY"
        }

        模板

        {
          "EB_SYS_EMBED_OBJECT":"${userProperties}",
          "UNIQ_KEY":"${msgId}"
        }

        消息索引(Keys)

        事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

        部分事件

        $.data.systemProperties.KEYS

        消息标签(Tags)

        事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。

        部分事件

        $.data.systemProperties.TAGS
    • 任务属性

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信

  6. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用

  7. 提示对话框,阅读提示信息,然后单击确认

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

步骤二:测试验证

  1. 登录云消息队列 RocketMQ 版控制台

  2. 在顶部菜单栏,选择步骤一:在目标端创建事件流中源实例所在的地域。

  3. 在左侧导航栏,单击实例列表

  4. 实例列表页面,找到步骤一:在目标端创建事件流中配置的源实例,在其操作列,单击详情

    查看实例
  5. 在左侧导航栏,单击Topic 管理

  6. 在Topic列表,单击步骤一:在目标端创建事件流中配置的源实例的Topic名称。

  7. 在Topic详情页面,单击右上角的快速体验

  8. 快速体验的消息生产和消费面板,选择发送方式控制台,然后配置消息内容消息 Key消息 Tag,单击确定

    发送消息消息发送成功后,界面会提示消息发送成功!,并显示Message ID。

  9. 在源实例完成生产消息后,返回实例列表页面。

  10. 实例列表页面,找到步骤一:在目标端创建事件流中配置的目标实例,在其操作列,单击详情

  11. 在左侧导航栏,单击Topic 管理

  12. 在Topic列表,单击步骤一:在目标端创建事件流中配置的目标实例的Topic名称。

  13. 在Topic详情页面,单击消息查询

  14. 配置查询方式查询范围,单击查询

    查询消息
  15. 查看查询到的Message ID、Tag和Key值是否与生产的消息一致。

  • 本页导读 (1)