端到端全流程模板覆盖 AI Agent 运行时数据治理的全部环节(清洗、去重、采样、评估、标注、合成、统计),展示全部算子的协同编排,可直接用于生产环境的周期性调度任务。
模板概述
端到端全流程模板覆盖 AI Agent 运行时数据治理的全部环节,从原始日志中产出高质量 Dataset。
本模板将原始 OpenTelemetry 日志经过 7 个阶段处理,最终产出经过去重、采样、评估、标注和合成的高质量数据集。整条 Pipeline 串联了全部 13 个算子节点,涵盖project、 extend、where、make-instance、dedup-exact、dedup-fuzzy、dedup-semantic、embedding、doc-stats、semantic-cluster、sample、llm-call 和 agentic-call 等算子类型。关于所有可用算子,请参见节点总览。
适用人群
数据平台团队:构建完整的数据治理流水线。
AI 基础设施工程师:实现周期性数据处理和任务调度。
产品/技术负责人:评估 AI Agent 业务和能力全景。
功能特点
端到端数据治理,从原始日志到高质量 Dataset。
三级去重(精确、近似、语义)+ 全局去重,确保数据无冗余。
LLM 三轮处理(评估 + 标注 + 合成),一次性完成所有 AI 增值。
支持直接部署为生产环境的周期性调度任务。
Pipeline 流程
Pipeline 分为 7 个阶段,依次执行:
阶段 | 算子 | 功能 | 说明 |
1 | 字段提取与过滤 | 从原始 OpenTelemetry 日志中提取 session_id、question、output 等关键字段,过滤指定 span 类型。 | |
2 | 会话聚合 | 按 session_id 和 traceId 聚合多条 Span,组装为一个完整的 Agent 会话实例。 | |
3 | 三级去重 | 依次执行精确去重(SimHash 指纹)、近似去重(海明距离)和语义去重(向量距离),支持跨批次全局去重。数据量大幅下降。 | |
4 | 多样性采样 | 语义聚类(复用 | |
5 | llm-call x 3 | AI 多轮处理 | 三次 LLM 调用:多维度质量评估、结构化分类标注、多类型数据合成。 |
6 | 文档统计 | 计算 question 字段的字符数、词数、行数等基础统计指标。 | |
7 | 输出 | 写入 Dataset | 将处理结果写入目标 Dataset,完成端到端数据治理。 |
Pipeline 遵循先减后增原则:先通过去重和采样大幅减少数据量(行数递减),再通过 LLM 调用丰富数据维度(列数递增)。LLM 调用成本较高,务必在数据量降下来之后执行。
完整配置
Pipeline 支持 JSON API 配置格式。关于 Pipeline 的基本概念和创建方式,请参见Pipeline 概述。
以下为完整的 JSON API 配置。将 your-project、your-agent-logstore、your-workspace、your-dataset 等占位符替换为实际值。
{
"name": "full_pipeline",
"description": "端到端全流程:清洗→采样→评估→标注→合成,一站式 Agent 数据治理",
"source": {
"type": "logstore",
"logstore": {
"project": "your-project",
"logstore": "your-agent-logstore",
"query": "serviceName:your-agent-service and *"
}
},
"pipeline": {
"nodes": [
{
"id": "select_fields",
"type": "project",
"parameters": {
"input": "attributes.input.value",
"output": "attributes.output.value",
"model": "attributes.gen_ai.model_name",
"trace_id": "traceId",
"span_id": "spanId"
}
},
{
"id": "extract",
"type": "extend",
"parameters": {
"session_id": "json_extract_scalar(attributes, '$.gen_ai.session.id')",
"span_kind": "json_extract_scalar(attributes, '$.gen_ai.span.kind')",
"question": "json_extract_scalar(attributes, '$.input.value')",
"output": "json_extract_scalar(attributes, '$.output.value')",
"model": "json_extract_scalar(attributes, '$.gen_ai.request.model')",
"tool_name": "json_extract_scalar(attributes, '$.gen_ai.tool.name')",
"input_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.input_tokens')",
"output_tokens": "json_extract_scalar(attributes, '$.gen_ai.usage.output_tokens')"
}
},
{
"id": "filter_events",
"type": "where",
"parameters": {
"filter": "span_kind IN ('AGENT','LLM','TOOL')"
}
},
{
"id": "assemble",
"type": "make-instance",
"parameters": {
"question": "first(question)",
"output": "last(output)",
"model": "any(model)",
"total_tokens": "sum(input_tokens)",
"tools": "array_distinct(tool_name)",
"tool_chain": "join(tool_name, ' → ')",
"by": "session_id,traceId"
}
},
{
"id": "filter_empty",
"type": "where",
"parameters": {
"filter": "question IS NOT NULL AND length(question) > 0"
}
},
{
"id": "exact_dedup",
"type": "dedup-exact",
"parameters": {
"field": "question"
}
},
{
"id": "fuzzy_dedup",
"type": "dedup-fuzzy",
"parameters": {
"field": "question",
"threshold": "3",
"global": true,
"workspace": "your-workspace",
"dataset": "your-dataset"
}
},
{
"id": "semantic_dedup",
"type": "dedup-semantic",
"parameters": {
"field": "question",
"threshold": "0.1",
"global": true,
"workspace": "your-workspace",
"dataset": "your-dataset"
}
},
{
"id": "cluster",
"type": "semantic-cluster",
"parameters": {
"field": "__dedup_emb",
"n": 100
}
},
{
"id": "sample_per_cluster",
"type": "sample",
"parameters": {
"n": 3,
"by": "__cluster_id"
}
},
{
"id": "evaluate",
"type": "llm-call",
"parameters": {
"prompt": "@eval/agent-quality.md",
"fields": "question,input,output",
"format": "json",
"as": "eval"
}
},
{
"id": "annotate",
"type": "llm-call",
"parameters": {
"prompt": "@anno/agent-label.md",
"fields": "question,output",
"format": "json",
"as": "anno"
}
},
{
"id": "synthesize",
"type": "llm-call",
"parameters": {
"prompt": "@synthetic/data-augment.md",
"fields": "question,input,output",
"format": "json",
"as": "synthetic"
}
},
{
"id": "stats",
"type": "doc-stats",
"parameters": {
"field": "question"
}
}
]
},
"sink": {
"type": "dataset",
"dataset": {
"workspace": "your-workspace",
"dataset": "agent_full_dataset"
}
},
"executePolicy": {
"mode": "scheduled",
"scheduled": {
"fromTime": 1735689600,
"interval": "15m"
}
}
}参数说明
各节点的参数配置说明如下。
project (字段选取)
参数 | 说明 | 示例值 |
| Agent 输入内容字段路径。 |
|
| Agent 输出内容字段路径。 |
|
| 模型名称字段路径。 |
|
| Trace ID 字段,用于关联追踪链路。 |
|
| Span ID 字段,用于关联追踪链路。 |
|
关于 project 算子的完整参数说明,请参见project。
extend(字段提取)
使用 json_extract_scalar 函数从 OpenTelemetry 的 attributes JSON 中提取关键字段。
提取字段 | 来源路径 | 说明 |
|
| 会话 ID,用于聚合同一会话的多条 Span。 |
|
| Span 类型(AGENT、LLM、TOOL),用于后续过滤。 |
|
| 用户提问内容。 |
|
| Agent 回答内容。 |
|
| 请求使用的模型名称。 |
|
| 工具调用名称。 |
|
| Token 用量统计。 |
关于 extend 算子的完整参数说明,请参见extend。
where(条件过滤)
包含两个 where 节点:
filter_events:过滤
span_kind IN ('AGENT','LLM','TOOL'),仅保留 Agent、LLM 和 Tool 类型的 Span。filter_empty:过滤
question IS NOT NULL AND length(question) > 0,排除空问题。
关于 where 算子的完整参数说明,请参见where。
make-instance(会话聚合)
按 session_id 和 traceId 将同一会话的多条 Span 聚合为一条记录。
输出字段 | 聚合函数 | 说明 |
|
| 取会话中第一条用户提问。 |
|
| 取会话中最后一条 Agent 回答。 |
|
| 任取一条模型名称。 |
|
| 累加所有 Span 的 Token 用量。 |
|
| 去重后的工具列表。 |
|
| 工具调用链路,按执行顺序拼接。 |
关于 make-instance 算子的完整参数说明,请参见make-instance。
三级去重
按由粗到细的顺序依次执行三级去重,计算代价递增但前置步骤已大幅削减数据量。
算子 | 去重方式 | 关键参数 | 说明 |
精确去重 |
| 基于 SimHash 指纹匹配,去除完全相同的记录。 | |
近似去重 |
| 基于海明距离匹配,去除高度相似的记录。启用全局模式,支持跨批次去重。 | |
语义去重 |
| 基于向量距离匹配,去除语义相近的记录。启用全局模式,支持跨批次去重。 |
全局去重(global=true)需要指定 workspace 和 dataset 参数。首次运行时无历史数据对比,全局去重效果从第二次调度开始显现。
语义聚类与采样
semantic-cluster:将数据按语义相似度聚为 100 个簇。
field=__dedup_emb直接复用 dedup-semantic 阶段生成的向量,无需重新计算 Embedding。sample:从每个簇中采样 3 条记录(
n=3 by __cluster_id),确保最终数据集的多样性和代表性。
最终数据量 = 簇数 x 每簇采样量。默认为 100 x 3 = 300 条。
关于语义聚类和采样算子的完整参数说明,请参见semantic-cluster和sample。
LLM 三轮处理
采样后数据经过三次 llm-call 算子处理,每次调用通过 Prompt 模板指定不同的处理任务。
调用 | 节点 ID | Prompt 模板 | 输出别名 | 功能 |
第 1 次 |
|
|
| 多维度质量评估(需求理解、回答质量、格式规范等),输出 JSON 格式的评分和理由。 |
第 2 次 |
|
|
| 结构化分类标注(意图、复杂度、场景等),输出 JSON 格式的多维分类和标签。 |
第 3 次 |
|
|
| 多类型数据合成(改写、噪声、追问、对抗),输出 JSON 格式的合成数据。 |
doc-stats(文档统计)
对 question 字段计算字符数、词数、行数等基础统计指标,结果写入 __doc_stats 列。
关于 doc-stats 算子的完整参数说明,请参见doc-stats。
运行结果
以 10,000 条原始日志为例,各阶段数据量变化如下:
步骤 | 算子 | 数据量 | 列数变化 | 说明 |
1 | extend + where | 10,000 | 8 列 | 字段提取 + 过滤指定 Span 类型。 |
2 | make-instance + where | 3,000 | 6 列 | 会话聚合 + 空值过滤。 |
3 | dedup-exact | 2,000 | +3 扩展列 | 精确去重。 |
4 | dedup-fuzzy(global) | 1,200 | 同上 | 近似去重 + 全局去重。 |
5 | dedup-semantic(global) | 800 | +2 扩展列 | 语义去重 + 全局去重。 |
6 | semantic-cluster | 800 | +1 扩展列 | 聚 100 簇。 |
7 | sample | 300 | 同上 | 每簇 3 条。 |
8 | llm-call(eval) | 300 | +1 列 eval | 质量评分。 |
9 | llm-call(anno) | 300 | +1 列 anno | 分类标注。 |
10 | llm-call(synth) | 300 | +1 列 synthetic | 数据合成。 |
11 | doc-stats | 300 | +1 列 __doc_stats | 文本统计。 |
10,000 条原始日志经全流程处理后,最终产出 300 条多维度标注的 Dataset。
输出 Dataset 的列结构
列名 | 来源算子 | 说明 |
| project + extend + make-instance | 原始业务字段和会话聚合字段。 |
| dedup-exact / dedup-fuzzy | 去重特征列。 |
| dedup-semantic | 语义去重向量和记录标识。 |
| semantic-cluster | 语义聚类簇 ID。 |
| llm-call #1 | JSON 格式的多维度质量评分。 |
| llm-call #2 | JSON 格式的结构化分类标注。 |
| llm-call #3 | JSON 格式的合成数据。 |
| doc-stats | JSON 格式的文本统计指标。 |
定制建议
根据实际业务场景调整以下参数:
定制点 | 操作 |
字段选取 | 修改 |
去重阈值 | 调整 |
全局去重目标 | 修改 |
采样规模 | 调整 |
LLM 处理 | 修改 Prompt 模板调整评估维度、标注维度和合成类型。可为 |
精简 Pipeline | 不需要某个功能时,直接删除对应节点即可。例如:不需要数据合成,删除 |
调度策略 | 修改 |
实践原则
原则 | 说明 |
Schema 前置 |
|
先减后增 | 先去重 + 采样(行数递减),再 AI 处理(列数递增)。LLM 调用成本高,务必在数据量降下来之后执行。 |
由粗到细 | 去重顺序:精确 -> 近似 -> 语义,计算代价递增但前置步骤已大幅削减数据量。 |
扩展列复用 |
|
全局去重 |
|
算子原子性 | 每个算子职责单一,通过管道组合实现复杂逻辑。 |
注意事项
使用本模板前,注意以下成本和性能相关事项:
LLM 调用成本:300 条 x 3 轮 = 900 次 LLM 调用。根据模型定价评估成本。
Pipeline 执行时间:LLM 调用为主要耗时,300 条数据约 10-30 分钟。
全局去重首次运行:首批数据无历史数据对比,全局去重效果从第二次调度开始显现。
扩展列体积:
__dedup_emb(向量)体积较大,如不需要可在输出 Dataset 中排除。调度间隔:
interval=15m表示每 15 分钟执行一次,按数据量和成本预算调整。