本文为您介绍在Ubuntu环境中如何使用PyPaimon创建DLF Paimon表以及读写DLF Paimon表数据的方法。
背景信息
PyPaimon作为Apache Paimon的Python SDK,不仅支持通过Python便捷地将数据写入Paimon表,实现高效的数据入湖操作,还支持通过Python高效处理Paimon中的数据。使用pypaimon_dlf2 SDK并配置DLF Catalog后,您可以将Paimon表的元数据同步至数据湖构建(DLF)服务中,从而便于阿里云上的其他计算引擎接入这些数据资源。此外,借助DLF提供的全面的数据湖管理功能,还可以实现对数据湖生命周期的有效管理和存储格式的优化。
前提条件
已创建DLF 2.0数据目录。如未创建,请参见新建数据目录。
确保Ubuntu环境中已安装JRE 8,可通过运行
java -version
命令来验证Java版本。如未安装或版本不一致,请安装正确的版本(例如, 您可尝试通过apt install openjdk-8-jre
来安装OpenJDK 8的JRE版本)。确保Python版本为Python 3.8及以上。
操作步骤
步骤一:环境准备
下载pypaimon_dlf2-0.3.dev0.tar.gz安装包,并上传到Ubuntu环境的目标目录下。
重要该安装包目前仅支持Ubuntu环境,其他环境可能无法使用。
进入目标目录下,执行以下命令,安装pypaimon_dlf2 SDK。
pip3 install pypaimon_dlf2-0.3.dev0.tar.gz
(可选)安装完成后,可通过
pip show pypaimon_dlf2
命令查看结果。
步骤二:通过PyPaimon访问DLF Paimon表
在目标目录下,执行以下命令,新建文件
testdlf.py
。vim testdlf.py
在
testdlf.py
文件中,写入以下完整示例代码。此示例展示了如何通过PyPaimon创建DLF Paimon表以及读写DLF Paimon表数据。代码的具体参数配置及多种读写表数据的方式,请参见代码详解。import pyarrow as pa import pandas as pd from pypaimon import Schema from pypaimon.py4j import Catalog # 创建 catalog catalog_options = { 'metastore': 'dlf-paimon', 'dlf.region': 'xxx', 'dlf.endpoint': 'xxx', 'dlf.catalog.id': 'xxx', 'dlf.catalog.accessKeyId': 'xxx', 'dlf.catalog.accessKeySecret': 'xxx', 'max-workers': 'N' } catalog = Catalog.create(catalog_options) # 创建 database catalog.create_database( name='testdb', ignore_if_exists=True # 是否忽略Database已存在的错误 ) # 创建 Schema pa_schema = pa.schema([ ('date', pa.string()), ('hour', pa.string()), ('key', pa.int64()), ('value', pa.string()) ]) schema = Schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' ) # 创建 table catalog.create_table( identifier='testdb.tb', schema=schema, ignore_if_exists=True # 是否忽略表已存在的错误 ) table = catalog.get_table('testdb.tb') # 创建 table write and commit write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() # 写表数据,支持pyarrow和Pandas # 写入Pandas示例数据 data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], } dataframe = pd.DataFrame(data) table_write.write_pandas(dataframe) # 提交数据 table_commit.commit(table_write.prepare_commit()) # 关闭资源 table_write.close() table_commit.close() # 读表数据,支持多种数据格式 read_builder = table.new_read_builder() predicate_builder = read_builder.new_predicate_builder() predicate = predicate_builder.equal('date', '2024-12-01') read_builder = read_builder.with_filter(predicate) table_scan = read_builder.new_scan() splits = table_scan.plan().splits() table_read = read_builder.new_read() pa_table = table_read.to_arrow(splits) print(pa_table)
步骤三:运行Python文件
进入目标目录,执行以下命令,运行Python脚本。
python3 testdlf.py
运行结果如下。
代码详解
通过PyPaimon创建DLF Paimon表
创建Paimon DLF Catalog。
说明Catalog是管理表的入口,在访问DLF中的Paimon表之前,首先需要创建Catalog。
from pypaimon.py4j import Catalog # Catalog options是一个dict, key和value都是str catalog_options = { 'metastore': 'dlf-paimon', 'dlf.region': 'xxx', 'dlf.endpoint': 'xxx', 'dlf.catalog.id': 'xxx', 'dlf.catalog.accessKeyId': 'xxx', 'dlf.catalog.accessKeySecret': 'xxx', 'max-workers': 'N' } catalog = Catalog.create(catalog_options)
参数说明如下。
参数
说明
metastore
dlf-paimon
dlf.region
DLF Region ID,详情请参见地域及访问域名。
dlf.endpoint
DLF Endpoint,详情请参见地域及访问域名。
dlf.catalog.id
DLF数据目录ID。可在数据湖构建控制台上查看数据目录对应的ID,具体操作请参见数据目录。
dlf.catalog.accessKeyId
访问DLF服务所需的AccessKey。详情请参见创建AccessKey。
dlf.catalog.accessKeySecret
访问DLF服务所需的SecretKey。详情请参见创建AccessKey。
max-workers
可选,PyPaimon读数据时的并发数。N:大于等于1的整数,默认为1,即默认情况下是串行读取。
创建Database。
在Paimon Catalog中,所有表都归属于特定的Database内。您可以创建Database来管理表。
catalog.create_database( name='database_name', ignore_if_exists=True, # 是否忽略Database已存在的错误 properties={'key': 'value'} # Database参数(可选) )
创建Schema。
Schema包含列定义、分区键、主键、表参数和注释。其中,列定义使用
pyarrow.Schema
描述,而其余参数均为可选项。可通过以下两种方式构建pyarrow.Schema
。重要为了确保兼容性,列定义应使用小写。因为在DLF中,列名会被自动转换成小写形式。否则,在查询时可能会因为找不到对应的列而报错。
PyArrow
使用
pyarrow.schema
方法。示例如下。import pyarrow as pa from pypaimon import Schema pa_schema = pa.schema([ ('date', pa.string()), ('hour', pa.string()), ('key', pa.int64()), ('value', pa.string()) ]) schema = Schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' )
说明pyarrow
和Paimon
的数据类型映射,请参见PyPaimon数据类型映射。Pandas
如果您有pandas数据,也可以直接从
pandas.DataFrame
中获取。示例如下。import pandas as pd import pyarrow as pa from pypaimon import Schema # 这里是示例DataFrame数据 data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], } dataframe = pd.DataFrame(data) # 从DataFrame中获取pyarrow.Schema record_batch = pa.RecordBatch.from_pandas(dataframe) pa_schema = record_batch.schema schema = Schema( pa_schema=pa_schema, partition_keys=['date', 'hour'], primary_keys=['date', 'hour', 'key'], options={'bucket': '2'}, comment='my test table' )
创建并获取Table。
catalog.create_table( identifier='database_name.table_name', schema=schema, ignore_if_exists=True # 是否忽略表已存在的错误 ) table = catalog.get_table('database_name.table_name')
写入表数据
当前PyPaimon不支持写bucket=-1的主键表。
创建Table写入与提交操作。
# 创建 table write and commit write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() # 写入Pandas示例数据 data = { 'date': ['2024-12-01', '2024-12-01', '2024-12-02'], 'hour': ['08', '09', '08'], 'key': [1, 2, 3], 'value': ['AAA', 'BBB', 'CCC'], }
您可通过以下两种方式写入表数据:
在处理大规模数据集时,推荐使用PyArrow;而对于较小规模的数据(通常指几GB以下),Pandas则能更高效地进行处理。
PyArrow
支持
pyarrow.Table
和pyarrow.RecordBatch
两种方式。其中,pyarrow.RecordBatch
更适合用于流式处理场景。方式一:写入pyarrow.Table
# 创建字段 fields = [ pa.field('date', pa.string()), pa.field('hour', pa.string()), pa.field('key', pa.int64()), pa.field('value', pa.string()) ] # 使用数据和字段创建 Schema schema = pa.schema(fields) # 创建 Table pa_table = pa.Table.from_arrays(data, schema) # 写入数据 table_write.write_arrow(pa_table)
方式二:写入pyarrow.RecordBatch
# 创建字段 fields = [ pa.field('date', pa.string()), pa.field('hour', pa.string()), pa.field('key', pa.int64()), pa.field('value', pa.string()) ] # 使用数据和字段创建 Schema schema = pa.schema(fields) # 创建 RecordBatch record_batch = pa.RecordBatch.from_arrays(data, schema) # 写入数据 table_write.write_arrow_batch(record_batch)
Pandas
支持写入pandas.DataFrame。
import pandas as pd dataframe = pd.DataFrame(data) table_write.write_pandas(dataframe)
提交数据并关闭资源。
# 提交数据 table_commit.commit(table_write.prepare_commit()) # 关闭资源 table_write.close() table_commit.close()
读取表数据
创建ReadBuilder,构建读数据工具。
read_builder = table.new_read_builder()
使用PredicateBuilder来构建和下推筛选条件。
支持条件筛选,例如您只想查询date = 2024-12-01的数据。
predicate_builder = read_builder.new_predicate_builder() predicate = predicate_builder.equal('date', '2024-12-01') read_builder = read_builder.with_filter(predicate)
支持筛选特定列,例如您只想查询 key和value两列。
read_builder = read_builder.with_projection(['key', 'value'])
说明更多支持的筛选条件,参见PyPaimon筛选条件。
获取
splits
。table_scan = read_builder.new_scan() splits = table_scan.plan().splits()
将
splits
转换为多种数据格式。Apache Arrow
您可以把所有数据读到
pyarrow.Table
中。table_read = read_builder.new_read() pa_table = table_read.to_arrow(splits) print(pa_table) # 输出示例: # pyarrow.Table # key: int64 not null # value: string # ---- # key: [[2],[1]] # value: [["BBB"],["AAA"]]
也可以将数据读到
pyarrow.RecordBatchReader
并迭代读取。table_read = read_builder.new_read() for batch in table_read.to_arrow_batch_reader(splits): print(batch) # 输出示例: # pyarrow.RecordBatch # key: int64 # value: string # ---- # key: [1,2] # value: ["AAA","BBB"]
Pandas
您可以将数据读到
pandas.DataFrame
中。table_read = read_builder.new_read() df = table_read.to_pandas(splits) print(df) # 输出示例: # key value # 0 1 AAA # 1 2 BBB
DuckDB
重要需要安装DuckDB,可通过
pip install duckdb
安装。您可以将数据转换为一个in-memory的DuckDB table并查询。
table_read = read_builder.new_read() duckdb_con = table_read.to_duckdb(splits, 'duckdb_table') print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()) # 输出示例: # key value # 0 1 AAA # 1 2 BBB print(duckdb_con.query("SELECT * FROM duckdb_table WHERE key = 1").fetchdf()) # 输出示例: # key value # 0 1 AAA
Ray
重要需要安装Ray,可通过
pip install ray
安装。table_read = read_builder.new_read() ray_dataset = table_read.to_ray(splits) # 打印ray_dataset的信息 print(ray_dataset) # 输出示例: # MaterializedDataset(num_blocks=1, num_rows=2, schema={key: int64, value: string}) # 打印ray_dataset中的前两个元素 print(ray_dataset.take(2)) # 输出示例: # [{'key': 1, 'value': 'AAA'}, {'key': 2, 'value': 'BBB'}] # 将整个ray_dataset转换为Pandas DataFrame格式后打印 print(ray_dataset.to_pandas()) # 输出示例: # key value # 0 1 AAA # 1 2 BBB
常见问题
Q:在安装JRE时,执行apt install openjdk-8-jre
失败,报错如下,应如何处理?
A:先执行apt update
更新软件包,再执行apt install openjdk-8-jre
进行安装。