project(字段选取)

更新时间:
复制为 MD 格式

project 是 Pipeline 的起始节点,用于从原始数据中选取需要的字段并重命名,声明 Pipeline 的输入 Schema。

字段选取与重命名

project 节点从原始数据中选取指定字段,并重命名为 Pipeline 内部统一的字段名称。通过 project 节点,可以显式声明 Pipeline 使用哪些字段、每个字段对应原始数据中的哪一列。后续所有节点均基于 project 定义的字段名进行操作,与原始数据的列名解耦。

适用场景:

  • 从原始日志或数据表中提取需要的列,并重命名为 Pipeline 标准字段名。

  • 丢弃不需要的列,减少数据传输量。

  • 统一不同数据源的字段命名差异。

节点配置

project 节点的配置格式如下:

{
  "id": "node_1",
  "type": "project",
  "parameters": {
    "<新字段名>": "<原始字段名>",
    "<新字段名>": "<原始字段名>"
  }
}

参数说明

参数

类型

是否必填

默认值

说明

<新字段名>

String

键为映射后的字段名,值为原始数据中的字段名。至少包含一组映射。

说明

parameters 是一个动态 key-value 映射,每个键是 Pipeline 中使用的字段名,对应的值是原始数据中的字段名。

输入和输出

输入要求

  • 原始数据源(LogStore 或其他数据源)。

  • 值所引用的原始字段必须存在于数据源中。

输出说明

列名

类型

来源

说明

parameters 中的每个键

与原始字段类型一致

映射

从原始字段投影并重命名。

行数变化:M → N(M = N)。project 节点不改变行数,仅做列选择与重命名。

使用示例

示例 1:简单字段选择

以下示例中,原始数据中的 questionanswer 字段保持原名透传。

{
  "id": "n1",
  "type": "project",
  "parameters": {
    "question": "question",
    "answer": "answer"
  }
}

示例 2:字段重命名

以下示例将原始数据的 user_query 列映射为 questionbot_response 映射为 answermodel_name 映射为 model

{
  "id": "n1",
  "type": "project",
  "parameters": {
    "question": "user_query",
    "answer": "bot_response",
    "model": "model_name"
  }
}

示例 3:与其他节点组合使用

以下示例展示 project 节点与其他节点的组合使用。先通过 project 映射字段,后续节点基于映射后的字段名进行操作。

{
  "nodes": [
    {
      "id": "n1",
      "type": "project",
      "parameters": {
        "question": "a",
        "input": "b",
        "output": "c"
      }
    },
    {
      "id": "n2",
      "type": "dedup-exact",
      "parameters": {
        "field": "question"
      }
    }
  ]
}

在此示例中,project 节点将原始字段 a 映射为 question,后续 dedup-exact 节点基于映射后的 question 字段进行去重。

使用建议

  • 建议在 Pipeline 开头使用 project 明确输入 Schema,提高 Pipeline 的可读性和可维护性。

  • 仅选择下游实际需要的列,减少后续节点的数据扫描量。

  • 字段重命名通过 "新名": "原名" 的键值对实现,便于统一列名格式。

  • 后续可接extend(字段扩展)节点做字段派生计算,再接where(筛选过滤)节点做条件过滤。

边界情况说明:

场景

行为

parameters 为空

校验失败,至少需要一组字段选取。

值引用的原始字段不存在

运行时报错,提示字段不存在。

多个键映射到同一原始字段

允许,产生内容相同的多个列。