PyPaimon and Ray Data

更新时间:
复制 MD 格式

Use PyPaimon with Ray Data to perform distributed, parallel reads from and writes to Data Lake Formation (DLF) Paimon tables, ideal for large-scale data processing and ML training pipelines.

Overview

PyPaimon is the native Python SDK for Paimon. It provides direct access to DLF Paimon tables from the Python ecosystem, including Pandas, PyArrow, PyTorch, and Ray, with no Java Virtual Machine (JVM) dependency. In multi-modal data lake scenarios, PyPaimon serves as a bridge between AI inference pipelines and the data lake.

Ray Data is a distributed data processing framework within the Ray ecosystem. PyPaimon integrates with Ray Data through the to_ray() and write_ray() APIs, enabling distributed, parallel reads from and writes to Paimon tables across a Ray cluster.

Prerequisites

Prepare the data

import pyarrow as pa
from pypaimon import CatalogFactory, Schema

# Connect to the DLF catalog
catalog = CatalogFactory.create({
    'metastore': 'rest',
    'uri': 'http://${region_id}-vpc.dlf.aliyuncs.com',       
    'warehouse': '${catalog_name}',                           
    'dlf.region': '${region_id}',                            
    'token.provider': 'dlf',
    'dlf.access-key-id': '<AK>',
    'dlf.access-key-secret': '<SK>',
})

# Create a table and write sample data
pa_schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('category', pa.string()),
    ('score', pa.float64()),
    ('value', pa.float64()),
])
schema = Schema.from_pyarrow_schema(pa_schema)
catalog.create_table('my_db.my_table', schema, True)

catalog.drop_table('my_db.target_table', True)
catalog.create_table('my_db.target_table', schema, True)

table = catalog.get_table('my_db.my_table')
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

table_write.write_arrow(pa.Table.from_pydict({
    'id': list(range(1, 101)),
    'name': [f'item_{i}' for i in range(1, 101)],
    'category': ['A', 'B'] * 50,
    'score': [float(i) for i in range(1, 101)],
    'value': [float(i) * 0.1 for i in range(1, 101)],
}))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

Distributed reads with Ray Data

Basic usage

import ray

ray.init()  # Initialize in local mode, or connect to a remote cluster with ray.init(address='ray://cluster:10001')

table = catalog.get_table('my_db.my_table')
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()

# Convert to a Ray Dataset (reads in parallel automatically)
ray_dataset = table_read.to_ray(splits)

print(ray_dataset)
# MaterializedDataset(num_blocks=3, num_rows=100, schema={id: int64, name: string, category: string, score: double, value: double})

# Ray Data operations
print(ray_dataset.count())
print(ray_dataset.take(5))
df = ray_dataset.to_pandas()

Control parallelism

The to_ray() method automatically distributes splits evenly among Ray tasks.

# Specify the number of output blocks
ray_dataset = table_read.to_ray(splits, override_num_blocks=8)

# Limit the number of concurrent tasks
ray_dataset = table_read.to_ray(splits, concurrency=4)

# Configure Ray remote arguments
ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=8,
    ray_remote_args={'num_cpus': 2, 'max_retries': 3}
)

to_ray() parameters:

Parameter

Type

Description

override_num_blocks

int

The number of output blocks.

concurrency

int

The maximum number of concurrent Ray tasks.

ray_remote_args

dict

Ray remote arguments, such as num_cpus and max_retries.

Configure block size

If a Paimon split is large, it might exceed Ray's default 128 MB block limit. You can increase the limit as follows:

from ray.data import DataContext

ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024  # 256MB
ray_dataset = table_read.to_ray(splits)

Data processing pipeline

# Filter
filtered = ray_dataset.filter(lambda row: row['score'] > 80)

# Transform
mapped = ray_dataset.map(lambda row: {**row, 'score_normalized': row['score'] / 100.0})

# Group and aggregate
grouped = ray_dataset.groupby('category').sum('value')

# Convert to other formats
df = ray_dataset.to_pandas()
arrow_table = ray_dataset.to_arrow_refs()

Predicate pushdown and projection

Apply predicate pushdown and column pruning at the Paimon layer to reduce the volume of data read by Ray.

read_builder = table.new_read_builder()
pb = read_builder.new_predicate_builder()
read_builder = read_builder \
    .with_filter(pb.equal('category', 'A')) \
    .with_projection(['id', 'name', 'value'])

splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)

Distributed writes with Ray Data

The write_ray() method handles commits automatically, so you do not need to call prepare_commit() or commit().

import ray

# Read from a source table, process with Ray, and write to a target table.
source_table = catalog.get_table('my_db.my_table')
read_builder = source_table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()

# 1. Convert to a Ray Dataset.
ray_dataset = table_read.to_ray(splits)

# 2. Process the data with Ray.
ray_dataset = ray_dataset.filter(lambda row: row['score'] > 80)

# 3. Write to the target table.
target_table = catalog.get_table('my_db.target_table')
write_builder = target_table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset, overwrite=False, concurrency=4)

# 4. Close the writer. Commits are handled automatically by `write_ray()`.
table_write.close()

write_ray() parameters:

Parameter

Type

Description

overwrite

bool

Whether to overwrite existing data. Default: False.

concurrency

int

The number of concurrent write tasks.

ray_remote_args

dict

Ray remote arguments.