PyPaimon与Ray Data

更新时间:
复制为 MD 格式

本文介绍如何通过 PyPaimon 与 Ray Data 对 DLF Paimon 表进行分布式并行读写,适用于大规模数据处理和 ML 训练管线场景。

概述

PyPaimon 是 Paimon 的原生 Python SDK,支持 Python 生态(Pandas、PyArrow、PyTorch、Ray)直接访问 DLF Paimon 表,无需依赖 JVM。在多模态数据湖场景中,PyPaimon 是连接 AI 推理管线与数据湖的关键桥梁。

Ray Data 是 Ray 生态的分布式数据处理框架。PyPaimon 内置 Ray Data 集成,通过 to_ray()write_ray() API 在 Ray 集群上对 Paimon 表进行分布式并行读写,适合大规模数据处理和 ML 训练管线。

前提条件

数据准备

import pyarrow as pa
from pypaimon import CatalogFactory, Schema

# 连接 DLF Catalog
catalog = CatalogFactory.create({
    'metastore': 'rest',
    'uri': 'http://${region_id}-vpc.dlf.aliyuncs.com',       
    'warehouse': '${catalog_name}',                           
    'dlf.region': '${region_id}',                            
    'token.provider': 'dlf',
    'dlf.access-key-id': '<AK>',
    'dlf.access-key-secret': '<SK>',
})

# 建表并写入测试数据
pa_schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('category', pa.string()),
    ('score', pa.float64()),
    ('value', pa.float64()),
])
schema = Schema.from_pyarrow_schema(pa_schema)
catalog.create_table('my_db.my_table', schema, True)

catalog.drop_table('my_db.target_table', True)
catalog.create_table('my_db.target_table', schema, True)

table = catalog.get_table('my_db.my_table')
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

table_write.write_arrow(pa.Table.from_pydict({
    'id': list(range(1, 101)),
    'name': [f'item_{i}' for i in range(1, 101)],
    'category': ['A', 'B'] * 50,
    'score': [float(i) for i in range(1, 101)],
    'value': [float(i) * 0.1 for i in range(1, 101)],
}))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

Ray Data 分布式读取

基本用法

import ray

ray.init()  # 本地模式,或 ray.init(address='ray://cluster:10001') 连接远程集群

table = catalog.get_table('my_db.my_table')
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()

# 转换为 Ray Dataset(自动并行读取)
ray_dataset = table_read.to_ray(splits)

print(ray_dataset)
# MaterializedDataset(num_blocks=3, num_rows=100, schema={id: int64, name: string, category: string, score: double, value: double})

# Ray Data 操作
print(ray_dataset.count())
print(ray_dataset.take(5))
df = ray_dataset.to_pandas()

并行度控制

to_ray() 会自动将 splits 均匀分配到各 Ray Task 中。

# 指定输出 block 数量
ray_dataset = table_read.to_ray(splits, override_num_blocks=8)

# 限制并发 Task 数
ray_dataset = table_read.to_ray(splits, concurrency=4)

# 配置 Ray remote 参数
ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=8,
    ray_remote_args={'num_cpus': 2, 'max_retries': 3}
)

to_ray() 参数说明:

参数

类型

说明

override_num_blocks

int

指定输出的 block 数量。

concurrency

int

限制并发 Ray Task 数。

ray_remote_args

dict

Ray remote 参数,如 num_cpus、max_retries 等。

Block 大小配置

当 Paimon split 数据量较大时,可能超出 Ray 默认的 128 MB block 限制:

from ray.data import DataContext

ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024  # 256MB
ray_dataset = table_read.to_ray(splits)

数据处理管线

# 过滤
filtered = ray_dataset.filter(lambda row: row['score'] > 80)

# 转换
mapped = ray_dataset.map(lambda row: {**row, 'score_normalized': row['score'] / 100.0})

# 分组聚合
grouped = ray_dataset.groupby('category').sum('value')

# 转换为其他格式
df = ray_dataset.to_pandas()
arrow_table = ray_dataset.to_arrow_refs()

与谓词下推和投影结合

在 Paimon 层面做谓词下推和列裁剪,可减少 Ray 读取的数据量。

# 在Paimon层面做谓词下推和列裁剪,减少Ray读取数据量
read_builder = table.new_read_builder()
pb = read_builder.new_predicate_builder()
read_builder = read_builder \
    .with_filter(pb.equal('category', 'A')) \
    .with_projection(['id', 'name', 'value'])

splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)

Ray Data 分布式写入

write_ray() 内部自动处理 commit,不需要手动调用 prepare_commit()commit()

import ray

# 从源表读取,经 Ray 处理后写入目标表
source_table = catalog.get_table('my_db.source_table')
read_builder = source_table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()

# 1. 转为 Ray Dataset
ray_dataset = table_read.to_ray(splits)

# 2. Ray 数据处理
ray_dataset = ray_dataset.filter(lambda row: row['score'] > 80)

# 3. 写入目标表
target_table = catalog.get_table('my_db.target_table')
write_builder = target_table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset, overwrite=False, concurrency=4)

# 4. 关闭(write_ray 已自动提交,无需手动 commit)
table_write.close()

write_ray() 参数说明:

参数

类型

说明

overwrite

bool

是否覆盖已有数据。默认 False。

concurrency

int

并发写入 Task 数。

ray_remote_args

dict

Ray remote 参数。