Access DLF using PyPaimon


This article explains how to use PyPaimon to create, read from, and write to Paimon tables in Data Lake Formation (DLF).

PyPaimon and DLF integration

PyPaimon is the Python SDK for Apache Paimon. It provides efficient data ingestion capabilities, allowing you to use Python to directly read, write, and process Paimon table data.

DLF catalog integration

By configuring a DLF catalog in PyPaimon (a REST catalog, as shown in the sample code in this article), you can automatically sync Paimon table metadata to Alibaba Cloud Data Lake Formation (DLF).

This integration provides the following key benefits:

  • Multi-engine interoperability: After the metadata is hosted in DLF, other Alibaba Cloud computing engines, such as MaxCompute, Hologres, and Alibaba Cloud EMR, can access the same Paimon data.

  • Unified governance: DLF's data lake management capabilities allow you to manage the lifecycle of Paimon tables and automatically optimize their storage formats.

Prerequisites

  • You have a DLF data catalog.

  • The Python version must be 3.8 or later. Run python3 --version to verify your current version.

Procedure

Step 1: Prepare the environment

  1. Run the following command to install the PyPaimon SDK.

    pip3 install pypaimon==1.4.1
  2. (Optional) After the installation, run the pip3 show pypaimon command to verify the installation.
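If you prefer to check both prerequisites from Python instead of the shell, the standard library is enough. The following is a minimal sketch; the helper name `check_environment` is hypothetical and not part of PyPaimon. It only reports whether the `pypaimon` distribution is installed and which version, not whether DLF is reachable.

```python
import sys
from importlib import metadata  # part of the standard library since Python 3.8


def check_environment(min_python=(3, 8), package="pypaimon"):
    """Return (python_ok, installed_version) for the PyPaimon prerequisites.

    installed_version is None when the package is not installed.
    """
    python_ok = sys.version_info[:2] >= tuple(min_python)
    try:
        installed_version = metadata.version(package)
    except metadata.PackageNotFoundError:
        installed_version = None
    return python_ok, installed_version


if __name__ == "__main__":
    ok, version = check_environment()
    print(f"Python {sys.version.split()[0]} meets the 3.8 requirement: {ok}")
    print(f"pypaimon version: {version or 'not installed'}")
```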

Step 2: Access a DLF Paimon table

  1. In your target directory, run the following command to create a new file named testdlf.py.

    vim testdlf.py
  2. In the testdlf.py file, add the following complete sample code. This example shows how to create a DLF Paimon table and then read from and write to it. For details about parameter configurations and other methods for reading and writing data, see Code details.

    import pyarrow as pa
    import pandas as pd
    from pypaimon import CatalogFactory
    from pypaimon import Schema
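    # Create a catalog. Replace ${region_id}, ${catalog_name}, and the AccessKey placeholders (xxx, xxxx) with your own values.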
    catalog_options = {
      'metastore': 'rest',
      'uri': "http://${region_id}-vpc.dlf.aliyuncs.com",
      'warehouse': "${catalog_name}",
      'dlf.region': '${region_id}',
      "token.provider": "dlf",
      'dlf.access-key-id': "xxx",
      'dlf.access-key-secret': "xxxx",
    }
    catalog = CatalogFactory.create(catalog_options)
    # Create a database.
    catalog.create_database(
       name='testdb',
       ignore_if_exists=True    # Specifies whether to ignore the error if the database already exists.
    )
    # Create a schema.
    pa_schema = pa.schema([
        ('date', pa.string()),
        ('hour', pa.string()),
        ('key', pa.int64()),
        ('value', pa.string())
    ])
    schema = Schema.from_pyarrow_schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
    # Create a table.
    catalog.create_table(
        identifier='testdb.tb',
        schema=schema,
        ignore_if_exists=True  # Specifies whether to ignore the error if the table already exists.
    )
    table = catalog.get_table('testdb.tb')
    # Create table write and commit operations.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    # Write data to the table. Both PyArrow and Pandas are supported.
    # Write sample Pandas data.
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
    dataframe = pd.DataFrame(data)
    table_write.write_pandas(dataframe)
    # Commit the data.
    table_commit.commit(table_write.prepare_commit())
    # Close the resources.
    table_write.close()
    table_commit.close()
    # Read data from the table. Multiple output formats are supported.
    read_builder = table.new_read_builder()
    predicate_builder = read_builder.new_predicate_builder()
    predicate = predicate_builder.equal('date', '2024-12-01')
    read_builder = read_builder.with_filter(predicate)
    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
    table_read = read_builder.new_read()
    pa_table = table_read.to_arrow(splits)
    print(pa_table)
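The sample above hardcodes the AccessKey pair in catalog_options. A safer pattern is to read the credentials from environment variables. The following is a minimal sketch under stated assumptions: the helper name `build_dlf_catalog_options` is hypothetical, and the environment variable names `ALIBABA_CLOUD_ACCESS_KEY_ID` and `ALIBABA_CLOUD_ACCESS_KEY_SECRET` are an assumption that follows a common Alibaba Cloud convention, so use whatever names your deployment defines. The two `uri` formats are the VPC and public-network endpoints described in Code details.

```python
import os


def build_dlf_catalog_options(region_id, catalog_name, use_vpc=True,
                              max_workers=None, env=None):
    """Build PyPaimon catalog options for a DLF REST catalog.

    Credentials come from environment variables instead of source code.
    The variable names are an assumption; adjust them to your setup.
    """
    env = os.environ if env is None else env
    if use_vpc:
        uri = f"http://{region_id}-vpc.dlf.aliyuncs.com"
    else:
        uri = f"https://dlfnext.{region_id}.aliyuncs.com"
    options = {
        "metastore": "rest",
        "uri": uri,
        "warehouse": catalog_name,
        "dlf.region": region_id,
        "token.provider": "dlf",
        "dlf.access-key-id": env["ALIBABA_CLOUD_ACCESS_KEY_ID"],
        "dlf.access-key-secret": env["ALIBABA_CLOUD_ACCESS_KEY_SECRET"],
    }
    if max_workers is not None:
        if not isinstance(max_workers, int) or max_workers < 1:
            raise ValueError("max-workers must be an integer of 1 or greater")
        # Catalog option keys and values are all strings.
        options["max-workers"] = str(max_workers)
    return options
```

For example, `CatalogFactory.create(build_dlf_catalog_options('cn-hangzhou', 'my_catalog'))` would replace the hardcoded dictionary, where `my_catalog` is a placeholder catalog name.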

Step 3: Run the Python file

Go to the target directory and run the following command to execute the Python script.

python3 testdlf.py

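Output similar to the following is returned. Because the sample filters on date = '2024-12-01', only the two rows for that date are returned.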

root@iZxxx:/opt# python3 testdlf.py
pyarrow.Table
date: string not null
hour: string not null
key: int64 not null
value: string
----
date: [["2024-12-01"],["2024-12-01"]]
hour: [["09"],["08"]]
key: [[2],[1]]
value: [["BBB"],["AAA"]]

Code details

Create a DLF Paimon table

  1. Create a Paimon DLF catalog.

    Note

    You must create a catalog to access Paimon tables in DLF.

    # catalog_options is a dictionary in which all keys and values are strings.
    catalog_options = {
      'metastore': 'rest',
      'uri': "http://${region_id}-vpc.dlf.aliyuncs.com",
      'warehouse': "${catalog_name}",
      'dlf.region': '${region_id}',
      "token.provider": "dlf",
      'dlf.access-key-id': "xxx",
      'dlf.access-key-secret': "xxxx",
    }
    catalog = CatalogFactory.create(catalog_options)

    The following table describes the parameters.

    | Parameter | Description |
    | --- | --- |
    | metastore | Set to the fixed value rest, which indicates that you connect to DLF by using the REST Catalog protocol. |
    | dlf.region | The ID of the DLF region. For more information, see Endpoints. |
    | uri | The DLF REST Catalog endpoint. In a VPC environment, use http://${region_id}-vpc.dlf.aliyuncs.com. In a public network environment, use https://dlfnext.${region_id}.aliyuncs.com. For more information, see Endpoints. |
    | warehouse | The name of the DLF data catalog. You can view the name in the Data Lake Formation console. For more information, see Data catalog. |
    | dlf.access-key-id | The AccessKey ID used to access the DLF service. For more information, see Create an AccessKey pair. |
    | dlf.access-key-secret | The AccessKey secret used to access the DLF service. For more information, see Create an AccessKey pair. |
    | token.provider | Set to the fixed value dlf, which indicates that the DLF service provides the access token. |
    | max-workers | Optional. The number of concurrent threads that PyPaimon uses to read data. Must be an integer of 1 or greater. Default: 1, which indicates a serial read. |

  2. Create a database.

    In a Paimon catalog, every table belongs to a specific database. Create databases to organize and manage your tables.

    catalog.create_database(
       name='database_name',
       ignore_if_exists=True,    # Specifies whether to ignore the error if the database already exists.
       properties={'key': 'value'}  # Optional. The database properties.
    )
  3. Create a schema.

    A schema includes column definitions, partition keys, primary keys, table options, and a comment. The columns are defined with a pyarrow.Schema; the other parameters are optional. You can build the pyarrow.Schema in either of the following ways.

    PyArrow

    Use the pyarrow.schema function. The following code provides an example.

    import pyarrow as pa
    from pypaimon import Schema
    pa_schema = pa.schema([
        ('date', pa.string()),
        ('hour', pa.string()),
        ('key', pa.int64()),
        ('value', pa.string())
    ])
    schema = Schema.from_pyarrow_schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
    Note

    For information about the data type mapping between pyarrow and Paimon, see PyPaimon data type mapping.

    Pandas

    If you have Pandas data, you can derive the schema directly from a pandas.DataFrame. The following code provides an example.

    import pandas as pd
    import pyarrow as pa
    from pypaimon import Schema
    # This is sample DataFrame data.
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
    dataframe = pd.DataFrame(data)
    # Obtain the pyarrow.Schema from the DataFrame.
    record_batch = pa.RecordBatch.from_pandas(dataframe)
    pa_schema = record_batch.schema
    schema = Schema.from_pyarrow_schema(
        pa_schema=pa_schema,
        partition_keys=['date', 'hour'],
        primary_keys=['date', 'hour', 'key'],
        options={'bucket': '2'},
        comment='my test table'
    )
  4. Create and get a table.

    catalog.create_table(
        identifier='database_name.table_name',
        schema=schema,
        ignore_if_exists=True  # Specifies whether to ignore the error if the table already exists.
    )
    table = catalog.get_table('database_name.table_name')
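Some invalid schema combinations are only rejected when create_table runs. To catch obvious mistakes earlier, you can check the arguments locally. The following sketch is not part of PyPaimon; the helper `validate_schema_args` is hypothetical. It assumes the Paimon rule that, when primary keys are defined, they must include every partition key (the sample table satisfies this), and it assumes table option values are passed as strings, as in `{'bucket': '2'}`. Column names can be taken from a pyarrow schema with `pa_schema.names`.

```python
def validate_schema_args(column_names, partition_keys=(), primary_keys=(), options=None):
    """Return a list of problems found in the arguments for a Paimon schema.

    Checks performed (a sketch, not an exhaustive validation):
    - partition keys and primary keys refer to existing columns;
    - when primary keys are defined, they include every partition key;
    - every table option value is a string.
    """
    problems = []
    columns = set(column_names)
    for kind, keys in (("partition key", partition_keys), ("primary key", primary_keys)):
        for key in keys:
            if key not in columns:
                problems.append(f"{kind} '{key}' is not a column")
    if primary_keys:
        missing = [key for key in partition_keys if key not in primary_keys]
        if missing:
            problems.append(f"primary keys must include partition keys: missing {missing}")
    for name, value in (options or {}).items():
        if not isinstance(value, str):
            problems.append(f"option '{name}' must be a string, got {type(value).__name__}")
    return problems
```

An empty list means none of these checks failed; it does not guarantee that create_table will succeed.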
            

Write data to a table

Note

PyPaimon does not currently support writing data to primary key tables where the bucket option is set to -1.
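Because this limitation only surfaces at write time, you may want to check a table's settings up front. The following is a hypothetical helper, not a PyPaimon API; pass it the primary keys and options you used when creating the table. It assumes that an omitted bucket option falls back to a Paimon default of -1. Recent Paimon releases document -1 as the default, but verify this for the Paimon version that manages your tables.

```python
# Assumption: when the 'bucket' option is omitted, Paimon uses its default
# bucket value, documented as -1 in recent releases. Verify for your version.
DEFAULT_BUCKET = "-1"


def pypaimon_can_write(primary_keys, options=None, default_bucket=DEFAULT_BUCKET):
    """Return False for primary key tables whose bucket option is -1,
    which PyPaimon does not currently support writing to."""
    bucket = str((options or {}).get("bucket", default_bucket)).strip()
    if primary_keys and bucket == "-1":
        return False
    return True
```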

  1. Create table write and commit operations, and prepare the sample data.

    # Create table write and commit operations.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    # Sample data as a dictionary of column lists. It is used in step 2.
    data = {
        'date': ['2024-12-01', '2024-12-01', '2024-12-02'],
        'hour': ['08', '09', '08'],
        'key': [1, 2, 3],
        'value': ['AAA', 'BBB', 'CCC'],
    }
  2. You can write data to the table in one of the following ways:

    For large datasets, use PyArrow. For smaller datasets, typically a few gigabytes or less, Pandas can be more efficient.

    PyArrow

    You can write data as either a pyarrow.Table or a pyarrow.RecordBatch. pyarrow.RecordBatch is more suitable for stream processing.

    • Method 1: Write a pyarrow.Table

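      import pyarrow as pa
      # Define fields that correspond to the table columns.
      fields = [
          pa.field('date', pa.string()),
          pa.field('hour', pa.string()),
          pa.field('key', pa.int64()),
          pa.field('value', pa.string())
      ]
      # Create a pyarrow schema from the fields. It is named arrow_schema so that
      # it does not overwrite the Paimon Schema object named schema.
      arrow_schema = pa.schema(fields)
      # Build a pyarrow.Table from the column dictionary `data`.
      pa_table = pa.Table.from_pydict(data, schema=arrow_schema)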
      # Write the data.
      table_write.write_arrow(pa_table)
    • Method 2: Write a pyarrow.RecordBatch

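      import pyarrow as pa
      # Define fields that correspond to the table columns.
      fields = [
          pa.field('date', pa.string()),
          pa.field('hour', pa.string()),
          pa.field('key', pa.int64()),
          pa.field('value', pa.string())
      ]
      # Create a pyarrow schema from the fields. It is named arrow_schema so that
      # it does not overwrite the Paimon Schema object named schema.
      arrow_schema = pa.schema(fields)
      # Build a pyarrow.RecordBatch from the column dictionary `data`.
      record_batch = pa.RecordBatch.from_pydict(data, schema=arrow_schema)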
      # Write the data.
      table_write.write_arrow_batch(record_batch)

    Pandas

    You can write data from a pandas.DataFrame.

    import pandas as pd
    dataframe = pd.DataFrame(data)
    table_write.write_pandas(dataframe)
  3. Commit the data and close the resources.

    # Commit the data.
    table_commit.commit(table_write.prepare_commit())
    # Close the resources.
    table_write.close()
    table_commit.close()
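Steps 1 to 3 can be wrapped in a helper that always closes the write and commit objects, even if a write or the commit raises an exception. The following is a sketch, not a PyPaimon API: the helper names `iter_column_chunks` and `write_batches_and_commit` are hypothetical, and the helper calls only the methods shown in this section (new_write, new_commit, write_arrow_batch, prepare_commit, commit, and close). `iter_column_chunks` splits a column dictionary such as `data` into smaller pieces, which suits the RecordBatch-based write.

```python
from typing import Dict, Iterable, Iterator, List


def iter_column_chunks(columns: Dict[str, List], chunk_size: int) -> Iterator[Dict[str, List]]:
    """Split a dict of equal-length column lists into dicts of at most chunk_size rows."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    lengths = {len(values) for values in columns.values()}
    if len(lengths) > 1:
        raise ValueError(f"columns have different lengths: {sorted(lengths)}")
    num_rows = lengths.pop() if lengths else 0
    for start in range(0, num_rows, chunk_size):
        yield {name: values[start:start + chunk_size] for name, values in columns.items()}


def write_batches_and_commit(write_builder, batches: Iterable) -> None:
    """Write record batches with one table write, commit once, and always close.

    write_builder is expected to be the object returned by
    table.new_batch_write_builder().
    """
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    try:
        for batch in batches:
            table_write.write_arrow_batch(batch)
        # Commit everything written above in a single commit.
        table_commit.commit(table_write.prepare_commit())
    finally:
        # Close both objects even if a write or the commit raised an exception.
        table_write.close()
        table_commit.close()
```

To write `data` in chunks, you could pass `table.new_batch_write_builder()` as `write_builder` and, as `batches`, a generator that converts each chunk from `iter_column_chunks(data, 10_000)` with `pa.RecordBatch.from_pydict(chunk, schema=...)`, using the pyarrow schema built in Method 2.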

Read data from a table

  1. Create a ReadBuilder.

    read_builder = table.new_read_builder()
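  2. (Optional) Narrow the read. Use a PredicateBuilder to build filter conditions that are pushed down to the scan, or use with_projection to read only specific columns.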

    • For example, you can read only the data where date is 2024-12-01.

      predicate_builder = read_builder.new_predicate_builder()
      predicate = predicate_builder.equal('date', '2024-12-01')
      read_builder = read_builder.with_filter(predicate)
    • For example, you can project only the key and value columns.

      read_builder = read_builder.with_projection(['key', 'value'])
    Note

    For more information about supported filter conditions, see PyPaimon filter conditions.

  3. Obtain the splits.

    table_scan = read_builder.new_scan()
    splits = table_scan.plan().splits()
  4. Read the splits into the output format you need.

    Apache Arrow

    • Read all data into a pyarrow.Table.

      table_read = read_builder.new_read()
      pa_table = table_read.to_arrow(splits)
      print(pa_table)
      # Sample output:
      # pyarrow.Table
      # key: int64 not null
      # value: string
      # ----
      # key: [[2],[1]]
      # value: [["BBB"],["AAA"]]
    • Read data into a pyarrow.RecordBatchReader and iterate over the batches.

      table_read = read_builder.new_read()
      for batch in table_read.to_arrow_batch_reader(splits):
          print(batch)
      # Sample output:
      # pyarrow.RecordBatch
      # key: int64
      # value: string
      # ----
      # key: [1,2]
      # value: ["AAA","BBB"]

    Pandas

    Read data into a pandas.DataFrame.

    table_read = read_builder.new_read()
    df = table_read.to_pandas(splits)
    print(df)
    # Sample output:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB

    DuckDB

    Important

    You must install DuckDB. You can run pip install duckdb to install it.

    Convert the data to an in-memory DuckDB table and query it.

    table_read = read_builder.new_read()
    duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
    print(duckdb_con.query("SELECT * FROM duckdb_table").fetchdf())
    # Sample output:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB
    print(duckdb_con.query("SELECT * FROM duckdb_table WHERE key = 1").fetchdf())
    # Sample output:
    #    key  value
    # 0   1  AAA

    Ray

    Important

    You must install Ray. You can run pip install ray to install it.

    table_read = read_builder.new_read()
    ray_dataset = table_read.to_ray(splits)
    # Print information about ray_dataset.
    print(ray_dataset)
    # Sample output:
    # MaterializedDataset(num_blocks=1, num_rows=2, schema={key: int64, value: string})
    # Print the first two records in ray_dataset.
    print(ray_dataset.take(2))
    # Sample output:
    # [{'key': 1, 'value': 'AAA'}, {'key': 2, 'value': 'BBB'}]
    # Convert the entire ray_dataset to a Pandas DataFrame and print the result.
    print(ray_dataset.to_pandas())
    # Sample output:
    #    key  value
    # 0   1  AAA
    # 1   2  BBB
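When the result may not fit in memory, process the batches from to_arrow_batch_reader one at a time instead of calling to_arrow. The sketch below is an illustration, not a PyPaimon API: the helper `count_values` is hypothetical, and it relies only on the pyarrow RecordBatch method `column()` (which accepts a column name) and `Array.to_pylist()`.

```python
from collections import Counter
from typing import Iterable


def count_values(batches: Iterable, column: str) -> Counter:
    """Count how often each value of `column` occurs across record batches.

    Only the requested column of one batch is converted to Python objects at a
    time, so memory use follows the batch size rather than the full result.
    """
    counts = Counter()
    for batch in batches:
        # Convert just this column of the current batch to a Python list.
        counts.update(batch.column(column).to_pylist())
    return counts
```

For example, `count_values(read_builder.new_read().to_arrow_batch_reader(splits), 'value')` counts the values of the value column across all splits without building a single pyarrow.Table.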