本文介绍如何通过 PyPaimon 与 Ray Data 对 DLF Paimon 表进行分布式并行读写,适用于大规模数据处理和 ML 训练管线场景。
概述
PyPaimon 是 Paimon 的原生 Python SDK,支持 Python 生态(Pandas、PyArrow、PyTorch、Ray)直接访问 DLF Paimon 表,无需依赖 JVM。在多模态数据湖场景中,PyPaimon 是连接 AI 推理管线与数据湖的关键桥梁。
Ray Data 是 Ray 生态的分布式数据处理框架。PyPaimon 内置 Ray Data 集成,通过 to_ray() 和 write_ray() API 在 Ray 集群上对 Paimon 表进行分布式并行读写,适合大规模数据处理和 ML 训练管线。
前提条件
已创建新建数据目录。
Python 版本为 3.8 或更高。
已安装pypaimon-1.5.dev0.tar.gz及以上版本。
已安装 Ray(
pip3 install ray)。
数据准备
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
# 连接 DLF Catalog
catalog = CatalogFactory.create({
'metastore': 'rest',
'uri': 'http://${region_id}-vpc.dlf.aliyuncs.com',
'warehouse': '${catalog_name}',
'dlf.region': '${region_id}',
'token.provider': 'dlf',
'dlf.access-key-id': '<AK>',
'dlf.access-key-secret': '<SK>',
})
# 建表并写入测试数据
pa_schema = pa.schema([
('id', pa.int64()),
('name', pa.string()),
('category', pa.string()),
('score', pa.float64()),
('value', pa.float64()),
])
schema = Schema.from_pyarrow_schema(pa_schema)
catalog.create_table('my_db.my_table', schema, True)
catalog.drop_table('my_db.target_table', True)
catalog.create_table('my_db.target_table', schema, True)
table = catalog.get_table('my_db.my_table')
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(pa.Table.from_pydict({
'id': list(range(1, 101)),
'name': [f'item_{i}' for i in range(1, 101)],
'category': ['A', 'B'] * 50,
'score': [float(i) for i in range(1, 101)],
'value': [float(i) * 0.1 for i in range(1, 101)],
}))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()Ray Data 分布式读取
基本用法
import ray
ray.init() # 本地模式,或 ray.init(address='ray://cluster:10001') 连接远程集群
table = catalog.get_table('my_db.my_table')
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
# 转换为 Ray Dataset(自动并行读取)
ray_dataset = table_read.to_ray(splits)
print(ray_dataset)
# MaterializedDataset(num_blocks=3, num_rows=100, schema={id: int64, name: string, category: string, score: double, value: double})
# Ray Data 操作
print(ray_dataset.count())
print(ray_dataset.take(5))
df = ray_dataset.to_pandas()并行度控制
to_ray() 会自动将 splits 均匀分配到各 Ray Task 中。
# 指定输出 block 数量
ray_dataset = table_read.to_ray(splits, override_num_blocks=8)
# 限制并发 Task 数
ray_dataset = table_read.to_ray(splits, concurrency=4)
# 配置 Ray remote 参数
ray_dataset = table_read.to_ray(
splits,
override_num_blocks=8,
ray_remote_args={'num_cpus': 2, 'max_retries': 3}
)to_ray() 参数说明:
参数 | 类型 | 说明 |
override_num_blocks | int | 指定输出的 block 数量。 |
concurrency | int | 限制并发 Ray Task 数。 |
ray_remote_args | dict | Ray remote 参数,如 num_cpus、max_retries 等。 |
Block 大小配置
当 Paimon split 数据量较大时,可能超出 Ray 默认的 128 MB block 限制:
from ray.data import DataContext
ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB
ray_dataset = table_read.to_ray(splits)数据处理管线
# 过滤
filtered = ray_dataset.filter(lambda row: row['score'] > 80)
# 转换
mapped = ray_dataset.map(lambda row: {**row, 'score_normalized': row['score'] / 100.0})
# 分组聚合
grouped = ray_dataset.groupby('category').sum('value')
# 转换为其他格式
df = ray_dataset.to_pandas()
arrow_table = ray_dataset.to_arrow_refs()与谓词下推和投影结合
在 Paimon 层面做谓词下推和列裁剪,可减少 Ray 读取的数据量。
# 在Paimon层面做谓词下推和列裁剪,减少Ray读取数据量
read_builder = table.new_read_builder()
pb = read_builder.new_predicate_builder()
read_builder = read_builder \
.with_filter(pb.equal('category', 'A')) \
.with_projection(['id', 'name', 'value'])
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)Ray Data 分布式写入
write_ray() 内部自动处理 commit,不需要手动调用 prepare_commit() 和 commit()。
import ray
# 从源表读取,经 Ray 处理后写入目标表
source_table = catalog.get_table('my_db.source_table')
read_builder = source_table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
# 1. 转为 Ray Dataset
ray_dataset = table_read.to_ray(splits)
# 2. Ray 数据处理
ray_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
# 3. 写入目标表
target_table = catalog.get_table('my_db.target_table')
write_builder = target_table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset, overwrite=False, concurrency=4)
# 4. 关闭(write_ray 已自动提交,无需手动 commit)
table_write.close()write_ray() 参数说明:
参数 | 类型 | 说明 |
overwrite | bool | 是否覆盖已有数据。默认 False。 |
concurrency | int | 并发写入 Task 数。 |
ray_remote_args | dict | Ray remote 参数。 |