本文介绍如何在云消息队列 RocketMQ 版控制台创建消息流入任务,实现将其他数据源中的数据快速导入云消息队列 RocketMQ 版

前提条件

您已购买云消息队列 RocketMQ 版实例,且实例处于服务中状态。具体步骤,请参见创建资源

创建消息流入任务

  1. 登录云消息队列 RocketMQ 版控制台,在左侧导航栏选择消息集成 > 消息流入
  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在消息流入页面中,单击创建任务
  3. 消息流入创建面板,设置任务名称,选择流入类型消息队列RocketMQ,配置以下信息,然后单击确定
    1. 资源配置区域,设置以下参数。
      表 1. 源端消息队列RocketMQ版参数配置
      参数说明示例
      地域默认选择创建消息流入任务时选择的地域且不可更改。华东1(杭州)
      版本选择云消息队列 RocketMQ 版实例的版本。
      • RocketMQ 4.x:服务端4.x版本。
      • RocketMQ 5.x:服务端5.x版本。
      RocketMQ 4.x
      RocketMQ 实例选择生产消息的源云消息队列 RocketMQ 版实例。MQ_INST_115964845466****_ByBeUp3p
      Topic选择生产消息的源Topic。topic
      Tag云消息队列 RocketMQ 版中用于过滤消息的标签。test_tag
      Group ID
      • 快速创建:自动创建以GID_EVENTBRIDGE_xxx 命名的Group ID。
      • 使用已有:选择源实例中的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。
      GID_http_1
      消费位点选择开始消费消息的位点。
      • 最新位点:从最新位点开始消费。
      • 最早位点:从最初位点开始消费。
      • 指定时间戳:从指定时间开始消费。
      最新位点
      表 2. 目标端消息队列RocketMQ版参数配置
      参数说明示例
      地域接收云消息队列 RocketMQ 版消息的目标实例所在的地域。华东1(杭州)
      版本选择目标端云消息队列 RocketMQ 版实例的版本。RocketMQ 4.x
      实例ID选择接收云消息队列 RocketMQ 版消息的目标实例。MQ_INST_115964845466****_BYEiGXc4
      Topic选择目标实例的Topic。test
    2. 数据处理区域,设置消息的过滤、转换、重试和死信。
      • 消息过滤
        • 无需过滤:默认投递全部事件。
        • 自定义配置:使用事件模式匹配特定事件。详细规则,请参见消息过滤
      • 消息转换
        • 默认配置:默认将消息内容或相关属性映射到目标端。映射关系,请参见默认配置映射关系
        • 自定义配置:根据自身需求配置自定义参数投递至目标端。
          参数说明示例
          消息体(body)
          • 数据清洗:数据清洗任务提供基本的算子能力,底层逻辑使用函数计算,可实现分割,映射,富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗
          • 数据提取:通过 JSONPath语法提取消息中的数据,将指定的消息内容路由到目标。配置规则,请参见消息转换
          数据清洗
          服务及函数名称默认创建名称为RocketMQ_RocketMQ_Transform_<TemplateName>_<timestamp>的服务和函数。当消息体(body)选择数据清洗且选择新建函数模板时,需要配置该参数。RocketMQ_RocketMQ_Transform_Customized_1688540172662
          函数模版选择函数模板。当消息体(body)选择数据清洗且选择新建函数模板时,需要配置该参数。
          • 内容分割:根据正则表达式对消息内容进行分割,将分割后的消息逐条发送至目标。
          • 内容映射:根据正则表达式对消息内容进行映射处理。例如,屏蔽消息中敏感字段或将消息大小缩减至最小标准。
          • 内容富化:根据富化源对消息内容进行富化。如果消息原始内容包含AccountID,处理时根据AccountID查询数据库,获得客户地域后填至源消息体中,并发送至目标服务。
          • 动态路由:根据正则表达式匹配消息内容,将匹配成功的消息路由至对应目标,将匹配不成功的消息路由至默认目标。
          内容分割
          服务选择进行数据加工的服务。test_service
          函数选择进行数据加工的函数。test_function
          版本和别名选择版本和别名。
          版本
          LATEST
          自定义属性(Properties)需要投递的事件的属性。配置规则,请参见消息转换模板
          // 变量
          {
            "userProperties":"$.data.userProperties",
            "msgId":"$.data.systemProperties.UNIQ_KEY"
          }
          // 模板
          {
            "EB_SYS_EMBED_OBJECT":"${userProperties}",
            "UNIQ_KEY":"${msgId}"
          }
          消息索引(Keys)需要投递的事件的过滤属性。配置规则,请参见消息转换部分事件
          $.data.systemProperties.KEYS
          消息标签(Tags)需要投递的事件的过滤属性。配置规则,请参见消息转换部分事件
          $.data.systemProperties.TAGS
      • 重试和死信:配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信
  4. 返回消息流入页面,找到创建好的任务,在其右侧操作列,单击启用
  5. 提示对话框,阅读提示信息,然后单击确认
    启用任务后,会有30秒~60秒的延迟时间,您可以在消息流入页面的状态栏查看启动进度。

其他操作

消息流入页面,找到目标任务,在其右侧操作列,执行其他操作。

消息流入
  • 查看任务详情:单击详情,在消息流入详情页面,查看任务的源端配置、目标端配置及资源信息。
  • 编辑任务配置:单击编辑,在消息流入编辑面板,修改资源配置规则配置
  • 启停任务:单击启用或者停用,然后在提示对话框,单击确认
  • 删除任务:单击删除,然后在提示对话框,单击确认