This article explains how to use PyPaimon to create, read from, and write to Paimon tables in Data Lake Formation (DLF).
PyPaimon and DLF integration
PyPaimon is the Python SDK for Apache Paimon. It provides efficient data ingestion capabilities, allowing you to use Python to directly read, write, and process Paimon table data.
DLF catalog integration
By configuring a DLF catalog in PyPaimon (a REST catalog that uses DLF as its metastore), you can keep Paimon table metadata in Alibaba Cloud Data Lake Formation (DLF).
This integration provides the following key benefits:
- Multi-engine interoperability: Once the metadata is hosted in DLF, other Alibaba Cloud computing engines, such as MaxCompute, Hologres, and Alibaba Cloud EMR, can directly access the same Paimon data.
- Unified governance: DLF's data lake management capabilities let you manage the lifecycle of Paimon tables and automatically optimize their storage.
Prerequisites
- You have created a DLF data catalog.
- Python 3.8 or later is installed. Run `python3 --version` to check your current version.
Procedure
Step 1: Prepare the environment
- Run the following command to install the PyPaimon SDK.

```shell
pip3 install pypaimon==1.4.1
```

- (Optional) After the installation, run the `pip3 show pypaimon` command to verify the installation.
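As an optional extra check, the following sketch confirms in one step that the interpreter meets the Python 3.8 prerequisite and reports which versions of the packages imported by the sample code (pypaimon, pyarrow, pandas) are installed. It uses only the standard library (`importlib.metadata`, available since Python 3.8). The function name `check_environment` is hypothetical, and the check looks at installed distributions, not at whether each package imports cleanly.

```python
import sys
from importlib import metadata


def check_environment(packages=('pypaimon', 'pyarrow', 'pandas')):
    """Return (python_ok, versions).

    python_ok is True if the interpreter is Python 3.8 or later.
    versions maps each package name to its installed version, or None if
    the package is not installed.
    """
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return sys.version_info >= (3, 8), versions


if __name__ == '__main__':
    python_ok, versions = check_environment()
    print('Python 3.8 or later:', python_ok)
    for name, version in versions.items():
        print(f'{name}: {version or "not installed"}')
```

Save it as a separate file and run it with `python3`. A `None` version for `pypaimon` means the `pip3 install` step did not reach the interpreter you are running.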
Step 2: Access a DLF Paimon table
- In your target directory, run the following command to create a file named testdlf.py.

```shell
vim testdlf.py
```

- Add the following complete sample code to the testdlf.py file. This example creates a DLF Paimon table, writes data to it, and reads the data back. For details about parameter configurations and other ways to read and write data, see Code details.

```python
import pyarrow as pa
import pandas as pd
from pypaimon import CatalogFactory
from pypaimon import Schema

# Create a catalog.
# Replace ${region_id}, ${catalog_name}, and the AccessKey placeholders with your own values.
catalog_options = {
    'metastore': 'rest',
    'uri': "http://${region_id}-vpc.dlf.aliyuncs.com",
    'warehouse': "${catalog_name}",
    'dlf.region': '${region_id}',
    "token.provider": "dlf",
    'dlf.access-key-id': "xxx",
    'dlf.access-key-secret': "xxxx",
}
catalog = CatalogFactory.create(catalog_options)

# Create a database.
catalog.create_database(
    name='testdb',
    ignore_if_exists=True  # Specifies whether to ignore the error if the database already exists.
)

# Create a schema.
pa_schema = pa.schema([
    ('date', pa.string()),
    ('hour', pa.string()),
    ('key', pa.int64()),
    ('value', pa.string())
])
schema = Schema.from_pyarrow_schema(
    pa_schema=pa_schema,
    partition_keys=['date', 'hour'],
    primary_keys=['date', 'hour', 'key'],
    options={'bucket': '2'},
    comment='my test table'
)

# Create a table.
catalog.create_table(
    identifier='testdb.tb',
    schema=schema,
    ignore_if_exists=True  # Specifies whether to ignore the error if the table already exists.
)
table = catalog.get_table('testdb.tb')

# Create table write and commit operations.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# Write data to the table. Both PyArrow and Pandas are supported.
# Write sample Pandas data.
data = {
    'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
    'hour': ['08', '09', '08'],
    'key': [1, 2, 3],
    'value': ['AAA', 'BBB', 'CCC'],
}
dataframe = pd.DataFrame(data)
table_write.write_pandas(dataframe)

# Commit the data.
table_commit.commit(table_write.prepare_commit())

# Close the resources.
table_write.close()
table_commit.close()

# Read data from the table. Multiple output formats are supported.
read_builder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
predicate = predicate_builder.equal('date', '2024-12-01')
read_builder = read_builder.with_filter(predicate)
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
print(pa_table)
```
Step 3: Run the Python file
Go to the target directory and run the following command to run the Python script.

```shell
python3 testdlf.py
```

Output similar to the following is returned. Because of the filter on `date`, only the two rows where `date` is `2024-12-01` are read.

```
root@iZxxx:/opt# python3 testdlf.py
pyarrow.Table
date: string not null
hour: string not null
key: int64 not null
value: string
----
date: [["2024-12-01"],["2024-12-01"]]
hour: [["09"],["08"]]
key: [[2],[1]]
value: [["BBB"],["AAA"]]
```
Code details
Create a DLF Paimon table
- Create a Paimon DLF catalog.

Note: You must create a catalog to access Paimon tables in DLF.

```python
from pypaimon import CatalogFactory

# catalog_options is a dictionary in which both keys and values are strings.
catalog_options = {
    'metastore': 'rest',
    'uri': "http://${region_id}-vpc.dlf.aliyuncs.com",
    'warehouse': "${catalog_name}",
    'dlf.region': '${region_id}',
    "token.provider": "dlf",
    'dlf.access-key-id': "xxx",
    'dlf.access-key-secret': "xxxx",
}
catalog = CatalogFactory.create(catalog_options)
```

The following table describes the parameters.

| Parameter | Description |
| --- | --- |
| metastore | Set to the fixed value `rest`, which indicates that you connect to DLF by using the REST Catalog protocol. |
| dlf.region | The ID of the DLF region. For more information, see Endpoints. |
| uri | The DLF REST Catalog endpoint. In a VPC environment, use `http://${region_id}-vpc.dlf.aliyuncs.com`. In a public network environment, use `https://dlfnext.${region_id}.aliyuncs.com`. For more information, see Endpoints. |
| warehouse | The name of the DLF data catalog. You can view the name in the Data Lake Formation console. For more information, see Data catalog. |
| dlf.access-key-id | The AccessKey ID required to access the DLF service. For more information, see Create an AccessKey pair. |
| dlf.access-key-secret | The AccessKey secret required to access the DLF service. For more information, see Create an AccessKey pair. |
| token.provider | Set to the fixed value `dlf`, which indicates that the DLF service provides the access token. |
| max-workers | Optional. The number of concurrent threads that PyPaimon uses to read data. Must be an integer of 1 or greater. The default is 1, which means that data is read serially. |
- Create a database.

In a Paimon catalog, every table belongs to a specific database. Create databases to organize and manage your tables.

```python
catalog.create_database(
    name='database_name',
    ignore_if_exists=True,  # Specifies whether to ignore the error if the database already exists.
    properties={'key': 'value'}  # Optional. The database properties.
)
```
- Create a schema.

A schema includes column definitions, partition keys, primary keys, table options, and a comment. The column definitions are described by a `pyarrow.Schema`; the other parameters are optional. You can build the `pyarrow.Schema` in either of the following ways.

PyArrow

Use the `pyarrow.schema` method. The following code provides an example.

```python
import pyarrow as pa
from pypaimon import Schema

pa_schema = pa.schema([
    ('date', pa.string()),
    ('hour', pa.string()),
    ('key', pa.int64()),
    ('value', pa.string())
])
schema = Schema.from_pyarrow_schema(
    pa_schema=pa_schema,
    partition_keys=['date', 'hour'],
    primary_keys=['date', 'hour', 'key'],
    options={'bucket': '2'},
    comment='my test table'
)
```

Note: For information about the data type mapping between PyArrow and Paimon, see PyPaimon data type mapping.

Pandas

If you have Pandas data, you can derive the schema directly from a `pandas.DataFrame`. The following code provides an example.

```python
import pandas as pd
import pyarrow as pa
from pypaimon import Schema

# This is sample DataFrame data.
data = {
    'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
    'hour': ['08', '09', '08'],
    'key': [1, 2, 3],
    'value': ['AAA', 'BBB', 'CCC'],
}
dataframe = pd.DataFrame(data)

# Obtain the pyarrow.Schema from the DataFrame.
record_batch = pa.RecordBatch.from_pandas(dataframe)
pa_schema = record_batch.schema

schema = Schema.from_pyarrow_schema(
    pa_schema=pa_schema,
    partition_keys=['date', 'hour'],
    primary_keys=['date', 'hour', 'key'],
    options={'bucket': '2'},
    comment='my test table'
)
```
- Create and get a table.

```python
catalog.create_table(
    identifier='database_name.table_name',
    schema=schema,
    ignore_if_exists=True  # Specifies whether to ignore the error if the table already exists.
)
table = catalog.get_table('database_name.table_name')
```
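The catalog step in this section hardcodes the AccessKey pair in `catalog_options`. As a hedged alternative, the following sketch builds the same options from a region ID and catalog name, picks the VPC or public endpoint described in the parameter table, and reads the credentials from environment variables. The function `build_dlf_catalog_options` and the variable names `DLF_ACCESS_KEY_ID` and `DLF_ACCESS_KEY_SECRET` are hypothetical; use whatever names your deployment defines. Following the note that option values are strings, `max-workers` is converted to a string.

```python
import os


def build_dlf_catalog_options(region_id, catalog_name, use_vpc=True, max_workers=None):
    """Return DLF REST catalog options suitable for CatalogFactory.create().

    The AccessKey pair comes from the hypothetical environment variables
    DLF_ACCESS_KEY_ID and DLF_ACCESS_KEY_SECRET instead of the source code.
    """
    if use_vpc:
        uri = f"http://{region_id}-vpc.dlf.aliyuncs.com"
    else:
        uri = f"https://dlfnext.{region_id}.aliyuncs.com"
    options = {
        'metastore': 'rest',
        'uri': uri,
        'warehouse': catalog_name,
        'dlf.region': region_id,
        'token.provider': 'dlf',
        # os.environ[...] raises KeyError if a variable is not set.
        'dlf.access-key-id': os.environ['DLF_ACCESS_KEY_ID'],
        'dlf.access-key-secret': os.environ['DLF_ACCESS_KEY_SECRET'],
    }
    if max_workers is not None:
        if int(max_workers) < 1:
            raise ValueError("max-workers must be an integer of 1 or greater")
        # Option values are strings, so the integer is converted.
        options['max-workers'] = str(int(max_workers))
    return options
```

With the variables exported, the catalog could then be created with `CatalogFactory.create(build_dlf_catalog_options('cn-hangzhou', 'my_catalog'))`, where the region ID and catalog name are placeholders for your own values.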
Write data to a table
PyPaimon does not currently support writing data to primary key tables whose `bucket` option is set to -1 (dynamic bucket mode).
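Because this limitation depends on a table option, it can help to check a table layout before you create or write to it. The following hypothetical helper, `check_pypaimon_writable`, encodes only the one rule stated above. It assumes that a primary key table with no explicit `bucket` option falls back to Paimon's default of -1; verify that default for your Paimon version. This is also why the examples in this topic set `options={'bucket': '2'}`.

```python
def check_pypaimon_writable(primary_keys, options):
    """Raise ValueError if PyPaimon cannot write to a table with this layout.

    Hypothetical helper covering a single documented limitation: writing to
    a primary key table whose 'bucket' option is -1 is not supported.
    """
    # Assumption: if 'bucket' is omitted, Paimon's default of -1 applies.
    bucket = int(options.get('bucket', '-1'))
    if primary_keys and bucket == -1:
        raise ValueError(
            "PyPaimon cannot write to a primary key table with bucket = -1; "
            "set a fixed bucket count, for example options={'bucket': '2'}"
        )


# The layout used in this topic passes the check.
check_pypaimon_writable(['date', 'hour', 'key'], {'bucket': '2'})
```

Append tables (no primary keys) are not affected by this rule, so the helper accepts them regardless of the `bucket` value.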
- Create the table write and commit objects.

```python
# Create table write and commit operations.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

# Sample data used by the write examples in the next step.
data = {
    'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
    'hour': ['08', '09', '08'],
    'key': [1, 2, 3],
    'value': ['AAA', 'BBB', 'CCC'],
}
```
- Write data to the table in one of the following ways. PyArrow is recommended for large datasets. For smaller datasets, typically a few gigabytes or less, Pandas can be more efficient.

PyArrow

You can write data as either a `pyarrow.Table` or a `pyarrow.RecordBatch`. A `pyarrow.RecordBatch` is more suitable for stream processing.

Method 1: Write a pyarrow.Table

```python
import pyarrow as pa

# Create fields.
fields = [
    pa.field('date', pa.string()),
    pa.field('hour', pa.string()),
    pa.field('key', pa.int64()),
    pa.field('value', pa.string())
]
# Create a PyArrow schema from the fields.
pa_schema = pa.schema(fields)
# Build a pyarrow.Table from the sample data dictionary.
pa_table = pa.Table.from_pydict(data, schema=pa_schema)
# Write the data.
table_write.write_arrow(pa_table)
```

Method 2: Write a pyarrow.RecordBatch

```python
import pyarrow as pa

# Create fields.
fields = [
    pa.field('date', pa.string()),
    pa.field('hour', pa.string()),
    pa.field('key', pa.int64()),
    pa.field('value', pa.string())
]
# Create a PyArrow schema from the fields.
pa_schema = pa.schema(fields)
# Build a pyarrow.RecordBatch from the sample data dictionary.
record_batch = pa.RecordBatch.from_pydict(data, schema=pa_schema)
# Write the data.
table_write.write_arrow_batch(record_batch)
```

Pandas

You can write data from a `pandas.DataFrame`.

```python
import pandas as pd

dataframe = pd.DataFrame(data)
table_write.write_pandas(dataframe)
```
- Commit the data and close the resources.

```python
# Commit the data.
table_commit.commit(table_write.prepare_commit())

# Close the resources.
table_write.close()
table_commit.close()
```
Read data from a table
- Create a ReadBuilder.

```python
read_builder = table.new_read_builder()
```
- (Optional) Push down filter conditions and column projections so that only the data you need is read.
- For example, use a PredicateBuilder to read only the data where `date` is `2024-12-01`.

```python
predicate_builder = read_builder.new_predicate_builder()
predicate = predicate_builder.equal('date', '2024-12-01')
read_builder = read_builder.with_filter(predicate)
```

- For example, project only the `key` and `value` columns.

```python
read_builder = read_builder.with_projection(['key', 'value'])
```

Note: For more information about supported filter conditions, see PyPaimon filter conditions.
- Obtain the splits.

```python
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
```
- Read the splits into one of the following output formats. The sample outputs assume the `date` filter and the `key`/`value` projection from the previous steps.

Apache Arrow

- Read all data into a `pyarrow.Table`.

```python
table_read = read_builder.new_read()
pa_table = table_read.to_arrow(splits)
print(pa_table)
# Sample output:
# pyarrow.Table
# key: int64 not null
# value: string
# ----
# key: [[2],[1]]
# value: [["BBB"],["AAA"]]
```

- Read data into a `pyarrow.RecordBatchReader` and iterate over the batches.

```python
table_read = read_builder.new_read()
for batch in table_read.to_arrow_batch_reader(splits):
    print(batch)
# Sample output:
# pyarrow.RecordBatch
# key: int64
# value: string
# ----
# key: [1,2]
# value: ["AAA","BBB"]
```
Pandas

Read data into a `pandas.DataFrame`.

```python
table_read = read_builder.new_read()
df = table_read.to_pandas(splits)
print(df)
# Sample output:
#    key value
# 0    1   AAA
# 1    2   BBB
```

DuckDB
Important: You must install DuckDB. You can run `pip install duckdb` to install it.

Convert the data to an in-memory DuckDB table and query it.

```python
table_read = read_builder.new_read()
duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')

print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
# Sample output:
#    key value
# 0    1   AAA
# 1    2   BBB

print(duckdb_con.query("SELECT * FROM duckdb_table WHERE key = 1").fetchdf())
# Sample output:
#    key value
# 0    1   AAA
```

Ray
Important: You must install Ray. You can run `pip install ray` to install it.

```python
table_read = read_builder.new_read()
ray_dataset = table_read.to_ray(splits)

# Print information about ray_dataset.
print(ray_dataset)
# Sample output:
# MaterializedDataset(num_blocks=1, num_rows=2, schema={key: int64, value: string})

# Print the first two records in ray_dataset.
print(ray_dataset.take(2))
# Sample output:
# [{'key': 1, 'value': 'AAA'}, {'key': 2, 'value': 'BBB'}]

# Convert the entire ray_dataset to a Pandas DataFrame and print the result.
print(ray_dataset.to_pandas())
# Sample output:
#    key value
# 0    1   AAA
# 1    2   BBB
```
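The read steps above push down a single equality filter. To read exactly one partition, you can combine the `date` and `hour` conditions into one predicate. The following sketch assumes that `PredicateBuilder.and_predicates`, which takes a list of predicates, is available in your PyPaimon version; confirm this in PyPaimon filter conditions before relying on it. The other calls are the ones shown in this section, and the function name `read_partition` is hypothetical.

```python
def read_partition(table, date, hour, columns=None):
    """Read one (date, hour) partition of a Paimon table into a pandas.DataFrame.

    Both conditions are pushed down as a single AND predicate. If columns is
    given, only those columns are read.
    """
    read_builder = table.new_read_builder()
    predicate_builder = read_builder.new_predicate_builder()
    # Assumption: and_predicates() combines a list of predicates with AND.
    predicate = predicate_builder.and_predicates([
        predicate_builder.equal('date', date),
        predicate_builder.equal('hour', hour),
    ])
    read_builder = read_builder.with_filter(predicate)
    if columns is not None:
        read_builder = read_builder.with_projection(columns)
    splits = read_builder.new_scan().plan().splits()
    return read_builder.new_read().to_pandas(splits)
```

With the sample data written earlier, `read_partition(table, '2024-12-01', '08', ['key', 'value'])` would be expected to return only the row with key 1. Swapping `to_pandas` for `to_arrow` or `to_arrow_batch_reader` gives the Apache Arrow outputs instead.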