文档

动态路由

更新时间:
一键部署

本文介绍如何使用数据清洗功能中的动态路由模板处理消息数据。

背景信息

数据清洗功能提供常见的消息处理模板,包括内容分割、动态路由、内容富化和内容映射等。您可以直接利用模板处理消息,也可以根据业务情况在模板基础上修改代码。

消息数据清洗任务提供基本的算子能力,底层逻辑使用函数计算。支持进行数据清洗的产品包含云消息队列 RocketMQ 版云消息队列 Kafka 版云消息队列 MQTT 版云消息队列 RabbitMQ 版消息服务。数据清洗任务创建完成后,您可以登录函数计算控制台,进行代码自定义及相应函数配置的修改。

算子

算子能力说明

内容分割

根据正则表达式对消息内容进行分割,将分割后的消息逐条发送至目标。

动态路由

根据正则表达式匹配消息内容,将匹配成功的消息路由至对应目标,将匹配不成功的消息路由至默认目标。

内容富化

根据富化源对消息内容进行富化。如果消息原始内容包含AccountID,处理时根据AccountID查询数据库,获得客户地域后填至源消息体中,并发送至目标服务。

内容映射

根据正则表达式对消息内容进行映射处理。例如,屏蔽消息中敏感字段或将消息大小缩减至最小标准。

本文以云消息队列 MQTT 版为例介绍如何使用动态路由模板进行数据清洗。

使用示例

例如,以下是一份牙膏信息清单。

message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]

需要按照自定义动态规则,将列表路由至目标Topic。规则描述如下所示。

  • 如果消息以BrandA开头,发送至BrandA-item-topic和BrandA-discount-topic这两个topic。

  • 如果消息以BrandB开头,发送至BrandB-item-topic和BrandB-discount-topic这两个topic。

  • 其余消息发送至Unknown-brand-topic。

规则的JSON描述如下。

{
  "defaultTopic": "Unknown-brand-topic",
  "rules": [
    {
      "regex": "^BrandA",
      "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
      "regex": "^BrandB",
      "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}

dataclean_dynamicroute

操作步骤

  1. 登录云消息队列 MQTT 版控制台,并在左侧导航栏选择消息集成 > 任务列表

  2. 在顶部菜单栏选择地域,如华东1(杭州),然后在任务列表页面中,单击创建任务

  3. 创建任务面板,设置任务名称描述,配置以下参数。

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 MQTT 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        默认选择创建任务时所在的地域。

        华东1(杭州)

        MQTT 实例

        选择MQTT实例。

        post-cn-jajh8i****

        MQTT Topic

        选择MQTT实例中的Topic。

        test-topic

        数据格式

        数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。

        • Json(默认Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)

        • Text(文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)

        • Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)

        Json

        批量推送

        批量推送可帮您批量聚合多个事件,当批量推送条数批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。

        例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。

        开启

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)配置向导,不需要配置,可以直接跳过。

      3. Transform(转换)配置向导,选择阿里云服务下拉框中选择函数计算acs.fc.function,并选中新建函数模版函数模版选择动态路由 dynamic_routing,然后根据业务情况修改函数代码。

      4. Sink(目标)配置向导,选择相应的服务类型完成配置。

创建完成后,您可以登录函数计算控制台查看自动创建的服务函数

  • 本页导读