project 是 Pipeline 的起始节点,用于从原始数据中选取需要的字段并重命名,声明 Pipeline 的输入 Schema。
字段选取与重命名
project 节点从原始数据中选取指定字段,并重命名为 Pipeline 内部统一的字段名称。通过 project 节点,可以显式声明 Pipeline 使用哪些字段、每个字段对应原始数据中的哪一列。后续所有节点均基于 project 定义的字段名进行操作,与原始数据的列名解耦。
适用场景:
从原始日志或数据表中提取需要的列,并重命名为 Pipeline 标准字段名。
丢弃不需要的列,减少数据传输量。
统一不同数据源的字段命名差异。
节点配置
project 节点的配置格式如下:
{
"id": "node_1",
"type": "project",
"parameters": {
"<新字段名>": "<原始字段名>",
"<新字段名>": "<原始字段名>"
}
}参数说明
参数 | 类型 | 是否必填 | 默认值 | 说明 |
| String | 是 | 无 | 键为映射后的字段名,值为原始数据中的字段名。至少包含一组映射。 |
parameters 是一个动态 key-value 映射,每个键是 Pipeline 中使用的字段名,对应的值是原始数据中的字段名。
输入和输出
输入要求
原始数据源(LogStore 或其他数据源)。
值所引用的原始字段必须存在于数据源中。
输出说明
列名 | 类型 | 来源 | 说明 |
| 与原始字段类型一致 | 映射 | 从原始字段投影并重命名。 |
行数变化:M → N(M = N)。project 节点不改变行数,仅做列选择与重命名。
使用示例
示例 1:简单字段选择
以下示例中,原始数据中的 question 和 answer 字段保持原名透传。
{
"id": "n1",
"type": "project",
"parameters": {
"question": "question",
"answer": "answer"
}
}示例 2:字段重命名
以下示例将原始数据的 user_query 列映射为 question,bot_response 映射为 answer,model_name 映射为 model。
{
"id": "n1",
"type": "project",
"parameters": {
"question": "user_query",
"answer": "bot_response",
"model": "model_name"
}
}示例 3:与其他节点组合使用
以下示例展示 project 节点与其他节点的组合使用。先通过 project 映射字段,后续节点基于映射后的字段名进行操作。
{
"nodes": [
{
"id": "n1",
"type": "project",
"parameters": {
"question": "a",
"input": "b",
"output": "c"
}
},
{
"id": "n2",
"type": "dedup-exact",
"parameters": {
"field": "question"
}
}
]
}在此示例中,project 节点将原始字段 a 映射为 question,后续 dedup-exact 节点基于映射后的 question 字段进行去重。
使用建议
建议在 Pipeline 开头使用 project 明确输入 Schema,提高 Pipeline 的可读性和可维护性。
仅选择下游实际需要的列,减少后续节点的数据扫描量。
字段重命名通过
"新名": "原名"的键值对实现,便于统一列名格式。后续可接extend(字段扩展)节点做字段派生计算,再接where(筛选过滤)节点做条件过滤。
边界情况说明:
场景 | 行为 |
| 校验失败,至少需要一组字段选取。 |
值引用的原始字段不存在 | 运行时报错,提示字段不存在。 |
多个键映射到同一原始字段 | 允许,产生内容相同的多个列。 |