文档

动态路由

更新时间:

本文介绍如何使用数据清洗功能中的动态路由模板处理消息数据。

背景信息

数据清洗功能提供常见的消息处理模板,包括内容分割、动态路由、内容富化和内容映射等。您可以直接利用模板处理消息,也可以根据业务情况在模板基础上修改代码。

消息数据清洗任务提供基本的算子能力,底层逻辑使用函数计算。支持进行数据清洗的产品包含云消息队列 RocketMQ 版云消息队列 Kafka 版云消息队列 MQTT 版云消息队列 RabbitMQ 版消息服务。数据清洗任务创建完成后,您可以登录函数计算控制台,进行代码自定义及相应函数配置的修改。

算子

算子能力说明

内容分割

根据正则表达式对消息内容进行分割,将分割后的消息逐条发送至目标。

动态路由

根据正则表达式匹配消息内容,将匹配成功的消息路由至对应目标,将匹配不成功的消息路由至默认目标。

内容富化

根据富化源对消息内容进行富化。如果消息原始内容包含AccountID,处理时根据AccountID查询数据库,获得客户地域后填至源消息体中,并发送至目标服务。

内容映射

根据正则表达式对消息内容进行映射处理。例如,屏蔽消息中敏感字段或将消息大小缩减至最小标准。

本文以云消息队列 RocketMQ 版为例介绍如何使用动态路由模板进行数据清洗。

使用示例

例如,以下是一份牙膏信息清单。

message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

需要按照自定义动态规则,将列表路由至目标Topic。规则描述如下所示。

  • 如果消息以BrandA开头,发送至BrandA-item-topic和BrandA-discount-topic这两个topic。

  • 如果消息以BrandB开头,发送至BrandB-item-topic和BrandB-discount-topic这两个topic。

  • 其余消息发送至Unknown-brand-topic。

规则的JSON描述如下。

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

dataclean_dynamicroute

操作步骤

  1. 登录云消息队列 RocketMQ 版控制台

  2. 在左侧导航栏,选择消息集成 > 消息流出,然后在顶部菜单栏,选择地域。

  3. 消息流出(Sink)页面,单击创建任务

  4. 消息流出创建面板,设置以下配置项,然后单击确定

    关键配置项说明如下,其余保持默认值即可。

    • 基础信息

      配置项

      说明

      任务名称

      填写任务名称。

      流出类型

      本文选择消息队列 RocketMQ。支持的消息服务包括消息队列 RocketMQ消息队列 RabbitMQ消息服务 MNS消息队列 Kafka等。

    • 资源配置

      配置项

      说明

      地域

      本文选择华东1(杭州)

      版本

      选择RocketMQ的版本,本文选择RocketMQ 5.x

      RocketMQ 实例

      选择生产消息的RocketMQ实例。

      Topic

      选择源实例的Topic。

      Tag

      选择用于过滤消息的Tag。

      Group ID

      本文选择快速创建,自动创建以GID_EVENTBRIDGE_xxx 命名的Group ID。

      消费位点

      本文选择最新位点

      目标

      版本

      选择接收消息的RocketMQ的版本,本文选择RocketMQ 5.x

      实例 ID

      选择接收消息的RocketMQ的实例 ID。

      Topic

      选择目标实例的Topic。

    • 数据处理

      • 消息过滤:选择无需过滤

      • 消息转换:选择自定义配置消息体(body)选择数据清洗,并选中新建函数模版函数模版选择动态路由 dynamic_routing,然后根据业务情况修改函数代码。

    创建完成后,您可以登录函数计算控制台查看自动创建的服务函数

  • 本页导读 (1)
文档反馈