Pipeline 是 AgentLoop 的数据处理流水线引擎。本文提供 Pipeline 的配置结构、算子参数、扩展列、REST API 和使用限制等参考信息。
什么是 Pipeline
Pipeline 是 AgentLoop 的数据处理流水线引擎,将原始数据经过多级处理(字段选取、数据组装、去重、采样、AI 调用等),自动产出高质量 Dataset。定义数据源、编排处理算子并指定输出目标后,Pipeline 按调度策略自动执行。
数据处理路径
Pipeline 从 SLS LogStore 读取原始日志,经过多级处理后输出高质量结构化数据集。
数据在 Pipeline 中的处理流程:
数据接入:从 SLS LogStore 中读取原始日志数据。
基础处理:选取关键字段、计算派生列、过滤无效记录。
数据组装:将离散的事件级日志聚合为完整的样本实例。
数据清洗:通过精确、近似、语义三级去重,消除冗余数据。
数据采样:基于语义聚类实现多样性采样,确保数据代表性。
AI 处理:调用大语言模型或数字员工,完成智能评估、标注与合成。
特征计算:生成向量表示、统计文档指标,丰富数据维度。
数据输出:将处理完毕的高质量数据写入 Dataset。
按需组合处理步骤,可以跳过不需要的环节,也可以多次使用同一类算子,组成适合业务场景的数据管线。
核心能力
特点 | 说明 |
与 SLS LogStore 无缝集成 | 直接对接 SLS LogStore,无需数据导入,支持查询条件在源头过滤数据范围。 |
内置 LLM 和智能体调用能力 | 原生支持 |
单算子表达能力强 | 每个算子内建丰富的参数选项与函数(字符串处理、JSON 提取、正则匹配、数学运算等),单步即可完成复杂逻辑。 |
多算子灵活组合 | 13 个算子自由编排,从简单的字段选取到完整的"去重 - 聚类 - 采样 - AI 评估"全链路,按需增减节点即可。 |
三级去重与全局去重 | 精确、近似、语义三种策略逐级串联;全局模式可跨批次对比历史数据,杜绝增量入库重复。 |
定时调度自动运转 | 设定起始时间与执行间隔后自动运行,支持从每 15 分钟高频采集到每日批量处理。 |
算子详解
Pipeline 提供 13 个处理算子,分为 6 大类。
算子快速索引表
类别 | 算子 | 说明 | 行数变化 |
基础处理 |
| 从原始数据选取并重命名字段 | 不变(1:1) |
| 基于表达式计算新列或覆盖已有列 | 不变(1:1) | |
| 按条件表达式过滤行 | 减少或不变 | |
数据组装 |
| 将离散事件按分组键聚合为行级样本 | 多行合并为每组一行 |
数据清洗 |
| 完全相同的文本仅保留一条 | 减少或不变 |
| 高度相似的文本视为重复 | 减少或不变 | |
| 含义相同但表述不同的文本视为重复 | 减少或不变 | |
特征计算 |
| 对文本字段生成向量 | 不变(1:1) |
| 计算字符数、词数、行数等指标 | 不变(1:1) | |
数据采样 |
| 基于向量为数据分配簇 ID | 不变(1:1) |
| 按比例或固定条数抽样 | 减少 | |
AI 处理 |
| 调用大语言模型进行评估、标注、合成等 | 不变(1:1) |
| 调用数字员工发起智能对话 | 不变(1:1) |
基础处理
基础处理算子完成数据准备:选取字段、计算派生列、过滤不符合条件的记录。通常作为 Pipeline 编排的起点。
project:字段选取
从原始数据中选取并重命名字段,仅保留需要的字段。
核心参数
参数 | 说明 | 示例 |
| 原始字段名。键为映射后名称,值为原始列名。 |
|
行数变化:不变(1:1)。无扩展列。
extend:字段扩展
基于表达式计算新列或覆盖已有列,支持字符串处理、数学运算、条件判断、JSON 提取等丰富的内建函数。
核心参数
参数 | 说明 | 示例 |
| 计算表达式。列名已存在则覆盖,不存在则新增。 |
|
行数变化:不变(1:1)。无扩展列(直接输出新列)。
where:筛选过滤
按条件表达式过滤行,仅保留满足条件的记录。支持比较运算、逻辑组合、模式匹配等丰富的过滤条件。
核心参数
参数 | 说明 | 示例 |
| 过滤条件表达式,结果须为布尔值。 |
|
行数变化:减少或不变。无扩展列。
数据组装
数据组装算子将离散的事件级日志聚合为完整的样本实例。一次用户会话通常由多条事件记录组成,通过数据组装可合并为一行完整样本。
make-instance:实例构建
将离散的事件级日志按分组键聚合为行级样本实例。例如将一次会话中的多条消息合并为一行完整的对话样本。
核心参数
参数 | 说明 | 示例 |
| 分组键,逗号分隔。 |
|
| 聚合函数调用(不支持裸字段名)。 |
|
行数变化:多行合并为每组一行。无扩展列。
数据清洗
数据清洗算子消除冗余记录。Pipeline 提供三种粒度的去重策略,支持逐级串联:先用精确去重快速削减数据量,再用近似去重和语义去重逐步收敛。
dedup-exact:精确去重
去除完全相同的文本记录,每组保留文本最长的一条。计算代价最低,建议作为去重的第一步。
核心参数
参数 | 说明 | 示例 |
| 去重依据的文本字段。 |
|
| 是否开启全局去重(跨批次与 Dataset 增量去重)。 |
|
行数变化:减少或不变。扩展列:__dedup_hash、__dedup_weight、__dedup_rnk。
dedup-fuzzy:近似去重
去除高度相似但不完全相同的文本记录,每组保留文本最长的一条。通过 threshold 控制相似度判定的严格程度。
核心参数
参数 | 说明 | 示例 |
| 去重依据的文本字段。 |
|
| 相似度阈值(默认 |
|
| 是否开启全局去重。 |
|
行数变化:减少或不变。扩展列:__dedup_hash、__dedup_weight、__dedup_rnk。
dedup-semantic:语义去重
去除含义相同但表述不同的文本记录,通过向量距离衡量语义相似度,每组保留一条代表性记录。
核心参数
参数 | 说明 | 示例 |
| 去重依据的文本字段。 |
|
| 向量距离阈值(0~1,默认 |
|
| Embedding 模型名称。 |
|
| 是否开启全局去重。 |
|
行数变化:减少或不变。扩展列:__dedup_emb(向量,可被下游复用)、__dedup_rid。
特征计算
特征计算算子为数据增加向量表示和统计指标等衍生特征,可供下游算子使用(如语义聚类依赖向量输入),也可作为 Dataset 数据列保留。
embedding:向量生成
对指定文本字段生成 Embedding 向量,供下游聚类、相似度计算等算子使用。
核心参数
参数 | 说明 | 示例 |
| 待向量化的文本字段。 |
|
| Embedding 模型名称(可选)。 |
|
| 输出列名(默认 |
|
如果上游已有 dedup-semantic,可直接使用其 __dedup_emb 扩展列,无需重复调用 embedding。
行数变化:不变(1:1)。扩展列:{as}(默认 {field}_embedding)。
doc-stats:文档统计
计算文本字段的文档级统计指标,输出包含字符数、词数、行数的统计 JSON。
核心参数
参数 | 说明 | 示例 |
| 待统计的文本字段。 |
|
| 输出列名(默认 |
|
输出格式:JSON 对象,包含 doc_len_char(字符数)、doc_len_words(词数)、line_counts(行数)三个数值字段。
行数变化:不变(1:1)。扩展列:{as}(默认 __doc_stats,JSON 类型)。
数据采样
数据采样算子在去重后进一步精选数据:通过语义聚类按主题分组,再从每个聚类中抽取代表性样本,确保数据集精简且语义多样。
semantic-cluster:语义聚类
基于 Embedding 向量为数据分配簇 ID,是实现多样性采样的关键步骤。通常与 sample 算子配合使用,每簇取样保证语义多样性。
核心参数
参数 | 说明 | 示例 |
| Embedding 向量列名。 |
|
| 聚类簇数(正整数)。 |
|
行数变化:不变(1:1)。扩展列:__cluster_id(簇编号,从 0 开始)。
sample:随机采样
从数据中随机采样指定比例或数量的记录。支持按分组列做分层采样,常与 semantic-cluster 搭配实现"每簇取 N 条"的多样性采样。
核心参数
参数 | 说明 | 示例 |
| 采样率 (0, 1],与 |
|
| 采样条数,与 |
|
| 分组列名(可选),多列逗号分隔。 |
|
行数变化:减少。无扩展列。
AI 处理
AI 处理算子将大语言模型和智能体能力嵌入数据处理流程,对每行数据进行智能评估、质量标注、内容合成等操作。
llm-call:LLM 调用
调用大语言模型对每行数据进行智能处理,支持 Prompt 模板渲染、模型选择和输出格式解析。
核心参数
参数 | 说明 | 示例 |
| Prompt 模板,支持 |
|
| 参与渲染的输入列名,逗号分隔。 |
|
| 输出格式: |
|
| LLM 模型标识(可选)。 |
|
| 输出列名(默认 |
|
行数变化:不变(1:1)。扩展列:{as}(默认 __llm_result)。
agentic-call:智能体调用
调用数字员工对每行数据发起智能对话。与 llm-call 不同,agentic-call 调用用户构建的数字员工,支持多步推理、知识库检索、工具调用等。
核心参数
参数 | 说明 | 示例 |
| Prompt 模板,支持 |
|
| 参与渲染的输入列名,逗号分隔。 |
|
| 数字员工名称。 |
|
| 输出列名(默认 |
|
行数变化:不变(1:1)。扩展列:{as}(默认 __agentic_result)。
编排
合理编排算子顺序,构建从简单字段处理到完整 AI 评估的数据链路。
编排原则
原则 | 说明 |
Schema 前置 | Pipeline 以 |
组装优先 | 离散事件数据先用 |
先减后增 | 先做去重和采样(减少行数),再做 AI 处理(增加列数)。LLM 调用成本较高,务必在数据量降下来之后再执行。 |
由粗到细 | 去重顺序:精确去重 - 近似去重 - 语义去重。计算代价递增,但前置步骤已大幅削减数据量。 |
扩展列复用 | 上游算子产生的扩展列(如 |
算子原子性 | 每个算子职责单一——聚类只标注簇 ID,采样只过滤行,AI 只增列。通过组合实现复杂逻辑。 |
常见编排组合
根据数据处理需求,选择合适的算子组合。
场景 | 编排链路 | 说明 |
简单字段提取 |
| 从原始数据中选取字段并按条件过滤,适合数据量小、无需去重的场景。 |
基础清洗入库 |
| 选取字段、计算派生列、过滤后做精确去重,适合结构化程度高的数据。 |
三级去重 |
| 逐级去重,由粗到细,计算代价递增但数据量逐步递减。 |
多样性采样 |
| 语义去重后复用其向量列做聚类,再按簇抽样,保证数据多样性。 |
AI 质量评估 |
| 采样后串联多次 LLM 调用,分别完成不同维度的评估和标注。 |
会话级全链路 |
| 从事件聚合到 AI 评估的完整链路,适合对话类数据的全流程处理。 |
调度模式
Pipeline 支持单次执行和定时调度两种模式。
单次执行
适用于一次性数据处理任务,手动触发后处理完毕即结束。
定时调度
适用于持续性数据采集和处理。设置起始时间和执行间隔后,Pipeline 自动按计划运行。
调度参数
参数 | 说明 | 示例 |
调度模式 | 当前支持定时调度( |
|
起始时间 | 调度开始的时间点(Unix 时间戳,秒)。 |
|
执行间隔 | 两次执行之间的时间间隔。 |
|
常见调度间隔参考
间隔 | 适用场景 |
| 高频数据采集,需要近实时处理的场景。 |
| 中等频率,适合日常数据处理。 |
| 低频处理,适合数据量不大或对实时性要求不高的场景。 |
| 每日批处理,适合日报级别的数据汇总。 |
每次执行时,Pipeline 自动处理上一次执行以来的增量数据。
API 与配置
Pipeline JSON 配置结构
完整的 Pipeline 配置由四个顶层模块组成:source(数据源)、pipeline(处理管线)、sink(输出目标)、executePolicy(调度策略)。
顶层字段
字段 | 类型 | 必填 | 说明 |
| String | 是 | Pipeline 名称,项目内唯一。 |
| String | 否 | Pipeline 描述。 |
| Object | 是 | 数据源配置。 |
| Object | 是 | 处理管线配置。 |
| Object | 是 | 输出目标配置。 |
| Object | 是 | 调度策略配置。 |
source 配置
定义 Pipeline 从哪里读取原始数据。
参数 | 类型 | 必填 | 说明 |
| String | 是 | 数据源类型,目前支持 |
| String | 是 | Project 名称。 |
| String | 是 | LogStore 名称。 |
| String | 否 | 查询过滤条件。 |
pipeline.nodes 配置
每个算子的通用结构:
参数 | 类型 | 必填 | 说明 |
| String | 是 | 算子唯一标识。 |
| String | 是 | 算子类型(见上方算子详解)。 |
| Object | 是 | 算子参数(各类型不同)。 |
sink 配置
定义处理结果输出到哪里。
参数 | 类型 | 必填 | 说明 |
| String | 是 | 输出类型,目前支持 |
| String | 是 | Dataset 所在工作空间。 |
| String | 是 | 目标 Dataset 名称。 |
executePolicy 配置
定义 Pipeline 的调度执行策略。
参数 | 类型 | 必填 | 说明 |
| String | 是 | 调度模式,目前支持 |
| Integer | 是 | 调度起始时间(Unix 时间戳,秒)。 |
| String | 是 | 调度间隔,如 |
配置示例
以下示例展示一个完整的 Pipeline 配置,从 LogStore 读取数据,经过字段选取、精确去重和 LLM 评估后输出到 Dataset。
{
"name": "quality-eval-pipeline",
"description": "对话质量评估管线",
"source": {
"type": "LogStore",
"LogStore": {
"project": "my-sls-project",
"LogStore": "agent-logs",
"query": "* | WHERE level = 'INFO'"
}
},
"pipeline": {
"nodes": [
{
"id": "select-fields",
"type": "project",
"parameters": {
"question": "user_query",
"answer": "agent_response",
"session": "session_id"
}
},
{
"id": "remove-duplicates",
"type": "dedup-exact",
"parameters": {
"field": "question",
"global": false
}
},
{
"id": "eval-quality",
"type": "llm-call",
"parameters": {
"prompt": "评估以下对话的回答质量(1-5分):\n问题:{{question}}\n回答:{{answer}}",
"fields": "question,answer",
"format": "json",
"model": "qwen-plus",
"as": "quality_score"
}
}
]
},
"sink": {
"type": "dataset",
"dataset": {
"workspace": "my-workspace",
"dataset": "evaluated-conversations"
}
},
"executePolicy": {
"mode": "scheduled",
"scheduled": {
"fromTime": 1700000000,
"interval": "1h"
}
}
}REST API 参考
Pipeline 提供 REST API,通过 SDK 或 HTTP 请求管理 Pipeline 的完整生命周期。
Pipeline 类型:
类型 | 说明 |
| 数据集导出,将 LogStore 数据经过处理后导出到 Dataset。 |
| 数据加工,对数据进行转换和处理。 |
CreatePipeline
创建一个新的 Pipeline。
POST /pipelines
请求参数
字段 | 类型 | 必填 | 说明 |
| String | 是 | Pipeline 名称,项目内唯一。 |
| String | 否 | Pipeline 描述。 |
| String | 是 |
|
| Object | 是 | 数据源配置(含 Type、LogStore.Project、LogStore.LogStore、LogStore.RoleArn)。 |
| Object | 是 | Pipeline 配置参数。 |
| Object | 是 | 输出目标配置(含 Type、Dataset.Project、Dataset.Dataset)。 |
所需权限:CreatePipeline(acs:log:*:*:project/{project}/pipeline/{name})+ ram:PassRole。
请求示例
POST /pipelines HTTP/1.1
{
"PipelineName": "quality-eval",
"PipelineType": "DATASET",
"Description": "对话质量评估管线",
"Source": {
"Type": "LogStore",
"LogStore": {
"Project": "my-sls-project",
"LogStore": "agent-logs"
}
},
"Configuration": { ... },
"Destination": {
"Type": "dataset",
"Dataset": {
"Project": "my-workspace",
"Dataset": "eval-results"
}
}
}GetPipeline
查询指定 Pipeline 的详细信息。
GET /pipelines/{name}
响应包含完整的 Pipeline 配置(与创建时结构一致),以及额外的时间戳字段:
字段 | 说明 |
| 创建时间(Unix 时间戳)。 |
| 最近更新时间(Unix 时间戳)。 |
所需权限:GetPipeline(acs:log:*:*:project/{project}/pipeline/{name})。
UpdatePipeline
更新指定 Pipeline 的配置。请求体结构与 CreatePipeline 一致,限制如下:
PipelineType不可修改。Source中的Project和LogStore不可修改(RoleArn 可更新)。
PUT /pipelines/{name}
所需权限:UpdatePipeline(acs:log:*:*:project/{project}/pipeline/{name})+ ram:PassRole。
DeletePipeline
删除指定 Pipeline。
DELETE /pipelines/{name}
所需权限:DeletePipeline(acs:log:*:*:project/{project}/pipeline/{name})。
ListPipelines
查询项目下的 Pipeline 列表,支持分页和名称过滤。
GET /pipelines?size=100&offset=0&name=keyword
查询参数 | 说明 |
| 每页条数(可选)。 |
| 分页偏移量(可选)。 |
| 按名称模糊过滤(可选)。 |
响应包含 total(总数)、count(本页条数)、pipelines(Pipeline 摘要列表,含 name、description、createTime、updateTime)。
所需权限:ListPipelines(acs:log:*:*:project/{project}/pipeline/*)。
限制与配额
限制项 | 说明 |
Pipeline 数量上限 | 每个 Project 最多 200 个 Pipeline。 |
Pipeline 类型 | 创建后不可修改。 |
数据源 | 创建后 Project 和 LogStore 不可修改。 |
使用限制
通用约束
约束项 | 说明 |
算子顺序 | 算子数组按声明顺序执行,前一算子的输出作为后一算子的输入。 |
算子 ID 唯一性 | 同一 Pipeline 内每个算子的 |
字段引用 | 算子引用的字段必须存在于上游输出中,否则运行时报错。 |
NULL 值与类型要求
算子 | NULL 值行为 | 字段类型要求 |
| NULL 行不参与去重且不出现在输出中。 | 文本类型 |
| NULL 行被过滤。 | 文本类型 |
| NULL 时输出零值统计。 | 文本类型 |
| - | 向量类型 array(double) |
| 分组键为 NULL 的事件不参与分组。 | - |
| 可能返回不完整结果。 | - |
扩展列速览
以下汇总所有算子产生的扩展列:
扩展列 | 类型 | 来源算子 | 说明 |
| bigint | dedup-exact、dedup-fuzzy | 文本指纹值。 |
| integer | dedup-exact、dedup-fuzzy | 文本长度(去重时保留最长文本)。 |
| integer | dedup-exact、dedup-fuzzy | 组内排名(去重后恒为 1)。 |
| array(double) | dedup-semantic | Embedding 向量(可被 semantic-cluster 复用)。 |
| bigint | dedup-semantic | 批内行标识。 |
| bigint | semantic-cluster | 聚类簇编号(从 0 开始)。 |
| varchar / json | llm-call | LLM 调用结果。 |
| varchar | agentic-call | 数字员工回复。 |
| array(double) | embedding | Embedding 向量。 |
| json | doc-stats | 文档统计 JSON。 |
扩展列可被下游算子直接引用。典型复用:dedup-semantic 的 __dedup_emb 可作为 semantic-cluster 的 field 输入。
全局去重
三种去重算子均支持全局去重模式。
工作机制
设置 global=true 并指定 workspace 和 dataset 后,去重范围扩展为当前批次与 Dataset 历史数据的联合比对:
当前批次数据先做批内去重。
去重后的结果与 Dataset 中已有数据做增量比对。
仅保留 Dataset 中不存在的"全新"记录。
配置要求
参数 | 说明 |
| 设为 |
| 目标 Dataset 所在的工作空间。 |
| 目标 Dataset 名称。 |
| 仅 |
适用场景
增量数据入库:每批新数据需与历史数据做去重,确保 Dataset 无重复。
推荐顺序:精确去重(全局)- 近似去重(全局)- 语义去重(全局),逐级收敛。