PyPaimon+DLF进行数据入湖

本文为您介绍在Ubuntu环境中如何使用PyPaimon创建DLF Paimon表以及读写DLF Paimon表数据的方法。

背景信息

PyPaimon作为Apache PaimonPython SDK,不仅支持通过Python便捷地将数据写入Paimon表,实现高效的数据入湖操作,还支持通过Python高效处理Paimon中的数据。使用pypaimon_dlf2 SDK并配置DLF Catalog后,您可以将Paimon表的元数据同步至数据湖构建(DLF)服务中,从而便于阿里云上的其他计算引擎接入这些数据资源。此外,借助DLF提供的全面的数据湖管理功能,还可以实现对数据湖生命周期的有效管理和存储格式的优化。

前提条件

  • 已创建DLF 2.0数据目录。如未创建,请参见新建数据目录

  • 确保Ubuntu环境中已安装JRE 8,可通过运行java -version命令来验证Java版本。如未安装或版本不一致,请安装正确的版本(例如, 您可尝试通过apt install openjdk-8-jre来安装OpenJDK 8JRE版本)。

  • 确保Python版本为Python 3.8及以上。

操作步骤

步骤一:环境准备

  1. 下载pypaimon_dlf2-0.3.dev0.tar.gz安装包,并上传到Ubuntu环境的目标目录下。

    重要

    该安装包目前仅支持Ubuntu环境,其他环境可能无法使用。

  2. 进入目标目录下,执行以下命令,安装pypaimon_dlf2 SDK。

    pip3 install pypaimon_dlf2-0.3.dev0.tar.gz
  3. (可选)安装完成后,可通过pip show pypaimon_dlf2命令查看结果。

步骤二:通过PyPaimon访问DLF Paimon

  1. 在目标目录下,执行以下命令,新建文件testdlf.py

    vim testdlf.py
  2. testdlf.py文件中,写入以下完整示例代码。此示例展示了如何通过PyPaimon创建DLF Paimon以及读写DLF Paimon表数据。代码的具体参数配置及多种读写表数据的方式,请参见代码详解

    import pyarrow as pa
    import pandas as pd
    from pypaimon import Schema
    from pypaimon.py4j import Catalog
    
    # 创建 catalog
    catalog_options = {
      'metastore': 'dlf-paimon',
      'dlf.region': 'xxx',
      'dlf.endpoint': 'xxx',
      'dlf.catalog.id': 'xxx',
      'dlf.catalog.accessKeyId': 'xxx',
      'dlf.catalog.accessKeySecret': 'xxx',
      'max-workers': 'N'
    }
    catalog = Catalog.create(catalog_options)
    
    # 创建 database
    catalog.create_database(
       name='testdb',
       ignore_if_exists=True    # 是否忽略Database已存在的错误
    )
    
    # 创建 Schema
    pa_schema = pa.schema([
        ('date', pa.string()),
        ('hour', pa.string()),
        ('key', pa.int64()),
        ('value', pa.string())
    ])
    
    schema = Schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
    # 创建 table
    catalog.create_table(
        identifier='testdb.tb',
        schema=schema,
        ignore_if_exists=True  # 是否忽略表已存在的错误
    )
    table = catalog.get_table('testdb.tb')
    
    # 创建 table write and commit
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    
    # 写表数据,支持pyarrowPandas
    # 写入Pandas示例数据
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
    
    dataframe = pd.DataFrame(data)
    table_write.write_pandas(dataframe)
    
    # 提交数据
    table_commit.commit(table_write.prepare_commit())
    # 关闭资源
    table_write.close()
    table_commit.close()
    
    # 读表数据,支持多种数据格式
    read_builder = table.new_read_builder()
    predicate_builder = read_builder.new_predicate_builder()
    predicate = predicate_builder.equal('date', '2024-12-01')
    read_builder = read_builder.with_filter(predicate)
    
    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
    
    table_read = read_builder.new_read()
    pa_table = table_read.to_arrow(splits)
    print(pa_table)

步骤三:运行Python文件

进入目标目录,执行以下命令,运行Python脚本。

python3 testdlf.py

运行结果如下。

image

代码详解

通过PyPaimon创建DLF Paimon

  1. 创建Paimon DLF Catalog。

    说明

    Catalog是管理表的入口,在访问DLF中的Paimon表之前,首先需要创建Catalog。

    from pypaimon.py4j import Catalog
    
    # Catalog options是一个dict, keyvalue都是str
    catalog_options = {
      'metastore': 'dlf-paimon',
      'dlf.region': 'xxx',
      'dlf.endpoint': 'xxx',
      'dlf.catalog.id': 'xxx',
      'dlf.catalog.accessKeyId': 'xxx',
      'dlf.catalog.accessKeySecret': 'xxx',
      'max-workers': 'N'
    }
    catalog = Catalog.create(catalog_options)
    

    参数说明如下。

    参数

    说明

    metastore

    dlf-paimon

    dlf.region

    DLF Region ID,详情请参见地域及访问域名

    dlf.endpoint

    DLF Endpoint,详情请参见地域及访问域名

    dlf.catalog.id

    DLF数据目录ID。可在数据湖构建控制台上查看数据目录对应的ID,具体操作请参见数据目录

    dlf.catalog.accessKeyId

    访问DLF服务所需的AccessKey。详情请参见创建AccessKey

    dlf.catalog.accessKeySecret

    访问DLF服务所需的SecretKey。详情请参见创建AccessKey

    max-workers

    可选,PyPaimon读数据时的并发数。N:大于等于1的整数,默认为1,即默认情况下是串行读取。

  2. 创建Database。

    Paimon Catalog中,所有表都归属于特定的Database内。您可以创建Database来管理表。

    catalog.create_database(
       name='database_name',
       ignore_if_exists=True,    # 是否忽略Database已存在的错误
       properties={'key': 'value'}  # Database参数(可选)
    )
  3. 创建Schema。

    Schema包含列定义、分区键、主键、表参数和注释。其中,列定义使用 pyarrow.Schema 描述,而其余参数均为可选项。可通过以下两种方式构建pyarrow.Schema

    重要

    为了确保兼容性,列定义应使用小写。因为在DLF中,列名会被自动转换成小写形式。否则,在查询时可能会因为找不到对应的列而报错。

    PyArrow

    使用 pyarrow.schema方法。示例如下。

    import pyarrow as pa
    from pypaimon import Schema
    
    pa_schema = pa.schema([
        ('date', pa.string()),
        ('hour', pa.string()),
        ('key', pa.int64()),
        ('value', pa.string())
    ])
    
    schema = Schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
    说明

    pyarrowPaimon的数据类型映射,请参见PyPaimon数据类型映射

    Pandas

    如果您有pandas数据,也可以直接从 pandas.DataFrame中获取。示例如下。

    import pandas as pd
    import pyarrow as pa
    
    from pypaimon import Schema
    
    # 这里是示例DataFrame数据
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
    dataframe = pd.DataFrame(data)
    
    # 从DataFrame中获取pyarrow.Schema
    record_batch = pa.RecordBatch.from_pandas(dataframe)
    pa_schema = record_batch.schema
    
    schema = Schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
  4. 创建并获取Table。

    catalog.create_table(
        identifier='database_name.table_name',
        schema=schema,
        ignore_if_exists=True  # 是否忽略表已存在的错误
    )
    table = catalog.get_table('database_name.table_name')
    

写入表数据

说明

当前PyPaimon不支持写bucket=-1的主键表。

  1. 创建Table写入与提交操作

    # 创建 table write and commit
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    
    # 写入Pandas示例数据
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
  2. 您可通过以下两种方式写入表数据:

    在处理大规模数据集时,推荐使用PyArrow;而对于较小规模的数据(通常指几GB以下),Pandas则能更高效地进行处理。

    PyArrow

    支持pyarrow.Tablepyarrow.RecordBatch两种方式。其中,pyarrow.RecordBatch更适合用于流式处理场景。

    • 方式一:写入pyarrow.Table

      # 创建字段
      fields = [
          pa.field('date', pa.string()),
          pa.field('hour', pa.string()),
          pa.field('key', pa.int64()),
          pa.field('value', pa.string())
      ]
      
      # 使用数据和字段创建 Schema
      schema = pa.schema(fields)
      
      # 创建 Table
      pa_table = pa.Table.from_arrays(data, schema)
      
      # 写入数据
      table_write.write_arrow(pa_table)
    • 方式二:写入pyarrow.RecordBatch

      # 创建字段
      fields = [
          pa.field('date', pa.string()),
          pa.field('hour', pa.string()),
          pa.field('key', pa.int64()),
          pa.field('value', pa.string())
      ]
      
      # 使用数据和字段创建 Schema
      schema = pa.schema(fields)
      
      # 创建 RecordBatch
      record_batch = pa.RecordBatch.from_arrays(data, schema)
      
      # 写入数据
      table_write.write_arrow_batch(record_batch)

    Pandas

    支持写入pandas.DataFrame。

    import pandas as pd
    
    dataframe = pd.DataFrame(data)
    table_write.write_pandas(dataframe)
  3. 提交数据并关闭资源。

    # 提交数据
    table_commit.commit(table_write.prepare_commit())
    # 关闭资源
    table_write.close()
    table_commit.close()

读取表数据

  1. 创建ReadBuilder,构建读数据工具。

    read_builder = table.new_read_builder()
  2. 使用PredicateBuilder来构建和下推筛选条件。

    • 支持条件筛选,例如您只想查询date = 2024-12-01的数据。

      predicate_builder = read_builder.new_predicate_builder()
      predicate = predicate_builder.equal('date', '2024-12-01')
      read_builder = read_builder.with_filter(predicate)
    • 支持筛选特定列,例如您只想查询 keyvalue两列。

      read_builder = read_builder.with_projection(['key', 'value'])
    说明

    更多支持的筛选条件,参见PyPaimon筛选条件

  3. 获取splits

    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
  4. splits转换为多种数据格式。

    Apache Arrow

    • 您可以把所有数据读到 pyarrow.Table中。

      table_read = read_builder.new_read()
      pa_table = table_read.to_arrow(splits)
      print(pa_table)
      
      # 输出示例:
      # pyarrow.Table
      # key: int64 not null
      # value: string
      # ----
      # key: [[2],[1]]
      # value: [["BBB"],["AAA"]]
    • 也可以将数据读到 pyarrow.RecordBatchReader并迭代读取。

      table_read = read_builder.new_read()
      for batch in table_read.to_arrow_batch_reader(splits):
          print(batch)
      
      # 输出示例:
      # pyarrow.RecordBatch
      # key: int64
      # value: string
      # ----
      # key: [1,2]
      # value: ["AAA","BBB"]

    Pandas

    您可以将数据读到 pandas.DataFrame中。

    table_read = read_builder.new_read()
    df = table_read.to_pandas(splits)
    print(df)
    
    # 输出示例:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB

    DuckDB

    重要

    需要安装DuckDB,可通过pip install duckdb安装。

    您可以将数据转换为一个in-memoryDuckDB table并查询。

    table_read = read_builder.new_read()
    duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
    
    print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
    # 输出示例:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB
    
    print(duckdb_con.query("SELECT * FROM duckdb_table WHERE key = 1").fetchdf())
    # 输出示例:
    #    key  value
    # 0   1  AAA

    Ray

    重要

    需要安装Ray,可通过pip install ray安装。

    table_read = read_builder.new_read()
    ray_dataset = table_read.to_ray(splits)
    
    # 打印ray_dataset的信息
    print(ray_dataset)
    # 输出示例:
    # MaterializedDataset(num_blocks=1, num_rows=2, schema={key: int64, value: string})
    
    # 打印ray_dataset中的前两个元素
    print(ray_dataset.take(2))
    # 输出示例:
    # [{'key': 1, 'value': 'AAA'}, {'key': 2, 'value': 'BBB'}]
    
    # 将整个ray_dataset转换为Pandas DataFrame格式后打印
    print(ray_dataset.to_pandas())
    # 输出示例:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB

常见问题

Q:在安装JRE时,执行apt install openjdk-8-jre失败,报错如下,应如何处理?

image

A:先执行apt update更新软件包,再执行apt install openjdk-8-jre进行安装。