本文介绍 DLF Paimon 表的数据演化(Data Evolution)与行级追踪(Row Tracking)功能,包括功能原理、前置条件及通过 EMR Serverless Spark 和 PyPaimon 的使用方法。
功能概述
行级追踪
行级追踪为 Paimon Append 表的每一行数据分配全局唯一的行标识符(RowId)。RowId 在数据写入时自动生成,贯穿数据的整个生命周期,是数据演化功能的基础。
数据演化
在数据湖场景中,随着业务迭代和 AI 模型升级,表结构频繁变更。传统方式下,新增列后回填数据需要重写整个数据文件,产生大量 I/O 和计算开销。
数据演化模式解决了这一问题:允许对 Append 表进行部分列更新,无需重写原有数据文件。新列数据写入独立文件,读取时通过 RowId 自动合并,对上层查询完全透明。数据演化是 Blob 存储等多模态能力的基础。
核心优势:
高效部分列更新:仅写入变更列,避免全文件重写带来的 I/O 放大。
存储友好:原始文件保持不变,新增列数据追加到独立文件。
读取性能无损:引擎读取时按 RowId 自动合并各列,上层查询无感知。
前置条件
创建表时需同时开启以下两个选项:
选项 | 参数值 | 说明 |
row-tracking.enabled | true | 启用行级追踪,为每行分配全局唯一 RowId。 |
data-evolution.enabled | true | 启用数据演化模式。 |
关于如何创建和管理数据表,请参见 数据表。
通过 EMR Serverless Spark 使用
关于 EMR Serverless Spark 对接 DLF 的基础配置,请参见Serverless Spark 访问 DLF。
EMR Serverless Spark 版本需为 esr-4.7.0 及以上。
建表
CREATE TABLE my_db.target_table (
id INT NOT NULL,
name STRING,
age INT
) TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true'
);
INSERT INTO my_db.target_table VALUES (1, 'Alice', 30), (2, 'Bob', 25);部分列更新(MERGE INTO)
通过 MERGE INTO 仅更新指定列。数据演化模式下,只有变更列写入新文件,原有数据文件保持不变。
CREATE TABLE my_db.source_table (
id INT NOT NULL,
age INT
);
INSERT INTO my_db.source_table VALUES (1, 31), (2, 26), (3, 28);
MERGE INTO my_db.target_table AS t
USING my_db.source_table AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.age = s.age
WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (s.id, '', s.age);
SELECT * FROM my_db.target_table;
+----+-------+-----+
| id | name | age |
+----+-------+-----+
| 1 | Alice | 31 |
| 2 | Bob | 26 |
| 3 | | 28 |
+----+-------+-----+执行后,仅 age 列的数据写入新文件,id 和 name 列的原始文件不变。查询时自动合并。
通过 PyPaimon 使用
PyPaimon 提供三种数据演化写入方式,适用于 Python 数据处理和 AI 推理场景。
使用数据演化功能需安装 PyPaimon 1.5.dev0 及以上版本。请下载并安装pypaimon-1.5.dev0.tar.gz。
数据准备
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
from pypaimon.schema.data_types import AtomicType, DataField
# 连接 DLF Catalog
catalog = CatalogFactory.create({
'metastore': 'rest',
'uri': 'https://${regionId}-vpc.dlf.aliyuncs.com',
'warehouse': 'my_catalog',
'token.provider': 'dlf',
'dlf.access-key-id': 'ak',
'dlf.access-key-secret': 'sk',
# 公网uri接入时配置
# 'dlf.signing-algorithm': 'openapi', # 公网端点需设为 openapi
# 'dlf.oss-endpoint': 'https://oss-${regionId}.aliyuncs.com', # 公网 OSS 端点
})
# 建表
schema = Schema(
fields=[
DataField(0, 'id', AtomicType('INT NOT NULL')),
DataField(1, 'name', AtomicType('STRING')),
DataField(2, 'age', AtomicType('INT')),
],
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
},
)
catalog.create_table('my_db.target_table', schema, True)
table = catalog.get_table('my_db.target_table')
# 写入初始数据
write_builder = table.new_batch_write_builder()
write = write_builder.new_write()
commit = write_builder.new_commit()
init_data = pa.RecordBatch.from_pydict({
'id': [1, 2],
'name': ['Alice', 'Bob'],
'age': [30, 25],
}, schema=pa.schema([
('id', pa.int32()),
('name', pa.string()),
('age', pa.int32()),
]))
write.write_arrow_batch(init_data)
cmts = write.prepare_commit()
commit.commit(cmts)
commit.close()
# 验证写入
read_builder = table.new_read_builder()
scanner = read_builder.new_scan()
reader = read_builder.new_read()
df = reader.to_arrow(scanner.plan().splits()).to_pandas()
print(df)
# id name age
# 0 1 Alice 30
# 1 2 Bob 25按 RowId 更新列
读取数据时获取 _ROW_ID,在 Python 中计算新值后回写。
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update().with_update_type(['age'])
table_commit = write_builder.new_commit()
update_data = pa.Table.from_pydict({
'_ROW_ID': [0, 1],
'age': [31, 26],
}, schema=pa.schema([
('_ROW_ID', pa.int64()),
('age', pa.int32()),
]))
cmts = table_update.update_by_arrow_with_row_id(update_data)
table_commit.commit(cmts)
table_commit.close()
# 验证更新
reader = table.new_read_builder()
print(reader.new_read().to_pandas(reader.new_scan().plan().splits()))
# id name age
# 0 1 Alice 31
# 1 2 Bob 26按业务键 Upsert
无需手动管理 _ROW_ID,直接通过业务键匹配更新或插入。
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update()
table_commit = write_builder.new_commit()
upsert_data = pa.Table.from_pydict({
'id': [1, 3],
'name': ['Alice', 'Charlie'],
'age': [35, 28],
}, schema=pa.schema([
('id', pa.int32()),
('name', pa.string()),
('age', pa.int32()),
]))
# 按 id 列匹配:id=1 更新,id=3 插入
cmts = table_update.upsert_by_arrow_with_key(upsert_data, upsert_keys=['id'])
table_commit.commit(cmts)
table_commit.close()
# 验证 Upsert
reader = table.new_read_builder()
print(reader.new_read().to_pandas(reader.new_scan().plan().splits()))
# id name age
# 0 1 Alice 35
# 1 2 Bob 26
# 2 3 Charlie 28
分片更新(Shard Update)
适用于大规模列计算场景(如 AI 特征回填):按分片读取数据、计算新列值、写回。
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.data_types import ArrayType, AtomicType
# 新增 embedding 列
catalog.alter_table('my_db.target_table', [
SchemaChange.add_column('embedding', ArrayType(True, AtomicType('FLOAT')))
])
table = catalog.get_table('my_db.target_table') # 重新获取表(Schema 已变更)
write_builder = table.new_batch_write_builder()
update = write_builder.new_update()
update.with_read_projection(['name']) # 读取哪些列
update.with_update_type(['embedding']) # 写入哪些列
# 单分片模式(可设置多分片并行)
upd = update.new_shard_updator(shard_num=0, total_shard_count=1)
reader = upd.arrow_reader()
for batch in reader:
names = batch.column('name').to_pylist()
# 用模型生成 embedding(此处为示意)
embeddings = [[float(i) * 0.1] * 4 for i in range(len(names))]
upd.update_by_arrow_batch(
pa.RecordBatch.from_pydict(
{'embedding': embeddings},
schema=pa.schema([('embedding', pa.list_(pa.float32()))])
)
)
commit_messages = upd.prepare_commit()
commit = write_builder.new_commit()
commit.commit(commit_messages)
commit.close()
# 验证分片更新
reader = table.new_read_builder()
print(reader.new_read().to_pandas(reader.new_scan().plan().splits()))文件组织原理
数据演化通过 RowId 将文件组织为文件组(File Group):
写入时:MERGE INTO 仅写入变更列到新文件,原始文件不变。每个文件记录
firstRowId和writeCols元数据。读取时:引擎按
firstRowId将相同行范围的文件分组,合并各列得到完整行。
table/bucket-0/
├── data-uuid-0.parquet # 原始写入:id, name, age
├── data-uuid-1.parquet # MERGE INTO 更新:仅 age 列
└── ...读取时,firstRowId 相同的文件自动合并字段,呈现统一视图。