Agent 数据去重清洗 Pipeline 模板通过精确去重和近似去重两级清洗,去除 AI Agent 运行日志中的重复数据,零 AI 成本产出干净数据集。
适用场景与核心价值
AI Agent(如 Copilot、ChatBot、RAG 系统等)在线运行时会产生大量用户交互日志,其中普遍存在以下两类冗余数据:
完全重复:同一用户多次提交相同问题(页面刷新、重试、网络抖动)。
字面相似:不同用户的提问仅有标点、空格、换行等细微差异。
冗余数据浪费存储资源,干扰下游分析和模型训练。本模板通过精确去重和近似去重两级清洗,为后续评估、标注、训练等任务提供干净的数据基础。
适用人群
角色 | 典型场景 |
数据工程师 | 日常数据维护、数据仓库入库前清洗 |
运维人员 | Agent 日志治理、存储成本优化 |
算法工程师 | 训练数据预处理、去除噪声样本 |
核心价值
无LLM调用,零AI成本。
精确+近似两级去重,覆盖完全相同和字面高度相似的重复。
秒级处理大规模日志(单批万级数据秒级完成)。
支持跨批次全局去重(可选),避免历史数据重复入库。
Pipeline 流程
模板算子链路如下:
阶段 | 节点 | 类型 | 说明 |
数据准备 | 字段选取 | 从原始日志中选取 | |
字段扩展 | 使用正则表达式从 | ||
筛选过滤 | 过滤 | ||
去重清洗 | 精确去重 | 基于SimHash 指纹完全匹配,去除内容完全相同的记录。 | |
近似去重 | 基于SimHash 海明距离,去除字面高度相似的记录(默认阈值为3)。 |
完整配置
支持 JSON API 配置方式。JSON API 适用于通过 API 创建 Pipeline 任务。
JSON API配置
以下为完整的 Pipeline JSON 配置示例,需要将source中的project、logstore和query替换为实际值。
{
"name": "agent_data_cleaning",
"description": "Agent 数据去重清洗:精确 + 近似两级去重,快速产出干净数据集",
"source": {
"type": "logstore",
"logstore": {
"project": "your-project",
"logstore": "your-agent-logstore",
"query": "serviceName:your-agent-service and *"
}
},
"pipeline": {
"nodes": [
{
"id": "select_fields",
"type": "project",
"parameters": {
"input": "attributes.input.value",
"output": "attributes.output.value",
"model": "attributes.gen_ai.model_name",
"trace_id": "traceId",
"span_id": "spanId"
}
},
{
"id": "extract",
"type": "extend",
"parameters": {
"question": "regexp_extract(input, '(?s)用户提问原文:\\s*(.*?)\\s*,\\s*\"files\"', 1)"
}
},
{
"id": "filter_empty",
"type": "where",
"parameters": {
"filter": "question IS NOT NULL AND length(question) > 0"
}
},
{
"id": "exact_dedup",
"type": "dedup-exact",
"parameters": {
"field": "question"
}
},
{
"id": "fuzzy_dedup",
"type": "dedup-fuzzy",
"parameters": {
"field": "question",
"threshold": "3"
}
}
]
},
"sink": {
"type": "dataset",
"dataset": {
"workspace": "your-workspace",
"dataset": "agent_data_cleaned"
}
},
"executePolicy": {
"mode": "scheduled",
"scheduled": {
"fromTime": 1735689600,
"interval": "15m"
}
}
}参数说明
数据源参数
参数 | 说明 | 示例值 |
| 日志服务Project 名称。 |
|
| 存储Agent 日志的Logstore 名称。 |
|
| 日志查询条件,用于筛选目标Agent的日志数据。 |
|
Pipeline 节点参数
节点 | 参数 | 说明 | 默认值 |
|
| 字段映射,将原始日志字段映射为统一的字段名。Key为目标字段名,Value为源字段路径。 | 参见上方配置示例 |
|
| 字段扩展表达式。支持 | 无 |
|
| 过滤条件表达式。支持 | 无 |
|
| 去重依据字段。基于该字段的SimHash 指纹进行精确匹配去重。 |
|
|
| 去重依据字段。基于该字段的SimHash 海明距离进行近似匹配去重。 |
|
|
| 阈值。值越小越严格(1=极严格,5=较宽松)。两条记录的SimHash 指纹差异位数不超过该值时视为相似。 |
|
输出参数
参数 | 说明 | 示例值 |
| 输出数据集所属的工作空间。 |
|
| 输出数据集名称。去重后的干净数据将写入该数据集。 |
|
调度参数
参数 | 说明 | 示例值 |
| 执行模式。 |
|
| 调度间隔。支持 |
|
配置与使用
前提条件
已开通日志服务,并创建了Project和Logstore。关于Pipeline的基本概念,参见 Pipeline。
Agent 日志已接入目标Logstore。
已创建用于存储清洗结果的数据集(Dataset)。
操作步骤
根据实际的Agent 日志Schema,修改
project节点的字段映射。将parameters中的Key保持不变,将 Value 替换为实际日志中对应的字段路径。修改
extend节点的正则表达式,使其匹配实际日志中用户提问的格式。说明如果日志中用户提问已经是独立字段(无需正则提取),可以将
extend节点替换为直接引用该字段,或删除此节点并修改后续节点的field参数。根据需要调整
where节点的过滤条件。例如,添加按服务名或时间范围过滤的条件。根据数据特征调整
dedup-fuzzy的threshold参数。说明threshold表示允许的SimHash 差异位数。值为1时极其严格(仅去除几乎完全相同的文本),值为5时较宽松(允许更多差异)。建议从默认值3开始,根据效果调整。将
source中的project、logstore和query替换为实际值,将sink中的workspace和dataset替换为目标数据集信息。通过API提交Pipeline 配置。
运行结果
以下为典型的处理效果示例。
处理前(原始Agent 日志):
# | question |
1 | 帮我查一下昨天的错误日志 |
2 | 帮我查一下昨天的错误日志 |
3 | 帮我查一下昨天的 错误日志 |
4 | 帮我看下昨天错误日志 |
5 | 统计最近1小时的PV |
6 | 统计 最近1小时的PV |
7 | 分析一下慢查询的原因 |
处理后(去重清洗):
# | question | 去重说明 |
1 | 帮我查一下昨天的错误日志 | 保留(#2精确去重移除,#3、#4近似去重移除) |
5 | 统计最近1小时的PV | 保留(#6近似去重移除) |
7 | 分析一下慢查询的原因 | 保留 |
7条原始记录经过两级去重后剩余3条,去重率57%。
实践原则
原则 | 说明 |
Schema 前置 | 使用 |
精确在前 |
|
阈值适中 |
|
空值过滤 | 正则提取可能返回NULL, |
定制建议
定制点 | 说明 |
字段选取 | 修改 |
过滤条件 | 修改 |
去重阈值 | 调整 |
跨批次全局去重 | 给 |
追加语义去重 | 在 |
边界场景与行为
场景 | 行为 |
| 被 |
| 可能误删仅有少量编辑差异的不同问题。 |
数据量极大(百万级) | 建议启用全局去重模式,避免批次间重复。 |
需要保留去重前后的映射关系 | 当前模板不输出去重关联信息,如需可添加 |