基于事件流实现轻量消息队列(原 MNS)路由

本文介绍如何应用事件总线EventBridge的事件流功能实现轻量消息队列(原 MNS)的消息路由。

前提条件

背景信息

事件流作为更轻量、实时端到端的流式事件通道,提供轻量流式数据的过滤和转换的能力,在不同的数据仓库之间、数据处理程序之间、数据分析和处理系统之间进行数据同步。源端轻量消息队列(原 MNS)生产的消息可以通过事件流这个通道被路由到目标端的轻量消息队列(原 MNS),无需定义事件总线。更多信息,请参见事件流概述

步骤一:创建事件流

说明

事件总线EventBridge暂不支持跨地域创建轻量消息队列(原 MNS)的事件流。

  1. 登录事件总线EventBridge控制台
  2. 在顶部菜单栏,选择创建事件流的地域。

  3. 在左侧导航栏,单击事件流
  4. 事件流页面,单击创建事件流

  5. 创建事件流面板,设置任务名称描述,配置以下参数,然后单击保存

    • 任务创建

      1. Source(源)配置向导,选择数据提供方轻量消息队列(原 MNS),设置以下参数,然后单击下一步

        参数

        说明

        示例

        队列名称

        选择轻量消息队列(原 MNS)的队列。

        test-queue

        Base64 解码

        勾选此项会帮助您将轻量消息队列(原 MNS)的数据解码后进行投递。

        test

        批量推送

        批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。

        例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。

        开启

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)Transform(转换)配置向导,设置事件过滤、转换规则,单击下一步。事件转换的配置说明,请参见使用函数计算实现消息数据清洗

      3. Sink(目标)配置向导,选择服务类型轻量消息队列(原 MNS),配置以下参数,单击保存

        参数

        说明

        示例

        队列名称

        选择已创建的轻量消息队列(原 MNS)队列。

        test

        开启 Base64 编码

        选择是否开启Base64编码,如不开启,会导致接收到的消息为乱码。

        消息体(body)

        选择事件内容转换类型。更多信息,请参考事件内容转换

        完整事件

    • 任务属性

      设置事件流的重试策略及死信队列。更多信息,请参见重试和死信

  6. 返回事件流页面,找到创建好的事件流,在其右侧操作栏,单击启用

    启用事件流后,会有30秒~60秒的延迟时间,您可以在事件流页面的状态栏查看启动进度。

步骤二:验证事件流

  1. 登录轻量消息队列(原 MNS)控制台

  2. 在顶部菜单栏,选择步骤一:创建事件流中事件流所在的地域。

  3. 在左侧导航栏,单击队列列表

  4. 队列列表页面,找到步骤一:创建事件流中配置的源队列,在其操作列,单击详情

    查看详情

  5. 在队列详情页面,单击右上角的收发消息

  6. 队列收发消息快速体验页面,配置消息内容消息定时时间,单击发送消息

    发送消息

    消息发送成功后,该页面会显示发送的消息ID。

  7. 在源队列完成发送消息后,返回队列列表页面。

  8. 队列列表页面,找到步骤一:创建事件流中配置的目标队列,在其操作列,单击详情

  9. 在队列详情页面,单击右上角的收发消息

  10. 队列收发消息快速体验页面,单击接收消息

    接收消息

  11. 查看查询到的消息ID和消息内容是否与生产的消息一致。