Use PyPaimon with Ray Data to perform distributed, parallel reads from and writes to Data Lake Formation (DLF) Paimon tables. This combination is well suited to large-scale data processing and ML training pipelines.
Overview
PyPaimon is the native Python SDK for Paimon. It provides direct access to DLF Paimon tables from the Python ecosystem, including Pandas, PyArrow, PyTorch, and Ray, with no Java Virtual Machine (JVM) dependency. In multi-modal data lake scenarios, PyPaimon serves as a bridge between AI inference pipelines and the data lake.
Ray Data is a distributed data processing framework within the Ray ecosystem. PyPaimon integrates with Ray Data through the to_ray() and write_ray() APIs, enabling distributed, parallel reads from and writes to Paimon tables across a Ray cluster.
Prerequisites
- Create a new data catalog.
- Python 3.8 or later.
- Install pypaimon-1.5.dev0.tar.gz or a later version.
- Install Ray (pip3 install ray).
Prepare the data
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
# Connect to the DLF catalog
catalog = CatalogFactory.create({
    'metastore': 'rest',
    'uri': 'http://${region_id}-vpc.dlf.aliyuncs.com',
    'warehouse': '${catalog_name}',
    'dlf.region': '${region_id}',
    'token.provider': 'dlf',
    'dlf.access-key-id': '<AK>',
    'dlf.access-key-secret': '<SK>',
})
# Create a table and write sample data
pa_schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('category', pa.string()),
    ('score', pa.float64()),
    ('value', pa.float64()),
])
schema = Schema.from_pyarrow_schema(pa_schema)
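# Create the database first if it does not exist yet (second argument: ignore_if_exists)
catalog.create_database('my_db', True)
catalog.create_table('my_db.my_table', schema, True)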
catalog.drop_table('my_db.target_table', True)
catalog.create_table('my_db.target_table', schema, True)
table = catalog.get_table('my_db.my_table')
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(pa.Table.from_pydict({
    'id': list(range(1, 101)),
    'name': [f'item_{i}' for i in range(1, 101)],
    'category': ['A', 'B'] * 50,
    'score': [float(i) for i in range(1, 101)],
    'value': [float(i) * 0.1 for i in range(1, 101)],
}))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
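The connection options above embed the AccessKey pair directly in the script. As a minimal sketch, the same options could be assembled from environment variables so that credentials stay out of source control. The helper name `build_catalog_options`, its `env` parameter, and the variable names `DLF_ACCESS_KEY_ID` and `DLF_ACCESS_KEY_SECRET` are assumptions made for illustration; only the option keys come from the example above.

```python
import os


def build_catalog_options(region_id, catalog_name, env=None):
    """Assemble DLF REST catalog options without hardcoding credentials.

    The environment variable names are hypothetical; use whatever your
    deployment defines.
    """
    env = os.environ if env is None else env
    return {
        'metastore': 'rest',
        'uri': f'http://{region_id}-vpc.dlf.aliyuncs.com',
        'warehouse': catalog_name,
        'dlf.region': region_id,
        'token.provider': 'dlf',
        # A KeyError here means the variable is not set.
        'dlf.access-key-id': env['DLF_ACCESS_KEY_ID'],
        'dlf.access-key-secret': env['DLF_ACCESS_KEY_SECRET'],
    }
```

The returned dictionary can be passed to CatalogFactory.create() in place of the literal one, for example CatalogFactory.create(build_catalog_options(region_id, catalog_name)).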
Distributed reads with Ray Data
Basic usage
import ray
# Start Ray locally. To connect to a remote cluster instead, call
# ray.init(address='ray://cluster:10001').
ray.init()
table = catalog.get_table('my_db.my_table')
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
# Convert to a Ray Dataset (reads in parallel automatically)
ray_dataset = table_read.to_ray(splits)
print(ray_dataset)
# MaterializedDataset(num_blocks=3, num_rows=100, schema={id: int64, name: string, category: string, score: double, value: double})
# Ray Data operations
print(ray_dataset.count())
print(ray_dataset.take(5))
df = ray_dataset.to_pandas()
Control parallelism
The to_ray() method automatically distributes splits evenly among Ray tasks.
# Specify the number of output blocks
ray_dataset = table_read.to_ray(splits, override_num_blocks=8)
# Limit the number of concurrent tasks
ray_dataset = table_read.to_ray(splits, concurrency=4)
# Configure Ray remote arguments
ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=8,
    ray_remote_args={'num_cpus': 2, 'max_retries': 3},
)
to_ray() parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| override_num_blocks | int | The number of output blocks. |
| concurrency | int | The maximum number of concurrent Ray tasks. |
| ray_remote_args | dict | Ray remote arguments, such as the num_cpus and max_retries settings shown in the example above. |
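To make "distributes splits evenly" concrete, the following sketch assigns a list of splits to a fixed number of tasks in round-robin order. It only illustrates the idea and is not PyPaimon's actual scheduling code; the function name `distribute_evenly` and the string placeholders standing in for splits are hypothetical.

```python
def distribute_evenly(splits, num_tasks):
    """Assign splits to tasks round-robin so task sizes differ by at most one."""
    if num_tasks < 1:
        raise ValueError('num_tasks must be at least 1')
    # Never create more tasks than there are splits (but keep at least one).
    num_tasks = min(num_tasks, len(splits)) or 1
    tasks = [[] for _ in range(num_tasks)]
    for index, split in enumerate(splits):
        tasks[index % num_tasks].append(split)
    return tasks


# 10 hypothetical splits spread over 4 tasks.
assignment = distribute_evenly([f'split_{i}' for i in range(10)], 4)
print([len(task) for task in assignment])  # [3, 3, 2, 2]
```

Under this model, asking for more blocks than there are splits does not create extra work units, which is one reason the observed block count may differ from the requested value.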
Configure block size
A large Paimon split can produce blocks that exceed Ray's default target maximum block size of 128 MB. To raise the limit, set target_max_block_size (in bytes) before calling to_ray():
from ray.data import DataContext
ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB
ray_dataset = table_read.to_ray(splits)
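As a back-of-the-envelope check, the sketch below estimates how many blocks one split would need at a given limit. It is a rough model under the assumption that a block never exceeds the target size; Ray's real block splitting is more nuanced, and the 300 MB split size and the name `estimate_block_count` are hypothetical.

```python
import math

MIB = 1024 * 1024


def estimate_block_count(split_bytes, target_max_block_size=128 * MIB):
    """Rough number of blocks needed if no block may exceed the target size."""
    if split_bytes <= 0:
        return 0
    return math.ceil(split_bytes / target_max_block_size)


split_bytes = 300 * MIB  # hypothetical in-memory size of one large split
print(estimate_block_count(split_bytes))             # 3 at the 128 MB default
print(estimate_block_count(split_bytes, 256 * MIB))  # 2 after raising the limit
```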
Data processing pipeline
# Filter
filtered = ray_dataset.filter(lambda row: row['score'] > 80)
# Transform
mapped = ray_dataset.map(lambda row: {**row, 'score_normalized': row['score'] / 100.0})
# Group and aggregate
grouped = ray_dataset.groupby('category').sum('value')
# Convert to other formats
df = ray_dataset.to_pandas()
# to_arrow_refs() returns a list of object references to Arrow tables, one per block
arrow_refs = ray_dataset.to_arrow_refs()
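To sanity-check the pipeline output, the same three operations can be reproduced in plain Python on the sample data. This is a reference calculation only: it does not use Ray, and it assumes the table holds exactly the 100 rows written in "Prepare the data".

```python
# Rebuild the 100 sample rows written in "Prepare the data".
rows = [
    {
        'id': i,
        'name': f'item_{i}',
        'category': 'A' if i % 2 == 1 else 'B',  # ['A', 'B'] * 50
        'score': float(i),
        'value': float(i) * 0.1,
    }
    for i in range(1, 101)
]

# Filter: keep rows with score > 80.
filtered = [row for row in rows if row['score'] > 80]

# Transform: add a normalized score column.
mapped = [{**row, 'score_normalized': row['score'] / 100.0} for row in rows]

# Group and aggregate: sum of value per category.
sums = {}
for row in rows:
    sums[row['category']] = sums.get(row['category'], 0.0) + row['value']

print(len(filtered))                              # 20
print(mapped[0]['score_normalized'])              # 0.01
print({k: round(v, 6) for k, v in sums.items()})  # {'A': 250.0, 'B': 255.0}
```

If the Ray results differ from these numbers, a likely cause is that the source table contains rows from earlier runs.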
Predicate pushdown and projection
Apply predicate pushdown and column pruning at the Paimon layer to reduce the volume of data read by Ray.
read_builder = table.new_read_builder()
pb = read_builder.new_predicate_builder()
read_builder = read_builder \
    .with_filter(pb.equal('category', 'A')) \
    .with_projection(['id', 'name', 'value'])
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)
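The effect of the filter and projection can be estimated with a small plain-Python model of the sample table. The sketch only models the logical result that reaches Ray; how much I/O Paimon actually skips depends on file layout and statistics. The function name `read_with_pushdown` is hypothetical.

```python
ALL_COLUMNS = ['id', 'name', 'category', 'score', 'value']

# Same layout as the sample table: 100 rows, categories alternating A, B.
rows = [
    dict(zip(ALL_COLUMNS, (i, f'item_{i}', 'AB'[(i - 1) % 2], float(i), i * 0.1)))
    for i in range(1, 101)
]


def read_with_pushdown(rows, category, columns):
    """Emulate equal('category', ...) plus a column projection."""
    return [
        {name: row[name] for name in columns}
        for row in rows
        if row['category'] == category
    ]


pruned = read_with_pushdown(rows, 'A', ['id', 'name', 'value'])
cells_before = len(rows) * len(ALL_COLUMNS)
cells_after = len(pruned) * len(pruned[0])
print(cells_before, cells_after)  # 500 150
```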
Distributed writes with Ray Data
The write_ray() method handles commits automatically, so you do not need to call prepare_commit() or commit().
import ray
# Read from a source table, process with Ray, and write to a target table.
source_table = catalog.get_table('my_db.my_table')
read_builder = source_table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
table_read = read_builder.new_read()
# 1. Convert to a Ray Dataset.
ray_dataset = table_read.to_ray(splits)
# 2. Process the data with Ray.
ray_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
# 3. Write to the target table.
target_table = catalog.get_table('my_db.target_table')
write_builder = target_table.new_batch_write_builder()
table_write = write_builder.new_write()
table_write.write_ray(ray_dataset, overwrite=False, concurrency=4)
# 4. Close the writer. Commits are handled automatically by `write_ray()`.
table_write.close()
write_ray() parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| overwrite | bool | Whether to overwrite existing data. Default: |
| concurrency | int | The number of concurrent write tasks. |
| ray_remote_args | dict | Ray remote arguments. |
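If a write fails partway, the writer should still be closed. A minimal sketch of a wrapper that guarantees this with try/finally is shown below. The helper name `write_dataset` is hypothetical, and it uses only the calls from the example above (new_batch_write_builder(), new_write(), write_ray(), close()).

```python
def write_dataset(target_table, ray_dataset, overwrite=False, concurrency=4):
    """Write a Ray Dataset to a Paimon table and always close the writer."""
    write_builder = target_table.new_batch_write_builder()
    table_write = write_builder.new_write()
    try:
        # write_ray() commits on its own, so no prepare_commit()/commit() here.
        table_write.write_ray(ray_dataset, overwrite=overwrite, concurrency=concurrency)
    finally:
        # Release writer resources even if the write raised.
        table_write.close()
```

With this helper, step 3 and step 4 of the example collapse into a single call such as write_dataset(target_table, ray_dataset).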