Agent数据去重清洗

更新时间:
复制为 MD 格式

Agent 数据去重清洗 Pipeline 模板通过精确去重和近似去重两级清洗,去除 AI Agent 运行日志中的重复数据,零 AI 成本产出干净数据集。

适用场景与核心价值

AI Agent(如 Copilot、ChatBot、RAG 系统等)在线运行时会产生大量用户交互日志,其中普遍存在以下两类冗余数据:

  • 完全重复:同一用户多次提交相同问题(页面刷新、重试、网络抖动)。

  • 字面相似:不同用户的提问仅有标点、空格、换行等细微差异。

冗余数据浪费存储资源,干扰下游分析和模型训练。本模板通过精确去重近似去重两级清洗,为后续评估、标注、训练等任务提供干净的数据基础。

适用人群

角色

典型场景

数据工程师

日常数据维护、数据仓库入库前清洗

运维人员

Agent 日志治理、存储成本优化

算法工程师

训练数据预处理、去除噪声样本

核心价值

  • LLM调用,AI成本

  • 精确+近似两级去重,覆盖完全相同和字面高度相似的重复。

  • 秒级处理大规模日志(单批万级数据秒级完成)。

  • 支持跨批次全局去重(可选),避免历史数据重复入库。

Pipeline 流程

模板算子链路如下:

阶段

节点

类型

说明

数据准备

project

字段选取

从原始日志中选取inputoutputmodel等关键字段。

extend

字段扩展

使用正则表达式从input字段中提取用户提问原文,生成question字段。

where

筛选过滤

过滤questionNULL 或空字符串的无效记录。

去重清洗

dedup-exact

精确去重

基于SimHash 指纹完全匹配,去除内容完全相同的记录。

dedup-fuzzy

近似去重

基于SimHash 海明距离,去除字面高度相似的记录(默认阈值为3)。

完整配置

支持 JSON API 配置方式。JSON API 适用于通过 API 创建 Pipeline 任务。

JSON API配置

以下为完整的 Pipeline JSON 配置示例,需要将source中的projectlogstorequery替换为实际值。

{
  "name": "agent_data_cleaning",
  "description": "Agent 数据去重清洗:精确 + 近似两级去重,快速产出干净数据集",
  "source": {
    "type": "logstore",
    "logstore": {
      "project": "your-project",
      "logstore": "your-agent-logstore",
      "query": "serviceName:your-agent-service and *"
    }
  },
  "pipeline": {
    "nodes": [
      {
        "id": "select_fields",
        "type": "project",
        "parameters": {
          "input": "attributes.input.value",
          "output": "attributes.output.value",
          "model": "attributes.gen_ai.model_name",
          "trace_id": "traceId",
          "span_id": "spanId"
        }
      },
      {
        "id": "extract",
        "type": "extend",
        "parameters": {
          "question": "regexp_extract(input, '(?s)用户提问原文:\\s*(.*?)\\s*,\\s*\"files\"', 1)"
        }
      },
      {
        "id": "filter_empty",
        "type": "where",
        "parameters": {
          "filter": "question IS NOT NULL AND length(question) > 0"
        }
      },
      {
        "id": "exact_dedup",
        "type": "dedup-exact",
        "parameters": {
          "field": "question"
        }
      },
      {
        "id": "fuzzy_dedup",
        "type": "dedup-fuzzy",
        "parameters": {
          "field": "question",
          "threshold": "3"
        }
      }
    ]
  },
  "sink": {
    "type": "dataset",
    "dataset": {
      "workspace": "your-workspace",
      "dataset": "agent_data_cleaned"
    }
  },
  "executePolicy": {
    "mode": "scheduled",
    "scheduled": {
      "fromTime": 1735689600,
      "interval": "15m"
    }
  }
}

参数说明

数据源参数

参数

说明

示例值

source.logstore.project

日志服务Project 名称。

your-project

source.logstore.logstore

存储Agent 日志的Logstore 名称。

your-agent-logstore

source.logstore.query

日志查询条件,用于筛选目标Agent的日志数据。

serviceName:your-agent-service and *

Pipeline 节点参数

节点

参数

说明

默认值

project

parameters

字段映射,将原始日志字段映射为统一的字段名。Key为目标字段名,Value为源字段路径。

参见上方配置示例

extend

parameters

字段扩展表达式。支持regexp_extract等函数,从已有字段中提取新字段。

where

filter

过滤条件表达式。支持IS NOT NULLlength()SQL语法。

dedup-exact

field

去重依据字段。基于该字段的SimHash 指纹进行精确匹配去重。

question

dedup-fuzzy

field

去重依据字段。基于该字段的SimHash 海明距离进行近似匹配去重。

question

dedup-fuzzy

threshold

阈值。值越小越严格(1=极严格,5=较宽松)。两条记录的SimHash 指纹差异位数不超过该值时视为相似。

3

输出参数

参数

说明

示例值

sink.dataset.workspace

输出数据集所属的工作空间。

your-workspace

sink.dataset.dataset

输出数据集名称。去重后的干净数据将写入该数据集。

agent_data_cleaned

调度参数

参数

说明

示例值

executePolicy.mode

执行模式。scheduled表示定时调度执行。

scheduled

executePolicy.scheduled.interval

调度间隔。支持15m1h1d等格式。

15m

配置与使用

前提条件

  • 已开通日志服务,并创建了ProjectLogstore。关于Pipeline的基本概念,参见 Pipeline

  • Agent 日志已接入目标Logstore。

  • 已创建用于存储清洗结果的数据集(Dataset)。

操作步骤

  1. 根据实际的Agent 日志Schema,修改project 节点的字段映射。将parameters中的Key保持不变,将 Value 替换为实际日志中对应的字段路径。

  2. 修改extend 节点的正则表达式,使其匹配实际日志中用户提问的格式。

    说明

    如果日志中用户提问已经是独立字段(无需正则提取),可以将extend 节点替换为直接引用该字段,或删除此节点并修改后续节点的field参数。

  3. 根据需要调整where 节点的过滤条件。例如,添加按服务名或时间范围过滤的条件。

  4. 根据数据特征调整dedup-fuzzythreshold参数。

    说明

    threshold表示允许的SimHash 差异位数。值为1时极其严格(仅去除几乎完全相同的文本),值为5时较宽松(允许更多差异)。建议从默认值3开始,根据效果调整。

  5. source中的projectlogstorequery替换为实际值,将sink中的workspacedataset替换为目标数据集信息。

  6. 通过API提交Pipeline 配置。

运行结果

以下为典型的处理效果示例。

处理前(原始Agent 日志):

#

question

1

帮我查一下昨天的错误日志

2

帮我查一下昨天的错误日志

3

帮我查一下昨天的 错误日志

4

帮我看下昨天错误日志

5

统计最近1小时的PV

6

统计 最近1小时的PV

7

分析一下慢查询的原因

处理后(去重清洗):

#

question

去重说明

1

帮我查一下昨天的错误日志

保留(#2精确去重移除,#3、#4近似去重移除)

5

统计最近1小时的PV

保留(#6近似去重移除)

7

分析一下慢查询的原因

保留

7条原始记录经过两级去重后剩余3条,去重率57%。

实践原则

原则

说明

Schema 前置

使用project+extend算子完成字段选取和扩展处理,声明统一字段名。

精确在前

dedup-exact成本最低(O(N)),放在第一步可大幅削减后续算子的输入量。

阈值适中

dedup-fuzzythreshold=3表示允许3SimHash 差异,适合大多数文本场景。

空值过滤

正则提取可能返回NULL,where过滤可避免后续算子异常。

定制建议

定制点

说明

字段选取

修改projectparameters,适配实际日志 Schema。

过滤条件

修改wherefilter,如按服务名、时间范围等条件过滤。

去重阈值

调整dedup-fuzzythreshold(1=极严格,5=较宽松)。

跨批次全局去重

dedup-fuzzy添加"global": true"workspace": "...""dataset": "..."参数,启用跨批次去重。

追加语义去重

dedup-fuzzy后添加dedup-semantic节点,消除表述不同但含义相同的重复。

边界场景与行为

场景

行为

question提取为NULL

where过滤,不会进入去重流程。

threshold设置过小(如1)

可能误删仅有少量编辑差异的不同问题。

数据量极大(百万级)

建议启用全局去重模式,避免批次间重复。

需要保留去重前后的映射关系

当前模板不输出去重关联信息,如需可添加output参数。