Pipeline 参考

更新时间:
复制为 MD 格式

Pipeline 是 AgentLoop 的数据处理流水线引擎。本文提供 Pipeline 的配置结构、算子参数、扩展列、REST API 和使用限制等参考信息。

什么是 Pipeline

Pipeline 是 AgentLoop 的数据处理流水线引擎,将原始数据经过多级处理(字段选取、数据组装、去重、采样、AI 调用等),自动产出高质量 Dataset。定义数据源、编排处理算子并指定输出目标后,Pipeline 按调度策略自动执行。

数据处理路径

Pipeline 从 SLS LogStore 读取原始日志,经过多级处理后输出高质量结构化数据集。

数据在 Pipeline 中的处理流程:

  1. 数据接入:从 SLS LogStore 中读取原始日志数据。

  2. 基础处理:选取关键字段、计算派生列、过滤无效记录。

  3. 数据组装:将离散的事件级日志聚合为完整的样本实例。

  4. 数据清洗:通过精确、近似、语义三级去重,消除冗余数据。

  5. 数据采样:基于语义聚类实现多样性采样,确保数据代表性。

  6. AI 处理:调用大语言模型或数字员工,完成智能评估、标注与合成。

  7. 特征计算:生成向量表示、统计文档指标,丰富数据维度。

  8. 数据输出:将处理完毕的高质量数据写入 Dataset。

按需组合处理步骤,可以跳过不需要的环节,也可以多次使用同一类算子,组成适合业务场景的数据管线。

核心能力

特点

说明

与 SLS LogStore 无缝集成

直接对接 SLS LogStore,无需数据导入,支持查询条件在源头过滤数据范围。

内置 LLM 和智能体调用能力

原生支持 llm-callagentic-call,在流水线中直接完成质量评估、智能标注、内容合成等 AI 任务。

单算子表达能力强

每个算子内建丰富的参数选项与函数(字符串处理、JSON 提取、正则匹配、数学运算等),单步即可完成复杂逻辑。

多算子灵活组合

13 个算子自由编排,从简单的字段选取到完整的"去重 - 聚类 - 采样 - AI 评估"全链路,按需增减节点即可。

三级去重与全局去重

精确、近似、语义三种策略逐级串联;全局模式可跨批次对比历史数据,杜绝增量入库重复。

定时调度自动运转

设定起始时间与执行间隔后自动运行,支持从每 15 分钟高频采集到每日批量处理。

算子详解

Pipeline 提供 13 个处理算子,分为 6 大类。

算子快速索引表

类别

算子

说明

行数变化

基础处理

project

从原始数据选取并重命名字段

不变(1:1)

extend

基于表达式计算新列或覆盖已有列

不变(1:1)

where

按条件表达式过滤行

减少或不变

数据组装

make-instance

将离散事件按分组键聚合为行级样本

多行合并为每组一行

数据清洗

dedup-exact

完全相同的文本仅保留一条

减少或不变

dedup-fuzzy

高度相似的文本视为重复

减少或不变

dedup-semantic

含义相同但表述不同的文本视为重复

减少或不变

特征计算

embedding

对文本字段生成向量

不变(1:1)

doc-stats

计算字符数、词数、行数等指标

不变(1:1)

数据采样

semantic-cluster

基于向量为数据分配簇 ID

不变(1:1)

sample

按比例或固定条数抽样

减少

AI 处理

llm-call

调用大语言模型进行评估、标注、合成等

不变(1:1)

agentic-call

调用数字员工发起智能对话

不变(1:1)

基础处理

基础处理算子完成数据准备:选取字段、计算派生列、过滤不符合条件的记录。通常作为 Pipeline 编排的起点。

project:字段选取

从原始数据中选取并重命名字段,仅保留需要的字段。

核心参数

参数

说明

示例

{新字段名}

原始字段名。键为映射后名称,值为原始列名。

"question": "user_query"

行数变化:不变(1:1)。无扩展列。

extend:字段扩展

基于表达式计算新列或覆盖已有列,支持字符串处理、数学运算、条件判断、JSON 提取等丰富的内建函数。

核心参数

参数

说明

示例

{字段名}

计算表达式。列名已存在则覆盖,不存在则新增。

"q_len": "length(question)"

行数变化:不变(1:1)。无扩展列(直接输出新列)。

where:筛选过滤

按条件表达式过滤行,仅保留满足条件的记录。支持比较运算、逻辑组合、模式匹配等丰富的过滤条件。

核心参数

参数

说明

示例

filter

过滤条件表达式,结果须为布尔值。

"length(question) > 10 AND output IS NOT NULL"

行数变化:减少或不变。无扩展列。

数据组装

数据组装算子将离散的事件级日志聚合为完整的样本实例。一次用户会话通常由多条事件记录组成,通过数据组装可合并为一行完整样本。

make-instance:实例构建

将离散的事件级日志按分组键聚合为行级样本实例。例如将一次会话中的多条消息合并为一行完整的对话样本。

核心参数

参数

说明

示例

by

分组键,逗号分隔。

"session_id,trace_id"

{输出列}

聚合函数调用(不支持裸字段名)。

"question": "first(question)"

行数变化:多行合并为每组一行。无扩展列。

数据清洗

数据清洗算子消除冗余记录。Pipeline 提供三种粒度的去重策略,支持逐级串联:先用精确去重快速削减数据量,再用近似去重和语义去重逐步收敛。

dedup-exact:精确去重

去除完全相同的文本记录,每组保留文本最长的一条。计算代价最低,建议作为去重的第一步。

核心参数

参数

说明

示例

field

去重依据的文本字段。

"question"

global

是否开启全局去重(跨批次与 Dataset 增量去重)。

true

行数变化:减少或不变。扩展列:__dedup_hash__dedup_weight__dedup_rnk

dedup-fuzzy:近似去重

去除高度相似但不完全相同的文本记录,每组保留文本最长的一条。通过 threshold 控制相似度判定的严格程度。

核心参数

参数

说明

示例

field

去重依据的文本字段。

"question"

threshold

相似度阈值(默认 "3"),越小越严格。

"3"

global

是否开启全局去重。

true

行数变化:减少或不变。扩展列:__dedup_hash__dedup_weight__dedup_rnk

dedup-semantic:语义去重

去除含义相同但表述不同的文本记录,通过向量距离衡量语义相似度,每组保留一条代表性记录。

核心参数

参数

说明

示例

field

去重依据的文本字段。

"question"

threshold

向量距离阈值(0~1,默认 "0.1"),越小越严格。

"0.1"

model

Embedding 模型名称。

"sls-multilang-600m-i32k-o1024"

global

是否开启全局去重。

true

行数变化:减少或不变。扩展列:__dedup_emb(向量,可被下游复用)、__dedup_rid

特征计算

特征计算算子为数据增加向量表示和统计指标等衍生特征,可供下游算子使用(如语义聚类依赖向量输入),也可作为 Dataset 数据列保留。

embedding:向量生成

对指定文本字段生成 Embedding 向量,供下游聚类、相似度计算等算子使用。

核心参数

参数

说明

示例

field

待向量化的文本字段。

"question"

model

Embedding 模型名称(可选)。

"sls-multilang-600m-i32k-o1024"

as

输出列名(默认 {field}_embedding)。

"q_emb"

说明

如果上游已有 dedup-semantic,可直接使用其 __dedup_emb 扩展列,无需重复调用 embedding

行数变化:不变(1:1)。扩展列:{as}(默认 {field}_embedding)。

doc-stats:文档统计

计算文本字段的文档级统计指标,输出包含字符数、词数、行数的统计 JSON。

核心参数

参数

说明

示例

field

待统计的文本字段。

"question"

as

输出列名(默认 "__doc_stats")。

"stats"

输出格式:JSON 对象,包含 doc_len_char(字符数)、doc_len_words(词数)、line_counts(行数)三个数值字段。

行数变化:不变(1:1)。扩展列:{as}(默认 __doc_stats,JSON 类型)。

数据采样

数据采样算子在去重后进一步精选数据:通过语义聚类按主题分组,再从每个聚类中抽取代表性样本,确保数据集精简且语义多样。

semantic-cluster:语义聚类

基于 Embedding 向量为数据分配簇 ID,是实现多样性采样的关键步骤。通常与 sample 算子配合使用,每簇取样保证语义多样性。

核心参数

参数

说明

示例

field

Embedding 向量列名。

"__dedup_emb"

n

聚类簇数(正整数)。

100

行数变化:不变(1:1)。扩展列:__cluster_id(簇编号,从 0 开始)。

sample:随机采样

从数据中随机采样指定比例或数量的记录。支持按分组列做分层采样,常与 semantic-cluster 搭配实现"每簇取 N 条"的多样性采样。

核心参数

参数

说明

示例

ratio

采样率 (0, 1],与 n 互斥。

0.1

n

采样条数,与 ratio 互斥。

1

by

分组列名(可选),多列逗号分隔。

"__cluster_id"

行数变化:减少。无扩展列。

AI 处理

AI 处理算子将大语言模型和智能体能力嵌入数据处理流程,对每行数据进行智能评估、质量标注、内容合成等操作。

llm-call:LLM 调用

调用大语言模型对每行数据进行智能处理,支持 Prompt 模板渲染、模型选择和输出格式解析。

核心参数

参数

说明

示例

prompt

Prompt 模板,支持 {{列名}} 占位符或 @<path> 引用。

"@eval/prompt.md"

fields

参与渲染的输入列名,逗号分隔。

"question,output"

format

输出格式:raw(默认)或 json

"json"

model

LLM 模型标识(可选)。

"qwen-plus"

as

输出列名(默认 "__llm_result")。

"eval"

行数变化:不变(1:1)。扩展列:{as}(默认 __llm_result)。

agentic-call:智能体调用

调用数字员工对每行数据发起智能对话。与 llm-call 不同,agentic-call 调用用户构建的数字员工,支持多步推理、知识库检索、工具调用等。

核心参数

参数

说明

示例

prompt

Prompt 模板,支持 {{列名}} 占位符。

"请分析 {{host_name}} 的异常"

fields

参与渲染的输入列名,逗号分隔。

"host_name,metric_name"

employee

数字员工名称。

"skill_bench_analysis"

as

输出列名(默认 "__agentic_result")。

"analysis"

行数变化:不变(1:1)。扩展列:{as}(默认 __agentic_result)。

编排

合理编排算子顺序,构建从简单字段处理到完整 AI 评估的数据链路。

编排原则

原则

说明

Schema 前置

Pipeline 以 project 选取字段开头,声明统一字段名,确保下游所有算子使用一致的字段命名。

组装优先

离散事件数据先用 make-instance 聚合为样本级,再进入后续处理。

先减后增

先做去重和采样(减少行数),再做 AI 处理(增加列数)。LLM 调用成本较高,务必在数据量降下来之后再执行。

由粗到细

去重顺序:精确去重 - 近似去重 - 语义去重。计算代价递增,但前置步骤已大幅削减数据量。

扩展列复用

上游算子产生的扩展列(如 __dedup_emb__cluster_id)可被下游直接引用,无需重复计算。

算子原子性

每个算子职责单一——聚类只标注簇 ID,采样只过滤行,AI 只增列。通过组合实现复杂逻辑。

常见编排组合

根据数据处理需求,选择合适的算子组合。

场景

编排链路

说明

简单字段提取

project - where

从原始数据中选取字段并按条件过滤,适合数据量小、无需去重的场景。

基础清洗入库

project - extend - where - dedup-exact

选取字段、计算派生列、过滤后做精确去重,适合结构化程度高的数据。

三级去重

project - dedup-exact - dedup-fuzzy - dedup-semantic

逐级去重,由粗到细,计算代价递增但数据量逐步递减。

多样性采样

dedup-semantic - semantic-cluster - sample

语义去重后复用其向量列做聚类,再按簇抽样,保证数据多样性。

AI 质量评估

sample - llm-call(评估)- llm-call(标注)

采样后串联多次 LLM 调用,分别完成不同维度的评估和标注。

会话级全链路

project - make-instance - dedup-exact - dedup-fuzzy - dedup-semantic - semantic-cluster - sample - llm-call

从事件聚合到 AI 评估的完整链路,适合对话类数据的全流程处理。

调度模式

Pipeline 支持单次执行和定时调度两种模式。

单次执行

适用于一次性数据处理任务,手动触发后处理完毕即结束。

定时调度

适用于持续性数据采集和处理。设置起始时间和执行间隔后,Pipeline 自动按计划运行。

调度参数

参数

说明

示例

调度模式

当前支持定时调度(scheduled)。

"scheduled"

起始时间

调度开始的时间点(Unix 时间戳,秒)。

1700000000

执行间隔

两次执行之间的时间间隔。

"15m""1h""1d"

常见调度间隔参考

间隔

适用场景

15m

高频数据采集,需要近实时处理的场景。

1h

中等频率,适合日常数据处理。

6h

低频处理,适合数据量不大或对实时性要求不高的场景。

1d

每日批处理,适合日报级别的数据汇总。

每次执行时,Pipeline 自动处理上一次执行以来的增量数据。

API 与配置

Pipeline JSON 配置结构

完整的 Pipeline 配置由四个顶层模块组成:source(数据源)、pipeline(处理管线)、sink(输出目标)、executePolicy(调度策略)。

顶层字段

字段

类型

必填

说明

name

String

Pipeline 名称,项目内唯一。

description

String

Pipeline 描述。

source

Object

数据源配置。

pipeline

Object

处理管线配置。

sink

Object

输出目标配置。

executePolicy

Object

调度策略配置。

source 配置

定义 Pipeline 从哪里读取原始数据。

参数

类型

必填

说明

type

String

数据源类型,目前支持 "LogStore"

LogStore.project

String

Project 名称。

LogStore.LogStore

String

LogStore 名称。

LogStore.query

String

查询过滤条件。

pipeline.nodes 配置

每个算子的通用结构:

参数

类型

必填

说明

id

String

算子唯一标识。

type

String

算子类型(见上方算子详解)。

parameters

Object

算子参数(各类型不同)。

sink 配置

定义处理结果输出到哪里。

参数

类型

必填

说明

type

String

输出类型,目前支持 "dataset"

dataset.workspace

String

Dataset 所在工作空间。

dataset.dataset

String

目标 Dataset 名称。

executePolicy 配置

定义 Pipeline 的调度执行策略。

参数

类型

必填

说明

mode

String

调度模式,目前支持 "scheduled"

scheduled.fromTime

Integer

调度起始时间(Unix 时间戳,秒)。

scheduled.interval

String

调度间隔,如 "15m""1h""1d"

配置示例

以下示例展示一个完整的 Pipeline 配置,从 LogStore 读取数据,经过字段选取、精确去重和 LLM 评估后输出到 Dataset。

{
  "name": "quality-eval-pipeline",
  "description": "对话质量评估管线",
  "source": {
    "type": "LogStore",
    "LogStore": {
      "project": "my-sls-project",
      "LogStore": "agent-logs",
      "query": "* | WHERE level = 'INFO'"
    }
  },
  "pipeline": {
    "nodes": [
      {
        "id": "select-fields",
        "type": "project",
        "parameters": {
          "question": "user_query",
          "answer": "agent_response",
          "session": "session_id"
        }
      },
      {
        "id": "remove-duplicates",
        "type": "dedup-exact",
        "parameters": {
          "field": "question",
          "global": false
        }
      },
      {
        "id": "eval-quality",
        "type": "llm-call",
        "parameters": {
          "prompt": "评估以下对话的回答质量(1-5分):\n问题:{{question}}\n回答:{{answer}}",
          "fields": "question,answer",
          "format": "json",
          "model": "qwen-plus",
          "as": "quality_score"
        }
      }
    ]
  },
  "sink": {
    "type": "dataset",
    "dataset": {
      "workspace": "my-workspace",
      "dataset": "evaluated-conversations"
    }
  },
  "executePolicy": {
    "mode": "scheduled",
    "scheduled": {
      "fromTime": 1700000000,
      "interval": "1h"
    }
  }
}

REST API 参考

Pipeline 提供 REST API,通过 SDK 或 HTTP 请求管理 Pipeline 的完整生命周期。

Pipeline 类型:

类型

说明

DATASET

数据集导出,将 LogStore 数据经过处理后导出到 Dataset。

ETL

数据加工,对数据进行转换和处理。

CreatePipeline

创建一个新的 Pipeline。

POST /pipelines

请求参数

字段

类型

必填

说明

PipelineName

String

Pipeline 名称,项目内唯一。

Description

String

Pipeline 描述。

PipelineType

String

DATASET(数据集导出)或 ETL(数据加工)。

Source

Object

数据源配置(含 Type、LogStore.Project、LogStore.LogStore、LogStore.RoleArn)。

Configuration

Object

Pipeline 配置参数。

Destination

Object

输出目标配置(含 Type、Dataset.Project、Dataset.Dataset)。

所需权限CreatePipelineacs:log:*:*:project/{project}/pipeline/{name})+ ram:PassRole

请求示例

POST /pipelines HTTP/1.1

{
  "PipelineName": "quality-eval",
  "PipelineType": "DATASET",
  "Description": "对话质量评估管线",
  "Source": {
    "Type": "LogStore",
    "LogStore": {
      "Project": "my-sls-project",
      "LogStore": "agent-logs"
    }
  },
  "Configuration": { ... },
  "Destination": {
    "Type": "dataset",
    "Dataset": {
      "Project": "my-workspace",
      "Dataset": "eval-results"
    }
  }
}

GetPipeline

查询指定 Pipeline 的详细信息。

GET /pipelines/{name}

响应包含完整的 Pipeline 配置(与创建时结构一致),以及额外的时间戳字段:

字段

说明

CreateTime

创建时间(Unix 时间戳)。

UpdateTime

最近更新时间(Unix 时间戳)。

所需权限GetPipelineacs:log:*:*:project/{project}/pipeline/{name})。

UpdatePipeline

更新指定 Pipeline 的配置。请求体结构与 CreatePipeline 一致,限制如下:

  • PipelineType 不可修改。

  • Source 中的 ProjectLogStore 不可修改(RoleArn 可更新)。

PUT /pipelines/{name}

所需权限UpdatePipelineacs:log:*:*:project/{project}/pipeline/{name})+ ram:PassRole

DeletePipeline

删除指定 Pipeline。

DELETE /pipelines/{name}

所需权限DeletePipelineacs:log:*:*:project/{project}/pipeline/{name})。

ListPipelines

查询项目下的 Pipeline 列表,支持分页和名称过滤。

GET /pipelines?size=100&offset=0&name=keyword

查询参数

说明

size

每页条数(可选)。

offset

分页偏移量(可选)。

name

按名称模糊过滤(可选)。

响应包含 total(总数)、count(本页条数)、pipelines(Pipeline 摘要列表,含 name、description、createTime、updateTime)。

所需权限ListPipelinesacs:log:*:*:project/{project}/pipeline/*)。

限制与配额

限制项

说明

Pipeline 数量上限

每个 Project 最多 200 个 Pipeline。

Pipeline 类型

创建后不可修改。

数据源

创建后 Project 和 LogStore 不可修改。

使用限制

通用约束

约束项

说明

算子顺序

算子数组按声明顺序执行,前一算子的输出作为后一算子的输入。

算子 ID 唯一性

同一 Pipeline 内每个算子的 id 必须唯一。

字段引用

算子引用的字段必须存在于上游输出中,否则运行时报错。

NULL 值与类型要求

算子

NULL 值行为

字段类型要求

dedup-exact / dedup-fuzzy / dedup-semantic

NULL 行不参与去重且不出现在输出中。

文本类型

embedding

NULL 行被过滤。

文本类型

doc-stats

NULL 时输出零值统计。

文本类型

semantic-cluster

-

向量类型 array(double)

make-instance

分组键为 NULL 的事件不参与分组。

-

llm-call / agentic-call

可能返回不完整结果。

-

扩展列速览

以下汇总所有算子产生的扩展列:

扩展列

类型

来源算子

说明

__dedup_hash

bigint

dedup-exact、dedup-fuzzy

文本指纹值。

__dedup_weight

integer

dedup-exact、dedup-fuzzy

文本长度(去重时保留最长文本)。

__dedup_rnk

integer

dedup-exact、dedup-fuzzy

组内排名(去重后恒为 1)。

__dedup_emb

array(double)

dedup-semantic

Embedding 向量(可被 semantic-cluster 复用)。

__dedup_rid

bigint

dedup-semantic

批内行标识。

__cluster_id

bigint

semantic-cluster

聚类簇编号(从 0 开始)。

{as} / __llm_result

varchar / json

llm-call

LLM 调用结果。

{as} / __agentic_result

varchar

agentic-call

数字员工回复。

{as} / {field}_embedding

array(double)

embedding

Embedding 向量。

{as} / __doc_stats

json

doc-stats

文档统计 JSON。

说明

扩展列可被下游算子直接引用。典型复用:dedup-semantic__dedup_emb 可作为 semantic-clusterfield 输入。

全局去重

三种去重算子均支持全局去重模式。

工作机制

设置 global=true 并指定 workspacedataset 后,去重范围扩展为当前批次与 Dataset 历史数据的联合比对:

  1. 当前批次数据先做批内去重。

  2. 去重后的结果与 Dataset 中已有数据做增量比对。

  3. 仅保留 Dataset 中不存在的"全新"记录。

配置要求

参数

说明

global

设为 true

workspace

目标 Dataset 所在的工作空间。

dataset

目标 Dataset 名称。

column_name

dedup-semantic 可选,指定 Dataset 中用于语义比对的列名。

适用场景

  • 增量数据入库:每批新数据需与历史数据做去重,确保 Dataset 无重复。

  • 推荐顺序:精确去重(全局)- 近似去重(全局)- 语义去重(全局),逐级收敛。