消息数据清洗功能提供常见的消息处理模板,如消息分割、动态路由和消息富化等。您可以直接利用模板处理消息,也可以根据业务情况在模板基础上修改代码。本文介绍消息队列RocketMQ版消息数据清洗模板的类型和使用方式。

背景信息

消息数据清洗任务提供基本的算子能力,底层逻辑使用函数计算。消息队列RocketMQ版消息数据清洗任务创建完成后,您可以登录函数计算控制台,进行代码自定义及相应函数配置的修改。

算子 算子能力说明
消息过滤 按照正则表达式匹配消息内容,将匹配成功的消息发送至目标。更多信息,请参见事件模式
消息转换 根据字符串匹配,进行消息内容替换,例如字符大小写转换。将转换后的消息发送至目标。更多信息,请参见事件内容转换
内容分割 根据正则表达式对消息内容进行分割,将分割后的消息逐条发送至目标。
动态路由 根据正则表达式匹配消息内容,将匹配成功的消息路由至对应目标,将匹配不成功的消息路由至默认目标。
内容富化 根据富化源对消息内容进行富化。如果消息原始内容包含AccountID,处理时根据AccountID查询数据库,获得客户地域后填至源消息体中,并发送至目标服务。
内容映射 根据正则表达式对消息内容进行映射处理。例如,屏蔽消息中敏感字段或将消息大小缩减至最小标准。

内容分割

使用示例

例如,以下是一份学生名单。
message:
[张三,男,17,4班;李四,女,17,3班;王五,男,17,4班]
需要将消息拆分为单个学生的信息,然后分三条消息推送至各目标服务。如下所示。
message:
    [张三,男,17,4班]
message:
    [李四,女,17,3班]
message:
    [王五,男,17,4班]
dataclean_split

操作步骤

  1. 登录消息队列RocketMQ版控制台
  2. 在左侧导航栏,选择消息集成 > 消息流出,然后在顶部菜单栏,选择地域。
  3. 消息流出页面,单击创建任务
  4. 消息流出创建面板,设置以下配置项,然后单击确定
    关键配置项说明如下,其余保持默认值即可。
    • 任务名称:设置任务名称。
    • 流出类型:选择函数计算
    • 源 消息队列RocketMQ区域:选择地域及该地域下已创建的消息队列RocketMQ版相关资源。
    • 目标 函数计算区域:选择相同地域下函数计算服务函数
    • 消息转换区域:事件选择新建函数模板函数模板选择内容分割 transform_split,然后根据业务情况修改函数代码。
      说明 无需设置服务及函数名称,默认创建名称为RocketMQ_FC_Transform_<TemplateName>_<timestamp>服务函数,便于识别。

    您可以编写您自己的代码逻辑,将消息内容分割后发送至不同的目标服务。

    创建完成后,您可以登录函数计算控制台查看自动创建的服务函数。如您未设置服务及函数名称,将分别创建名称以transform_split开头的服务和函数。

动态路由

使用示例

例如,以下是一份牙膏信息清单。
message:
[BrandA, toothpaste, $12.98, 100g
 BrandB, toothpaste, $7.99, 80g
 BrandC, toothpaste, $1.99, 100g]
需要按照自定义动态规则,将列表路由至目标Topic。规则描述如下所示。
  • 如果消息以BrandA开头,发送至BrandA-item-topic和BrandA-discount-topic这两个topic。
  • 如果消息以BrandB开头,发送至BrandB-item-topic和BrandB-discount-topic这两个topic。
  • 其余消息发送至Unknown-brand-topic。
规则的JSON描述如下。
{
    "defaultTopic": "Unknown-brand-topic",
    "rules": [
          {
                "regex": "^BrandA",
                "targetTopics": [
        "BrandA-item-topic",
        "BrandA-discount-topic"
      ]
    },
    {
                "regex": "^BrandB",
                "targetTopics": [
        "BrandB-item-topic",
        "BrandB-discount-topic"
      ]
    }
  ]
}
dataclean_dynamicroute

操作步骤

具体操作,请参见消息分割操作步骤。其中,函数模板需选择为动态路由 dynamic_routing

内容富化

使用示例

本文以一个IP地址段处理的场景富化为例。假设某服务的访问日志如下所示。
{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX"
}
需要统计IP地址的来源,并且映射关系存储于数据库MySQL。
CREATE TABLE `tb_ip` (
    ->      `IP` VARCHAR(256) NOT NULL,
    ->     `Region` VARCHAR(256) NOT NULL,
    ->      `ISP` VARCHAR(256) NOT NULL,
    ->      PRIMARY KEY (`IP`)
    -> );
处理后的消息结果如下所示。
{
  "accountID": "164901546557****",
  "hostIP": "192.168.XX.XX",
  "region": "beijing"
}
dataclean_enrich

操作步骤

具体操作,请参见消息分割操作步骤。其中,函数模板需选择为内容富化 transform_enrichment

内容映射

使用示例

例如,以下是某公司员工登记信息,涉及了员工工号、电话号码等隐私内容。
张三,工号1,131 1111 1111
李四,工号2,132 2222 2222
王五,工号3,133 3333 3333
需要将以上消息中员工隐私信息进行屏蔽,然后推送至目标服务。如下所示。
张*,工号*,*** **** ****
李*,工号*,*** **** ****
王*,工号*,*** **** ****
dataclean_projection

操作步骤

具体操作,请参见消息分割操作步骤。其中,函数模板需选择为内容映射 transform_projection