文档

云消息队列 RocketMQ 版

更新时间:
一键部署

本文介绍如何在云消息队列 RocketMQ 版控制台创建消息流入任务,实现将其他数据源中的数据快速导入云消息队列 RocketMQ 版

前提条件

您已购买云消息队列 RocketMQ 版实例,且实例处于服务中状态。具体步骤,请参见创建资源

创建消息流入任务

  1. 登录云消息队列 RocketMQ 版控制台,在左侧导航栏选择消息集成 > 消息流入
  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在消息流入页面中,单击创建任务
  3. 消息流入创建面板,设置任务名称,选择流入类型消息队列RocketMQ,配置以下信息,然后单击确定

    1. 资源配置区域,设置以下参数。

      表 1. 源端云消息队列 RocketMQ 版参数配置

      参数

      说明

      示例

      地域

      默认选择创建消息流入任务时选择的地域且不可更改。

      华东1(杭州)

      版本

      选择云消息队列 RocketMQ 版实例的版本。

      • RocketMQ 4.x:服务端4.x版本。

      • RocketMQ 5.x:服务端5.x版本。

      RocketMQ 4.x

      RocketMQ 实例

      选择生产消息的源云消息队列 RocketMQ 版实例。

      MQ_INST_115964845466****_ByBeUp3p

      Topic

      选择生产消息的源Topic。

      topic

      Tag

      云消息队列 RocketMQ 版中用于过滤消息的标签。

      test_tag

      Group ID

      • 快速创建:自动创建以GID_EVENTBRIDGE_xxx 命名的Group ID。

      • 使用已有:选择源实例中的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。

      GID_http_1

      消费位点

      选择开始消费消息的位点。

      • 最新位点:从最新位点开始消费。

      • 最早位点:从最初位点开始消费。

      • 指定时间戳:从指定时间开始消费。

      最新位点

      表 2. 目标端云消息队列 RocketMQ 版参数配置

      参数

      说明

      示例

      地域

      接收云消息队列 RocketMQ 版消息的目标实例所在的地域。

      华东1(杭州)

      版本

      选择目标端云消息队列 RocketMQ 版实例的版本。

      RocketMQ 4.x

      实例ID

      选择接收云消息队列 RocketMQ 版消息的目标实例。

      MQ_INST_115964845466****_BYEiGXc4

      Topic

      选择目标实例的Topic。

      test

    2. 数据处理区域,设置消息的过滤、转换、重试和死信。

      • 消息过滤

        • 无需过滤:默认投递全部事件。

        • 自定义配置:使用事件模式匹配特定事件。详细规则,请参见消息转换

      • 消息转换

        • 默认配置:默认将消息内容或相关属性映射到目标端。映射关系,请参见默认配置映射关系

        • 自定义配置:根据自身需求配置自定义参数投递至目标端。

          参数

          说明

          示例

          消息体(body)

          • 数据清洗:数据清洗任务提供基本的算子能力,底层逻辑使用函数计算,可实现分割,映射,富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗

          • 数据提取:通过 JSONPath语法提取消息中的数据,将指定的消息内容路由到目标。配置规则,请参见消息转换

          数据清洗

          服务及函数名称

          默认创建名称为<源端服务名称>_<目标端服务名称>_Transform_<TemplateName>_<timestamp>的服务和函数。当消息体(body)选择数据清洗且选择新建函数模板时,需要配置该参数。

          RocketMQ_RocketMQ_Transform_Customized_1688540172662

          函数模版

          选择函数模板。当消息体(body)选择数据清洗且选择新建函数模板时,需要配置该参数。

          • 内容分割:根据正则表达式对消息内容进行分割,将分割后的消息逐条发送至目标。

          • 内容映射:根据正则表达式对消息内容进行映射处理。例如,屏蔽消息中敏感字段或将消息大小缩减至最小标准。

          • 内容富化:根据富化源对消息内容进行富化。如果消息原始内容包含AccountID,处理时根据AccountID查询数据库,获得客户地域后填至源消息体中,并发送至目标服务。

          • 动态路由:根据正则表达式匹配消息内容,将匹配成功的消息路由至对应目标,将匹配不成功的消息路由至默认目标。

          内容分割

          服务

          选择进行数据加工的服务。

          test_service

          函数

          选择进行数据加工的函数。

          test_function

          版本和别名

          选择版本和别名。

          版本
          LATEST

          自定义属性(Properties)

          需要投递的事件的属性。配置规则,请参见消息转换

          模板

          // 变量
          {
            "userProperties":"$.data.userProperties",
            "msgId":"$.data.systemProperties.UNIQ_KEY"
          }
          // 模板
          {
            "EB_SYS_EMBED_OBJECT":"${userProperties}",
            "UNIQ_KEY":"${msgId}"
          }

          消息索引(Keys)

          需要投递的事件的过滤属性。配置规则,请参见消息转换

          部分事件

          $.data.systemProperties.KEYS

          消息标签(Tags)

          需要投递的事件的过滤属性。配置规则,请参见消息转换

          部分事件

          $.data.systemProperties.TAGS
      • 重试和死信:配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信

  4. 返回消息流入页面,找到创建好的任务,在其右侧操作列,单击启用

  5. 提示对话框,阅读提示信息,然后单击确认

    启用任务后,会有30秒~60秒的延迟时间,您可以在消息流入页面的状态栏查看启动进度。

其他操作

消息流入页面,找到目标任务,在其右侧操作列,执行其他操作。

  • 查看任务详情:单击详情,在消息流入详情页面,查看任务的源端配置、目标端配置及资源信息。

  • 编辑任务配置:单击编辑,在消息流入编辑面板,修改资源配置规则配置

  • 启停任务:单击启用或者停用,然后在提示对话框,单击确认

  • 删除任务:单击删除,然后在提示对话框,单击确认

  • 本页导读 (1)
文档反馈