Python读写Paimon表

本文为您介绍如何使用PyFlink作业来读写Paimon表。

使用限制

VVR 11.1及以上版本支持读写元数据为DLF 2.5Paimon表。

准备工作

在开始编写之前,请确保您已经完成以下的准备工作:

  1. 已经授权并开通DLF,并创建数据目录,且与实时计算Flink版在同一地域下。

  2. 请确认本地开发环境已安装Pyflink依赖。

    # Python3.12可能会存在不兼容的情况,建议使用3.11及以下环境安装 
    pip install apache-flink
  3. 在实时计算开发控制台上,创建Paimon DLF Catalog

  4. 在左侧导航栏选择数据开发 > 数据查询 > 新建,创建测试表。

    请确保运行时,右下角的执行环境为VVR 11.1及以上版本,若无相应环境,可以重新创建Session集群

    # 创建测试表(本示例创建的Catalog名称为paimontest)
    # 在默认数据库上创建test表
    CREATE TABLE `paimontest`.`default`.`test` (
      user_id INT,
      user_nmae STRING
    );

编写Python作业

写入Paimon

编写paimon_write.py。本示例中的Schema与上文创建的测试表一致,为k INT,v STRING

import argparse
import logging
import os
import re
import sys

from pyflink.common import Configuration
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

def parse_with_params(input_str):
    """
    解析以分号分隔的键值对字符串,返回字典格式。
    示例输入: "k1=v1;k2=v2"
    输出: {'k1': 'v1', 'k2': 'v2'}
    """
    pattern = r'([a-zA-Z0-9_.]+)=([^;]+)'
    matches = re.findall(pattern, input_str)
    return {key: value for key, value in matches}

def find_paimon_classpath(directory):
    """
    在指定目录中查找 Ververica Connector for Paimon 的 JAR 文件。
    返回其 file:// 类型的路径。
    该连接器已内置,无需上传。
    """
    pattern = r'^ververica-connector-paimon.*\.jar$'
    for root, dirs, files in os.walk(directory):
        for file in files:
            if re.match(pattern, file):
                return 'file://' + str(os.path.join(root, file))
    raise IOError(f'Paimon jar not found in the directory {directory}')

def paimon_write(table, database, catalog_conf):
    """
    向 Paimon 表中插入数据 (1, 'Alice'), (2, 'Bob')
    """

    # 配置 Flink Pipeline Classpath,加载 Paimon Connector
    config = Configuration()
    config.set_string("pipeline.classpaths", find_paimon_classpath("/flink/opt/"))

    # 创建流式执行环境
    env_settings = EnvironmentSettings.Builder().with_configuration(config).in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)
    
    # 填写对应Catalog的名称 
    catalog_name = "paimontest"

    # 解析并构造 Paimon Catalog 的 WITH 子句
    with_params = parse_with_params(catalog_conf)
    with_params.setdefault('type', 'paimon')  # 设置默认类型为 paimon
    with_clause = ', '.join([f"'{k}' = '{v}'" for k, v in with_params.items()])

    # 创建 Paimon Catalog
    t_env.execute_sql(f"CREATE CATALOG {catalog_name} WITH ({with_clause})")

    # 插入数据到目标表
    table_result = t_env.execute_sql(
        f"INSERT INTO `{catalog_name}`.`{database}`.`{table}` VALUES (1, 'Alice'), (2, 'Bob')"
    )
    table_result.wait()  # 等待任务完成

if __name__ == '__main__':
    # 日志配置
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    # 参数解析器
    parser = argparse.ArgumentParser()
    parser.add_argument('--table', dest='table', required=True, help='表名')
    parser.add_argument('--database', dest='database', required=False, default='default', help='数据库名')
    parser.add_argument('--catalog_conf', dest='catalog_conf', required=True,
                        help='Catalog 配置项,格式为 "k1=v1;k2=v2;..."')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    # 执行写入逻辑
    paimon_write(known_args.table, known_args.database, known_args.catalog_conf)

读取Paimon

编写paimon_read.py。本示例中的Schema与上文创建的测试表一致,为k INT,v STRING

import argparse
import logging
import os
import re
import sys

from pyflink.common import Configuration
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

def parse_with_params(input_str):
    """
    解析以分号分隔的键值对字符串,返回字典格式。
    示例输入: "k1=v1;k2=v2"
    输出: {'k1': 'v1', 'k2': 'v2'}
    """
    pattern = r'([a-zA-Z0-9_.]+)=([^;]+)'
    matches = re.findall(pattern, input_str)
    return {key: value for key, value in matches}

def find_paimon_classpath(directory):
    """
    在指定目录中查找 Ververica Connector for Paimon 的 JAR 文件。
    返回其 file:// 类型的路径。
    该连接器已内置,无需上传。
    """
    pattern = r'^ververica-connector-paimon.*\.jar$'
    for root, dirs, files in os.walk(directory):
        for file in files:
            if re.match(pattern, file):
                return 'file://' + str(os.path.join(root, file))
    raise IOError(f'Paimon jar not found in the directory {directory}')

def paimon_read(table, database, catalog_conf):
    """
    从 Paimon 表中读取数据,并输出到控制台
    """

    # 配置 Flink Pipeline Classpath,加载 Paimon Connector
    config = Configuration()
    config.set_string("pipeline.classpaths", find_paimon_classpath("/flink/opt/"))

    # 创建流式执行环境
    env_settings = EnvironmentSettings.Builder().with_configuration(config).in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(environment_settings=env_settings)

    catalog_name = "paimontest"

    # 解析并构造 Paimon Catalog 的 WITH 子句
    with_params = parse_with_params(catalog_conf)
    with_params.setdefault('type', 'paimon')  # 设置默认类型为 paimon
    with_clause = ', '.join([f"'{k}' = '{v}'" for k, v in with_params.items()])

    # 创建 Paimon Catalog
    t_env.execute_sql(f"CREATE CATALOG {catalog_name} WITH ({with_clause})")

    # 创建临时打印 Sink 表
    t_env.execute_sql(f"""
        CREATE TEMPORARY TABLE print_sink (
            k INT,
            v STRING
        ) WITH (
            'connector' = 'print',
            'logger' = 'true'
        )
    """)

    # 查询数据并写入到 print_sink(即控制台)
    table_result = t_env.execute_sql(
        f"INSERT INTO print_sink SELECT * FROM `{catalog_name}`.`{database}`.`{table}`"
    )
    table_result.wait()  # 等待任务完成

if __name__ == '__main__':
    # 日志配置
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    # 参数解析器
    parser = argparse.ArgumentParser()
    parser.add_argument('--table', dest='table', required=True, help='表名')
    parser.add_argument('--database', dest='database', required=False, default='default', help='数据库名')
    parser.add_argument('--catalog_conf', dest='catalog_conf', required=True,
                        help='Catalog 配置项,格式为 "k1=v1;k2=v2;..."')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    # 执行读取逻辑
    paimon_read(known_args.table, known_args.database, known_args.catalog_conf)

上传作业并运行

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏选择运维中心 > 作业运维 > 部署作业 > Python作业

  4. 填写对应的参数。

    image

    参数

    说明

    引擎版本

    VVR 11.1及以上

    Python Uri

    上传对应的Python脚本

    Entry Point Main Arguments

    --table test
    --database default
    --catalog_conf metastore=rest;token.provider=dlf;uri=http://cn-hangzhou-vpc.dlf.aliyuncs.com;warehouse=paimontest
    • table:目标测试表。

    • databbase:目标测试库。

    • catalog_conf:

      • metastore:固定为rest。

      • token.provider:固定为dlf。

      • uri:详见服务接入点中的Region ID。

      • warehouse:DLF Catalog名称。

  5. 单击部署后,单击启动作业。

查询结果

写入查询

  1. 在左侧导航栏选择数据开发 > 数据查询 > 新建,创建临时查询脚本。

  2. 编写查询语句后,选中单击运行。

    SELECT * FROM `paimontest`.`default`.`test`;

读取查询

在运行作业中,查看作业日志输出。

image

常见问题

Q: 代码中关于Paimon JAR文件是否需要自行上传?

A:无需上传。环境中已经内置了paimon connectorJAR包。

Q: 为什么在实时计算开发控制台创建Catalog没有看见对应的DLF数据目录?

A:请确认在DLF创建的Catalog与实时计算Flink版在同一地域下。