数据演化与行级追踪

更新时间:
复制为 MD 格式

本文介绍 DLF Paimon 表的数据演化(Data Evolution)与行级追踪(Row Tracking)功能,包括功能原理、前置条件及通过 EMR Serverless Spark 和 PyPaimon 的使用方法。

功能概述

行级追踪

行级追踪为 Paimon Append 表的每一行数据分配全局唯一的行标识符(RowId)。RowId 在数据写入时自动生成,贯穿数据的整个生命周期,是数据演化功能的基础。

数据演化

在数据湖场景中,随着业务迭代和 AI 模型升级,表结构频繁变更。传统方式下,新增列后回填数据需要重写整个数据文件,产生大量 I/O 和计算开销。

数据演化模式解决了这一问题:允许对 Append 表进行部分列更新,无需重写原有数据文件。新列数据写入独立文件,读取时通过 RowId 自动合并,对上层查询完全透明。数据演化是 Blob 存储等多模态能力的基础。

核心优势:

  • 高效部分列更新:仅写入变更列,避免全文件重写带来的 I/O 放大。

  • 存储友好:原始文件保持不变,新增列数据追加到独立文件。

  • 读取性能无损:引擎读取时按 RowId 自动合并各列,上层查询无感知。

前置条件

创建表时需同时开启以下两个选项:

选项

参数值

说明

row-tracking.enabled

true

启用行级追踪,为每行分配全局唯一 RowId。

data-evolution.enabled

true

启用数据演化模式。

关于如何创建和管理数据表,请参见 数据表

通过 EMR Serverless Spark 使用

关于 EMR Serverless Spark 对接 DLF 的基础配置,请参见Serverless Spark 访问 DLF

EMR Serverless Spark 版本需为 esr-4.7.0 及以上。

建表

CREATE TABLE my_db.target_table (
    id INT NOT NULL,
    name STRING,
    age INT
) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true'
);

INSERT INTO my_db.target_table VALUES (1, 'Alice', 30), (2, 'Bob', 25);

部分列更新(MERGE INTO)

通过 MERGE INTO 仅更新指定列。数据演化模式下,只有变更列写入新文件,原有数据文件保持不变。

CREATE TABLE my_db.source_table (
  id INT NOT NULL, 
  age INT
);
INSERT INTO my_db.source_table VALUES (1, 31), (2, 26), (3, 28);

MERGE INTO my_db.target_table AS t
USING my_db.source_table AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.age = s.age
WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (s.id, '', s.age);

SELECT * FROM my_db.target_table;

+----+-------+-----+
| id | name  | age |
+----+-------+-----+
| 1  | Alice | 31  |
| 2  | Bob   | 26  |
| 3  |       | 28  |
+----+-------+-----+

执行后,仅 age 列的数据写入新文件,id 和 name 列的原始文件不变。查询时自动合并。

通过 PyPaimon 使用

PyPaimon 提供三种数据演化写入方式,适用于 Python 数据处理和 AI 推理场景。

使用数据演化功能需安装 PyPaimon 1.5.dev0 及以上版本。请下载并安装pypaimon-1.5.dev0.tar.gz

数据准备

import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.schema.data_types import AtomicType, DataField

# 连接 DLF Catalog
catalog = CatalogFactory.create({
    'metastore': 'rest',
    'uri': 'https://${regionId}-vpc.dlf.aliyuncs.com',
    'warehouse': 'my_catalog',
    'token.provider': 'dlf',
    'dlf.access-key-id': 'ak',
    'dlf.access-key-secret': 'sk',
    # 公网uri接入时配置
   # 'dlf.signing-algorithm': 'openapi',                   # 公网端点需设为 openapi
   # 'dlf.oss-endpoint': 'https://oss-${regionId}.aliyuncs.com',  # 公网 OSS 端点
})

# 建表
schema = Schema(
    fields=[
        DataField(0, 'id', AtomicType('INT NOT NULL')),
        DataField(1, 'name', AtomicType('STRING')),
        DataField(2, 'age', AtomicType('INT')),
    ],
    options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    },
)
catalog.create_table('my_db.target_table', schema, True)
table = catalog.get_table('my_db.target_table')

# 写入初始数据
write_builder = table.new_batch_write_builder()
write = write_builder.new_write()
commit = write_builder.new_commit()

init_data = pa.RecordBatch.from_pydict({
    'id': [1, 2],
    'name': ['Alice', 'Bob'],
    'age': [30, 25],
}, schema=pa.schema([
    ('id', pa.int32()),
    ('name', pa.string()),
    ('age', pa.int32()),
]))

write.write_arrow_batch(init_data)
cmts = write.prepare_commit()
commit.commit(cmts)
commit.close()

# 验证写入
read_builder = table.new_read_builder()
scanner = read_builder.new_scan()
reader = read_builder.new_read()
df = reader.to_arrow(scanner.plan().splits()).to_pandas()
print(df)
#    id   name  age
# 0   1  Alice   30
# 1   2    Bob   25

按 RowId 更新列

读取数据时获取 _ROW_ID,在 Python 中计算新值后回写。

write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update().with_update_type(['age'])
table_commit = write_builder.new_commit()

update_data = pa.Table.from_pydict({
    '_ROW_ID': [0, 1],
    'age': [31, 26],
}, schema=pa.schema([
    ('_ROW_ID', pa.int64()),
    ('age', pa.int32()),
]))

cmts = table_update.update_by_arrow_with_row_id(update_data)
table_commit.commit(cmts)
table_commit.close()

# 验证更新
reader = table.new_read_builder()
print(reader.new_read().to_pandas(reader.new_scan().plan().splits()))
#    id   name  age
# 0   1  Alice   31
# 1   2    Bob   26

按业务键 Upsert

无需手动管理 _ROW_ID,直接通过业务键匹配更新或插入。

write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update()
table_commit = write_builder.new_commit()

upsert_data = pa.Table.from_pydict({
    'id': [1, 3],
    'name': ['Alice', 'Charlie'],
    'age': [35, 28],
}, schema=pa.schema([
    ('id', pa.int32()),
    ('name', pa.string()),
    ('age', pa.int32()),
]))

# 按 id 列匹配:id=1 更新,id=3 插入
cmts = table_update.upsert_by_arrow_with_key(upsert_data, upsert_keys=['id'])
table_commit.commit(cmts)
table_commit.close()

# 验证 Upsert
reader = table.new_read_builder()
print(reader.new_read().to_pandas(reader.new_scan().plan().splits()))
#    id    name  age
# 0   1   Alice   35
# 1   2     Bob   26
# 2   3  Charlie   28

分片更新(Shard Update)

适用于大规模列计算场景(如 AI 特征回填):按分片读取数据、计算新列值、写回。

from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.data_types import ArrayType, AtomicType

# 新增 embedding 列
catalog.alter_table('my_db.target_table', [
    SchemaChange.add_column('embedding', ArrayType(True, AtomicType('FLOAT')))
])
table = catalog.get_table('my_db.target_table')  # 重新获取表(Schema 已变更)

write_builder = table.new_batch_write_builder()
update = write_builder.new_update()
update.with_read_projection(['name'])  # 读取哪些列
update.with_update_type(['embedding'])  # 写入哪些列

# 单分片模式(可设置多分片并行)
upd = update.new_shard_updator(shard_num=0, total_shard_count=1)
reader = upd.arrow_reader()

for batch in reader:
    names = batch.column('name').to_pylist()
    # 用模型生成 embedding(此处为示意)
    embeddings = [[float(i) * 0.1] * 4 for i in range(len(names))]
    upd.update_by_arrow_batch(
        pa.RecordBatch.from_pydict(
            {'embedding': embeddings},
            schema=pa.schema([('embedding', pa.list_(pa.float32()))])
        )
    )

commit_messages = upd.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
commit.close()

# 验证分片更新
reader = table.new_read_builder()
print(reader.new_read().to_pandas(reader.new_scan().plan().splits()))

文件组织原理

数据演化通过 RowId 将文件组织为文件组(File Group):

  1. 写入时:MERGE INTO 仅写入变更列到新文件,原始文件不变。每个文件记录 firstRowId 和 writeCols 元数据。

  2. 读取时:引擎按 firstRowId 将相同行范围的文件分组,合并各列得到完整行。

table/bucket-0/
├── data-uuid-0.parquet      # 原始写入:id, name, age
├── data-uuid-1.parquet      # MERGE INTO 更新:仅 age 列
└── ...

读取时,firstRowId 相同的文件自动合并字段,呈现统一视图。