本文介绍如何使用数据清洗功能中的内容富化模板处理消息数据。
背景信息
数据清洗功能提供常见的消息处理模板,包括内容分割、动态路由、内容富化和内容映射等。您可以直接利用模板处理消息,也可以根据业务情况在模板基础上修改代码。
消息数据清洗任务提供基本的算子能力,底层逻辑使用函数计算。支持进行数据清洗的产品包含云消息队列 RocketMQ 版、云消息队列 Kafka 版、云消息队列 MQTT 版、云消息队列 RabbitMQ 版消息服务。数据清洗任务创建完成后,您可以登录函数计算控制台,进行代码自定义及相应函数配置的修改。
算子 | 算子能力说明 |
内容分割 | 根据正则表达式对消息内容进行分割,将分割后的消息逐条发送至目标。 |
动态路由 | 根据正则表达式匹配消息内容,将匹配成功的消息路由至对应目标,将匹配不成功的消息路由至默认目标。 |
内容富化 | 根据富化源对消息内容进行富化。如果消息原始内容包含AccountID,处理时根据AccountID查询数据库,获得客户地域后填至源消息体中,并发送至目标服务。 |
内容映射 | 根据正则表达式对消息内容进行映射处理。例如,屏蔽消息中敏感字段或将消息大小缩减至最小标准。 |
本文以云消息队列 RocketMQ 版为例介绍如何使用内容富化模板进行数据清洗。
使用示例
本文以一个IP地址段处理的场景富化为例。假设某服务的访问日志如下所示。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX"
}
需要统计IP地址的来源,并且映射关系存储于数据库MySQL。
CREATE TABLE `tb_ip` (
-> `IP` VARCHAR(256) NOT NULL,
-> `Region` VARCHAR(256) NOT NULL,
-> `ISP` VARCHAR(256) NOT NULL,
-> PRIMARY KEY (`IP`)
-> );
处理后的消息结果如下所示。
{
"accountID": "164901546557****",
"hostIP": "192.168.XX.XX",
"region": "beijing"
}
操作步骤
在左侧导航栏,选择 ,然后在顶部菜单栏,选择地域。
在消息流出(Sink)页面,单击创建任务。
在消息流出创建面板,设置以下配置项,然后单击确定。
关键配置项说明如下,其余保持默认值即可。
基础信息
配置项
说明
任务名称
填写任务名称。
流出类型
本文选择消息队列 RocketMQ。支持的消息服务包括消息队列 RocketMQ、消息队列 RabbitMQ、消息服务 MNS和消息队列 Kafka等。
资源配置
配置项
说明
源
地域
本文选择华东1(杭州)。
版本
选择RocketMQ的版本,本文选择RocketMQ 5.x。
RocketMQ 实例
选择生产消息的RocketMQ实例。
Topic
选择源实例的Topic。
Tag
选择用于过滤消息的Tag。
Group ID
本文选择快速创建,自动创建以GID_EVENTBRIDGE_xxx 命名的Group ID。
消费位点
本文选择最新位点。
目标
版本
选择接收消息的RocketMQ的版本,本文选择RocketMQ 5.x。
实例 ID
选择接收消息的RocketMQ的实例 ID。
Topic
选择目标实例的Topic。
数据处理
消息过滤:选择无需过滤。
消息转换:选择自定义配置。消息体(body)选择数据清洗,并选中新建函数模版。函数模版选择内容富化 transform_enrichment,然后根据业务情况修改函数代码。
创建完成后,您可以登录函数计算控制台查看自动创建的服务和函数。
- 本页导读 (1)