文档

云消息队列 MQTT 版

更新时间:
一键部署

本文介绍如何在云消息队列 RocketMQ 版控制台创建消息流入任务,以实现将云消息队列 MQTT 版的数据流入至云消息队列 RocketMQ 版

前提条件

  • 您已购买云消息队列 RocketMQ 版实例,且实例处于服务中状态。具体步骤,请参见创建资源

  • 您已创建云消息队列 MQTT 版实例,且实例处于服务中状态。具体步骤,请参见创建资源

开服地域

在消息集成中创建消息流入任务时,所有地域都支持将云消息队列 MQTT 版作为消息流入任务源端。

创建消息流入任务

  1. 登录云消息队列 RocketMQ 版控制台,在左侧导航栏选择消息集成 > 消息流入
  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在消息流入页面中,单击创建任务
  3. 消息流入创建面板,设置任务名称,选择流入类型微消息队列 MQTT版,配置以下信息,然后单击确定

    1. 资源配置区域,设置以下参数。

      表 1. 源端微消息队列MQTT版参数配置

      参数

      说明

      示例

      地域

      默认选择创建消息流入任务时选择的地域且不可更改。

      华东1(杭州)

      MQTT实例

      选择提前创建好的云消息队列 MQTT 版实例。

      testinstance

      MQTT Topic

      选择云消息队列 MQTT 版实例中的Topic。

      testtopic

      表 2. 目标端云消息队列 RocketMQ 版参数配置

      参数

      说明

      示例

      地域

      接收云消息队列 RocketMQ 版消息的目标实例所在的地域。

      华东1(杭州)

      版本

      选择目标端云消息队列 RocketMQ 版实例的版本。

      RocketMQ 4.x

      实例ID

      选择接收云消息队列 RocketMQ 版消息的目标实例。

      MQ_INST_115964845466****_BYEiGXc4

      Topic

      选择目标实例的Topic。

      test

    2. 数据处理区域,设置消息的过滤、转换、重试和死信。

      • 消息过滤

        • 无需过滤:默认投递全部事件。

        • 自定义配置:使用事件模式匹配特定事件。详细规则,请参见消息转换

      • 消息转换

        • 默认配置:默认将消息内容或相关属性映射到目标端。映射关系,请参见默认配置映射关系

        • 自定义配置:根据自身需求配置自定义参数投递至目标端。

          参数

          说明

          示例

          消息体(body)

          • 数据清洗:数据清洗任务提供基本的算子能力,底层逻辑使用函数计算,可实现分割,映射,富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗

          • 数据提取:通过 JSONPath语法提取消息中的数据,将指定的消息内容路由到目标。配置规则,请参见消息转换

          数据清洗

          服务及函数名称

          默认创建名称为<源端服务名称>_<目标端服务名称>_Transform_<TemplateName>_<timestamp>的服务和函数。当消息体(body)选择数据清洗且选择新建函数模板时,需要配置该参数。

          RocketMQ_RocketMQ_Transform_Customized_1688540172662

          函数模版

          选择函数模板。当消息体(body)选择数据清洗且选择新建函数模板时,需要配置该参数。

          • 内容分割:根据正则表达式对消息内容进行分割,将分割后的消息逐条发送至目标。

          • 内容映射:根据正则表达式对消息内容进行映射处理。例如,屏蔽消息中敏感字段或将消息大小缩减至最小标准。

          • 内容富化:根据富化源对消息内容进行富化。如果消息原始内容包含AccountID,处理时根据AccountID查询数据库,获得客户地域后填至源消息体中,并发送至目标服务。

          • 动态路由:根据正则表达式匹配消息内容,将匹配成功的消息路由至对应目标,将匹配不成功的消息路由至默认目标。

          内容分割

          服务

          选择进行数据加工的服务。

          test_service

          函数

          选择进行数据加工的函数。

          test_function

          版本和别名

          选择版本和别名。

          版本
          LATEST

          自定义属性(Properties)

          需要投递的事件的属性。配置规则,请参见消息转换

          模板

          // 变量
          {
            "userProperties":"$.data.userProperties",
            "msgId":"$.data.systemProperties.UNIQ_KEY"
          }
          // 模板
          {
            "EB_SYS_EMBED_OBJECT":"${userProperties}",
            "UNIQ_KEY":"${msgId}"
          }

          消息索引(Keys)

          需要投递的事件的过滤属性。配置规则,请参见消息转换

          部分事件

          $.data.systemProperties.KEYS

          消息标签(Tags)

          需要投递的事件的过滤属性。配置规则,请参见消息转换

          部分事件

          $.data.systemProperties.TAGS
      • 重试和死信:配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信

  4. 返回消息流入页面,找到创建好的任务,在其右侧操作列,单击启用

  5. 提示对话框,阅读提示信息,然后单击确认

    启用任务后,会有30秒~60秒的延迟时间,您可以在消息流入页面的状态栏查看启动进度。

其他操作

消息流入页面,找到目标任务,在其右侧操作列,执行其他操作。

  • 查看任务详情:单击详情,在消息流出详情页面,查看任务的源端配置、目标端配置及资源信息。
  • 编辑任务配置:单击编辑,在消息流出编辑面板,修改资源配置规则配置
  • 启停任务:单击启用或者停用,然后在提示对话框,单击确认
  • 删除任务:单击删除,然后在提示对话框,单击确认

  • 本页导读 (1)
文档反馈