Pipeline用户使用指南

更新时间:
复制为 MD 格式

AgentLoop Pipeline 是数据处理流水线引擎,用于将 AI Agent 运行时日志自动加工为高质量 Dataset。Pipeline 提供 13 个处理节点,覆盖字段选取、三级去重、语义聚类采样、AI 评估/标注/合成等全链路数据处理能力。

Pipeline 是什么

Pipeline 是 AgentLoop 的数据处理流水线引擎。它从 Logstore 中读取原始数据,经过编排的多级处理节点(字段选取、去重清洗、聚类采样、AI 评估/标注/合成等),自动产出高质量 Dataset。

能力

说明

灵活编排

13 个处理节点自由组合,构建自定义数据处理管线

三级去重

精确、近似、语义逐级去重,从字面到语义全方位消除冗余

多样性采样

语义聚类 + 分组采样,在降量的同时保证数据多样性

AI 增值

内置 LLM 调用和智能体调用,一站式完成评估、标注、合成

全局去重

跨批次增量去重,确保 Dataset 中无重复数据

定时调度

周期性自动执行,持续产出高质量数据

为什么需要 Pipeline

AI Agent 在运行过程中会产生大量原始日志——用户提问、模型回答、工具调用、Token 消耗等。这些数据散落在不同字段、存在大量重复、缺少质量标注。如果直接用于训练或评测,会带来噪声、偏差和浪费。

Pipeline 解决的正是这个问题:将原始运行时日志自动加工为可直接使用的高质量数据集。通过编排处理节点描述数据处理流程,Pipeline 自动完成清洗、标注和去重。

开始之前

使用 Pipeline 前,请确认以下前置条件已满足:

  • 已开通 AgentLoop服务。

  • 已开通日志服务 SLS,并拥有包含 AI Agent 运行日志的 Logstore。

如需快速上手实操,请参见快速开始。本文侧重于帮助您理解 Pipeline 的核心概念和编排思维。

核心概念

Pipeline 由四大组件构成:数据源(Source)、处理节点(Pipeline Nodes)、输出目标(Sink)和调度策略(ExecutePolicy)。

数据源(Source)

Pipeline 的输入端,定义系统从哪里读取原始数据。目前支持从 Logstore 读取,可以指定数据来源并配置过滤条件,精准圈定需要处理的数据范围。

处理节点(Pipeline Nodes)

Pipeline 的核心处理逻辑,由一组按顺序排列的节点组成。每个节点完成一项原子任务(如字段选取(project)、去重、采样、AI 调用等),数据依次流过各节点逐步加工。

Pipeline 提供 13 个处理节点,按功能分为 6 大类

类别

节点

一句话说明

基础处理

project(字段选取)

从原始数据中选取并重命名字段,声明 Pipeline 输入 Schema

extend(正则提取)

基于表达式计算新列或覆盖已有列

where(空值过滤)

按条件表达式过滤行

数据组装

make-instance(实例构建)

将离散事件按分组键聚合为行级样本宽表(多行合一行)

数据清洗

dedup-exact(精确去重)

完全相同的文本记录仅保留一条

dedup-fuzzy(近似去重)

高度相似(字面差异极小)的文本记录视为重复

dedup-semantic(语义去重)

表述不同但含义相同的文本记录视为重复

特征计算

embedding(向量生成)

对文本字段生成 Embedding 向量

doc-stats(文本统计)

计算字符数、词数、行数等统计指标

数据采样

semantic-cluster(语义聚类)

基于 Embedding 向量为数据分配簇 ID

sample(随机采样)

按比例或固定条数抽样,支持分组

AI 处理

llm-call(LLM 调用)

模板渲染 + 模型推理 + 输出解析,支持评估、标注、合成等

agentic-call(智能体调用)

调用数字员工发起智能对话,支持分析、问答、洞察等

输出目标(Sink)

Pipeline 的输出端,定义处理结果写到哪里。处理结果自动写入目标 Dataset,只需指定目标工作区(Workspace)和数据集(Dataset)名称。

调度策略(ExecutePolicy)

控制 Pipeline 何时运行。支持定时周期执行——例如每 15 分钟自动运行一次,系统自动按时间窗口拉取新数据并处理,实现持续数据产出。

数据处理的六个阶段

Pipeline 的数据处理按逻辑分为六个阶段,每个阶段可根据需要选用。

第一阶段:字段选取与预处理

为什么需要?

原始日志字段繁多、命名各异,直接使用会导致下游节点无法统一处理。这一阶段的核心任务是「统一语言」——为整条 Pipeline 声明统一的 Schema,同时过滤掉无效数据,减少后续无用计算。

什么时候用?

几乎所有 Pipeline 都需要。project(字段选取)选取字段几乎是必选的第一步;extend(正则提取)在需要从嵌套字段中提取信息时使用;where(空值过滤)在数据中存在空值或无效记录时使用。

第二阶段:数据组装

为什么需要?

AI Agent 运行时产生的数据往往是离散事件(如 OT Trace Span),每条只包含交互片段。要做有意义的去重、采样和评估,需要先将这些片段「拼回」一条完整的交互记录。

什么时候用?

当数据源是事件级数据(每次完整交互被拆成多条记录)时,需要用 make-instance(实例构建)将它们聚合为行级样本。如果数据已经是一行一条完整记录,可以跳过这一阶段。

make-instance 内置了三类聚合函数(选值、计算、组合),覆盖了绝大多数数据组装需求。整个过程纯 CPU 运算,零 AI 成本。

第三阶段:三级去重

为什么需要?

真实数据中存在大量重复——从完全相同的重复提交,到一字之差的错别字变体,再到表述不同但含义相同的语义重复。这些冗余会浪费存储、扭曲分布、浪费 AI 调用成本。三级去重逐层消除不同深度的冗余。

什么时候用?

几乎所有需要清洗的场景。三个级别可按需选用:

级别

节点

消除什么类型的重复

计算代价

L1 精确去重

dedup-exact

完全相同的记录

极低

L2 近似去重

dedup-fuzzy

一字之差的变体(错别字、标点、空格)

L3 语义去重

dedup-semantic

不同表述但含义相同的记录

中等

设计理念:计算代价 L1 < L2 < L3,排列在前的节点已大幅削减数据量,使高代价节点的输入规模可控。这就是「由粗到细」的编排哲学。

全局去重dedup-fuzzydedup-semantic 还支持全局模式,跨批次与目标 Dataset 中的已有数据做增量去重,确保 Dataset 中绝无重复。

第四阶段:多样性采样

为什么需要?

去重之后数据量仍可能很大。直接全部送入 AI 处理既不经济也没必要。但简单随机采样可能导致某些语义类别被遗漏。「语义聚类 + 分组采样」的组合确保每个语义类别都有代表,在降量的同时保证多样性。

什么时候用?

当需要构建评测数据集、训练样本集,或者需要控制 AI 调用成本时。聚类采样可以用少量数据覆盖全部语义空间。

说明

semantic-cluster(语义聚类)可复用上游 dedup-semantic(语义去重)产生的向量列,无需重复计算 Embedding。最终产出条数不超过簇数乘以每簇条数,可按预算逆推参数。

第五阶段:AI 处理

为什么需要?

数据清洗和采样解决了「量」的问题,但数据的「质量标签」和「增值信息」仍然缺失。AI 处理阶段通过 LLM 或智能体为每条数据增加评估、标注、合成等增值信息,让数据真正可用。

什么时候用?

当需要自动化质量评分、结构化分类标注、数据增强合成、或通过智能体进行深度分析时。

Pipeline 提供两个 AI 处理节点:

  • llm-call(LLM 调用):一次纯 LLM 推理。通过 Prompt 模板和字段占位符,灵活实现评估、标注、合成、翻译、过滤等任意任务。支持 JSON 结构化输出,确保结果可解析。

  • agentic-call(智能体调用):调用数字员工,内部封装了完整的 SOP 流程、知识库检索、工具调用等能力,适合需要多步推理的复杂分析场景。

第六阶段:特征计算

为什么需要?

有时需要为 Dataset 补充额外的特征信息——文本的向量表示(用于后续检索或聚类)、或字符数/词数/行数等统计指标(用于数据分析和质量监控)。

什么时候用?

embedding(向量生成)适用于无上游向量列时独立生成向量;doc-stats(文本统计)常放在 Pipeline 末尾,为 Dataset 补充文本统计信息。

典型场景

以下 6 个场景覆盖了最常见的数据处理需求。每个场景展示节点组合方式,帮助快速理解「什么场景用什么节点组合」。

场景一:数据去重清洗

目标:快速去除重复数据,产出干净 Dataset。零 AI 调用成本,秒级处理万级数据。

适用场景:日志中存在大量重复提问,只需要一份干净去重的基础数据集。

节点链路:LogStore → project(字段选取) → where(空值过滤) → dedup-exact(精确去重) → dedup-fuzzy(近似去重) → Dataset

先选取关键字段、过滤空值,再用精确 + 近似两级去重消除重复。纯 CPU 运算,无 AI 成本,适合作为所有场景的起步配置。

场景二:多样性采样

目标:三级去重 + 聚类采样,构建覆盖各语义类别的代表性子集。

适用场景:需要构建评测数据集或训练样本,要求数据多样性而非数量。

节点链路:LogStore → projectdedup-exactdedup-fuzzydedup-semanticsemantic-clustersample → Dataset

完整的三级去重消除各层次冗余,语义聚类确保每个类别都有代表性样本入选。语义去重产生的向量列可直接复用于聚类,无需重复计算。

场景三:AI 质量评估

目标:去重采样后自动化质量评分,替代人工评审。

适用场景:需要快速了解 Agent 回答质量的整体水平,或在大规模数据中筛选出高/低质量样本。

节点链路:LogStore → projectdedup-exactsample(降量) → llm-call(多维评估) → Dataset

先去重采样将数据量降到可控范围,再通过 LLM 多维评估对每条数据打分。评估结果以 JSON 结构化输出,可直接用于聚合分析和质量监控。

场景四:数据增强合成

目标:从种子数据出发,通过 LLM 合成多样化样本,扩充训练数据。

适用场景:训练数据不足,需要通过改写、噪声注入、追问生成、对抗样本等方式扩充数据量和多样性。

节点链路:LogStore → projectsample(种子采样) → llm-call(合成) → Dataset

采样一批种子数据,通过 LLM 合成多种变体。超长 Prompt 建议注册为命名模板,便于版本管理和复用。

场景五:端到端全流程

目标:清洗 → 采样 → 评估 → 标注 → 合成 → 统计,一站式产出高质量 Dataset。

适用场景:需要一次性完成从原始日志到带多维标注的高质量数据集的全链路处理。这是 Pipeline 最完整的参考配置。

节点链路:LogStore → project + extendwherededup-exactdedup-fuzzy(全局) → dedup-semantic(全局) → semantic-clustersamplellm-call x 3(评估 + 标注 + 合成) → doc-stats → Dataset

10,000 条原始数据经三级去重(含全局去重)+ 聚类采样后约剩 300 条,再做 3 轮 LLM 调用(评估 + 标注 + 合成),最后补充文本统计,最终产出带多维标注的高质量 Dataset。

场景六:OT-Trace 数据处理

目标:从 OT Span 离散事件组装为完整样本,再清洗评估。

适用场景:AI Agent 接入了 OpenTelemetry 采集规范,运行时数据以离散 Span 形式存储,需要先组装为完整交互记录再进行后续处理。

节点链路:OT Span → extend(JSON 提取) → where(事件过滤) → make-instance(Trace 聚合) → where(空值过滤) → dedup-exactsamplellm-call(评估) → Dataset

先用 extend 从嵌套 JSON 中提取扁平字段,where 过滤无关事件类型,make-instance 按 Trace 聚合为样本宽表,再进入常规的去重、采样、评估流程。数据组装阶段是这个场景的关键差异点。

编排思维

掌握以下思维模式,可以为任意场景设计高效的 Pipeline。

先问自己三个问题

编排前先明确以下问题:

  1. 数据长什么样?是已经一行一条完整记录,还是离散事件需要组装?字段嵌套在 JSON 里吗?——这决定了是否需要 make-instanceextend

  2. 最终要什么?是干净的去重数据集,还是带评分的评测集,还是用于训练的增强数据?——这决定了需要哪些 AI 处理节点。

  3. 预算和时效要求是什么?LLM 调用有成本,处理有时间——这决定了采样策略和模型选择。

四条编排哲学

哲学

核心思想

为什么这样做

Schema 前置

project 统一字段名开头

后续所有节点使用同一套字段名,避免命名混乱

先减后增

先去重/采样(减少行数),再 AI 处理(增加列数)

LLM 调用按 Token 计费。10,000 条数据全部做 AI 评估 vs 300 条做 AI 评估,成本差 30 倍以上

由粗到细

去重顺序:精确 → 近似 → 语义

计算代价递增,但前置步骤已大幅削减数据量,使高代价节点的输入规模可控

节点原子,组合万象

每个节点只做一件事,复杂逻辑通过组合实现

聚类只标注簇 ID,采样只过滤行,AI 只增列。灵活插拔,创造出适合自己场景的独特组合

灵活编排

上面的六个典型场景是常见模式,但不是唯一选择。Pipeline 的节点可以自由组合:

  • 跳过某些去重级别——如果数据几乎没有近似重复,直接用精确去重 + 语义去重

  • 多轮 AI 处理——先评估打分,再按分数过滤,然后对优质数据做标注和合成

  • 采样前置——如果原始数据量巨大但只需要少量样本,可以在去重之前就先采样降量

  • 纯 AI 增值——数据已经很干净,只需要加 llm-call 做评估或标注

根据业务场景特点自由组合节点,构建适合实际需求的 Pipeline。

预置模板导航

Pipeline 提供 9 个预置模板,覆盖从简单去重到端到端全流程的各类场景。支持直接使用或基于模板定制。

按目标快速选择

目标

推荐模板

复杂度

快速清理重复数据

Agent 数据去重清洗

三级去重 + 聚类采样

多样性采样

自动化质量评分

对话质量评估

结构化多维标注

自动标注分类

种子数据扩充

数据增强合成

端到端全流程

端到端全流程

快速体验全部能力

仿真数据 Demo

低~高

分析 LLM 调用质量

OT-LLM 质量分析

Trace 级数据治理

OT-Trace 数据治理

按角色选择

角色

推荐模板

数据工程师

去重清洗、多样性采样

算法工程师

多样性采样、质量评估、数据增强

评测工程师

质量评估、自动标注

数据平台团队

全流程、仿真 Demo

新用户上手

仿真数据 Demo

OT 数据使用者

OT-LLM 质量分析、OT-Trace 数据治理

使用方式

  1. 直接使用:选择模板,修改数据源和输出目标配置,调整字段映射为实际日志字段,提交创建。

  2. 基于模板定制:选择最接近的模板,参考其文档中的「定制建议」,增减节点、调整参数、替换 Prompt。

  3. 自由组合:不同模板的节点可自由混搭——如去重清洗模板的清洗链 + 质量评估模板的评估 Prompt。

最佳实践

Pipeline 设计

  • 从简单开始:先用 2~3 个节点验证数据流转,确认字段映射正确后再逐步添加更多节点。

  • 关注数据量变化:在每个节点后观察行数变化,确保去重和采样的效果符合预期。

  • Prompt 独立管理:将 LLM 的 Prompt 注册为命名模板(@path 引用),与 Pipeline 配置解耦,便于独立迭代。

  • 善用预置模板:从最接近的模板出发定制,比从零开始编排效率高得多。

成本控制

  • 先减后增是铁律:LLM 调用按 Token 计费,务必在去重 + 采样后再执行。以 10,000 条原始数据为例,经三级去重 + 采样后约剩 300 条,LLM 调用成本降低 97%。

  • 模型分级使用:评估等高精度任务用 qwen-max,标注等相对简单的任务用 qwen-plusqwen-turbo,灵活控制成本。

  • 采样规模可算:最终数据量约等于聚类簇数乘以每簇采样条数,可按预算逆推参数。

  • 调度间隔合理:定时调度的间隔应与数据量和预算匹配,避免过于频繁导致成本失控。

全局去重

  • 首批数据无效果:全局去重是与目标 Dataset 已有数据比对,首批执行时 Dataset 为空,效果从第二批开始显现。

  • 仅在必要时启用:全局去重需要额外的向量计算和 Dataset 查询,如果 Pipeline 不是增量运行(只跑一次),可以省略。

  • 目标必须一致:全局去重的目标必须与输出 Dataset 一致,否则去重无意义。

Schema 一致性

  • 统一命名:通过 project 在 Pipeline 入口统一字段名,下游所有节点使用相同的列名引用数据。

  • 注意系统扩展列:系统自动生成的扩展列以 __ 开头(如 __dedup_emb__cluster_id__doc_stats),避免自定义列名与之冲突。

  • 关注列的来源:当 Pipeline 较长时,注意确认下游节点引用的列是否已在上游产生。

常见问题

入门常见问题

Q1:Pipeline 和 Dataset 是什么关系?

Pipeline 是数据加工引擎,Dataset 是数据存储。Pipeline 从 Logstore 读取原始数据,经过处理后写入 Dataset。一个 Pipeline 对应一个输出 Dataset。

Q2:应该从哪个模板开始?

如果是新用户,建议从仿真数据Demo模板开始——它基于 Mock 数据,无需准备真实日志即可体验 Pipeline 的全部能力。确认理解后,再选择与业务场景最匹配的模板。

Q3:Pipeline 创建后多久开始执行?

配置调度策略后,Pipeline 会按设定的间隔自动执行。首次执行从指定的起始时间开始,之后按周期运行。

Q4:一个 Pipeline 可以输出到多个 Dataset 吗?

目前一个 Pipeline 对应一个输出 Dataset。如果需要将数据写入多个 Dataset,可以创建多个 Pipeline,共享相同的数据源但配置不同的处理逻辑和输出目标。

编排与设计

Q5:节点的执行顺序是怎样的?

严格按节点列表的顺序依次执行,前一个节点的输出是后一个节点的输入。不支持分支或并行。

Q6:Prompt 模板怎么引用字段?

在 Prompt 中使用 {{列名}} 占位符,并在配置中声明所有参与渲染的列名。例如:Prompt 写「请评估:{{question}} 的回答:{{output}}」,对应声明 question,output。系统会自动校验两者的一致性。

Q7:上游产生的扩展列可以在下游使用吗?

可以。上游节点产生的扩展列(如语义去重的向量列、聚类的簇 ID 列)对下游完全可见,可直接引用。这也是「扩展列复用」原则的核心——避免重复计算。

Q8:make-instance 的分组键怎么选?

分组键应选择能唯一标识一次完整交互的字段组合。例如 OT-Trace 场景下,session_id + traceId 可以将同一次请求的所有 Span 聚合为一行样本。

成本与性能

Q9:LLM 调用成本怎么控制?

三个手段:(1)先去重 + 采样,大幅减少调用行数;(2)选择性价比合适的模型;(3)控制采样数量限制最终数据量。以 300 条 x 3 轮 LLM 调用为例,约 20~50 元(视模型定价)。

Q10:去重阈值怎么选?

近似去重推荐阈值 3(越小越严格),语义去重推荐阈值 0.1(越小越严格)。可先用默认值运行,观察去重效果后再微调。

Q11:Pipeline 处理大规模数据需要多久?

纯 CPU 节点(字段选取、去重、采样等)处理万级数据通常在秒级完成。LLM 调用是主要耗时环节,300 条数据 x 单轮 LLM 调用约 10~30 分钟,具体取决于模型和 Prompt 复杂度。

故障排查

Q12:Pipeline 执行失败时如何查看错误信息?

在 Pipeline 列表中单击失败的执行记录,查看执行日志和错误详情。常见错误类型包括:

  • 数据源问题:Logstore 无数据、字段名错误、查询语法错误。

  • 配置问题:节点参数错误、字段映射不一致、命名模板未注册。

  • 权限问题:无 Logstore 读取权限、无 Dataset 写入权限。

  • 资源问题:处理超时、内存不足。

Q13:Pipeline 执行失败了怎么排查?

检查三个常见原因:(1)字段映射中引用的原始字段名是否与 Logstore 实际字段一致;(2)节点中声明的列是否在上游已产生;(3)命名模板是否已正确注册。详细错误信息可在执行日志中查看。

Q14:全局去重配置后没有效果?

首批执行时目标 Dataset 为空,全局去重无数据可比对,效果从第二批开始显现。另外请确认全局去重的目标与输出 Dataset 一致。

Q15:LLM 返回结果不是合法 JSON 怎么办?

配置 JSON 输出格式后,系统会自动重试并修复非法 JSON。如果仍然失败,输出中的错误信息字段会记录具体原因,可用条件过滤出这些异常行进行排查。

Q16:数据量没有按预期减少怎么排查?

可能原因:(1)去重字段选择不当——确保选择最能代表数据唯一性的文本字段(如用户提问而非系统输出);(2)阈值过于严格——适当放宽;(3)数据本身重复率低——这是正常情况,说明数据质量较好。