本文为您介绍如何使用PyFlink作业来读写Paimon表。
使用限制
仅VVR 11.1及以上版本支持读写元数据为DLF 2.5的Paimon表。
准备工作
在开始编写之前,请确保您已经完成以下的准备工作:
请确认本地开发环境已安装Pyflink依赖。
# Python3.12可能会存在不兼容的情况,建议使用3.11及以下环境安装 pip install apache-flink
在实时计算开发控制台上,创建Paimon DLF Catalog。
在左侧导航栏选择
,创建测试表。请确保运行时,右下角的执行环境为VVR 11.1及以上版本,若无相应环境,可以重新创建Session集群 。
# 创建测试表(本示例创建的Catalog名称为paimontest) # 在默认数据库上创建test表 CREATE TABLE `paimontest`.`default`.`test` ( user_id INT, user_nmae STRING );
编写Python作业
写入Paimon表
编写paimon_write.py。本示例中的Schema与上文创建的测试表一致,为k INT,v STRING
。
import argparse
import logging
import os
import re
import sys
from pyflink.common import Configuration
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
def parse_with_params(input_str):
"""
解析以分号分隔的键值对字符串,返回字典格式。
示例输入: "k1=v1;k2=v2"
输出: {'k1': 'v1', 'k2': 'v2'}
"""
pattern = r'([a-zA-Z0-9_.]+)=([^;]+)'
matches = re.findall(pattern, input_str)
return {key: value for key, value in matches}
def find_paimon_classpath(directory):
"""
在指定目录中查找 Ververica Connector for Paimon 的 JAR 文件。
返回其 file:// 类型的路径。
该连接器已内置,无需上传。
"""
pattern = r'^ververica-connector-paimon.*\.jar$'
for root, dirs, files in os.walk(directory):
for file in files:
if re.match(pattern, file):
return 'file://' + str(os.path.join(root, file))
raise IOError(f'Paimon jar not found in the directory {directory}')
def paimon_write(table, database, catalog_conf):
"""
向 Paimon 表中插入数据 (1, 'Alice'), (2, 'Bob')
"""
# 配置 Flink Pipeline Classpath,加载 Paimon Connector
config = Configuration()
config.set_string("pipeline.classpaths", find_paimon_classpath("/flink/opt/"))
# 创建流式执行环境
env_settings = EnvironmentSettings.Builder().with_configuration(config).in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
# 填写对应Catalog的名称
catalog_name = "paimontest"
# 解析并构造 Paimon Catalog 的 WITH 子句
with_params = parse_with_params(catalog_conf)
with_params.setdefault('type', 'paimon') # 设置默认类型为 paimon
with_clause = ', '.join([f"'{k}' = '{v}'" for k, v in with_params.items()])
# 创建 Paimon Catalog
t_env.execute_sql(f"CREATE CATALOG {catalog_name} WITH ({with_clause})")
# 插入数据到目标表
table_result = t_env.execute_sql(
f"INSERT INTO `{catalog_name}`.`{database}`.`{table}` VALUES (1, 'Alice'), (2, 'Bob')"
)
table_result.wait() # 等待任务完成
if __name__ == '__main__':
# 日志配置
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
# 参数解析器
parser = argparse.ArgumentParser()
parser.add_argument('--table', dest='table', required=True, help='表名')
parser.add_argument('--database', dest='database', required=False, default='default', help='数据库名')
parser.add_argument('--catalog_conf', dest='catalog_conf', required=True,
help='Catalog 配置项,格式为 "k1=v1;k2=v2;..."')
argv = sys.argv[1:]
known_args, _ = parser.parse_known_args(argv)
# 执行写入逻辑
paimon_write(known_args.table, known_args.database, known_args.catalog_conf)
读取Paimon表
编写paimon_read.py。本示例中的Schema与上文创建的测试表一致,为k INT,v STRING
。
import argparse
import logging
import os
import re
import sys
from pyflink.common import Configuration
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
def parse_with_params(input_str):
"""
解析以分号分隔的键值对字符串,返回字典格式。
示例输入: "k1=v1;k2=v2"
输出: {'k1': 'v1', 'k2': 'v2'}
"""
pattern = r'([a-zA-Z0-9_.]+)=([^;]+)'
matches = re.findall(pattern, input_str)
return {key: value for key, value in matches}
def find_paimon_classpath(directory):
"""
在指定目录中查找 Ververica Connector for Paimon 的 JAR 文件。
返回其 file:// 类型的路径。
该连接器已内置,无需上传。
"""
pattern = r'^ververica-connector-paimon.*\.jar$'
for root, dirs, files in os.walk(directory):
for file in files:
if re.match(pattern, file):
return 'file://' + str(os.path.join(root, file))
raise IOError(f'Paimon jar not found in the directory {directory}')
def paimon_read(table, database, catalog_conf):
"""
从 Paimon 表中读取数据,并输出到控制台
"""
# 配置 Flink Pipeline Classpath,加载 Paimon Connector
config = Configuration()
config.set_string("pipeline.classpaths", find_paimon_classpath("/flink/opt/"))
# 创建流式执行环境
env_settings = EnvironmentSettings.Builder().with_configuration(config).in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
catalog_name = "paimontest"
# 解析并构造 Paimon Catalog 的 WITH 子句
with_params = parse_with_params(catalog_conf)
with_params.setdefault('type', 'paimon') # 设置默认类型为 paimon
with_clause = ', '.join([f"'{k}' = '{v}'" for k, v in with_params.items()])
# 创建 Paimon Catalog
t_env.execute_sql(f"CREATE CATALOG {catalog_name} WITH ({with_clause})")
# 创建临时打印 Sink 表
t_env.execute_sql(f"""
CREATE TEMPORARY TABLE print_sink (
k INT,
v STRING
) WITH (
'connector' = 'print',
'logger' = 'true'
)
""")
# 查询数据并写入到 print_sink(即控制台)
table_result = t_env.execute_sql(
f"INSERT INTO print_sink SELECT * FROM `{catalog_name}`.`{database}`.`{table}`"
)
table_result.wait() # 等待任务完成
if __name__ == '__main__':
# 日志配置
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
# 参数解析器
parser = argparse.ArgumentParser()
parser.add_argument('--table', dest='table', required=True, help='表名')
parser.add_argument('--database', dest='database', required=False, default='default', help='数据库名')
parser.add_argument('--catalog_conf', dest='catalog_conf', required=True,
help='Catalog 配置项,格式为 "k1=v1;k2=v2;..."')
argv = sys.argv[1:]
known_args, _ = parser.parse_known_args(argv)
# 执行读取逻辑
paimon_read(known_args.table, known_args.database, known_args.catalog_conf)
上传作业并运行
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏选择
。填写对应的参数。
参数
说明
引擎版本
VVR 11.1及以上
Python Uri
上传对应的Python脚本
Entry Point Main Arguments
--table test --database default --catalog_conf metastore=rest;token.provider=dlf;uri=http://cn-hangzhou-vpc.dlf.aliyuncs.com;warehouse=paimontest
table:目标测试表。
databbase:目标测试库。
catalog_conf:
metastore:固定为rest。
token.provider:固定为dlf。
uri:详见服务接入点中的Region ID。
warehouse:DLF Catalog名称。
单击部署后,单击启动作业。
查询结果
写入查询
在左侧导航栏选择
,创建临时查询脚本。编写查询语句后,选中单击运行。
SELECT * FROM `paimontest`.`default`.`test`;
读取查询
在运行作业中,查看作业日志输出。
常见问题
Q: 代码中关于Paimon JAR文件是否需要自行上传?
A:无需上传。环境中已经内置了paimon connector的JAR包。
Q: 为什么在实时计算开发控制台创建Catalog没有看见对应的DLF数据目录?
A:请确认在DLF创建的Catalog与实时计算Flink版在同一地域下。
该文章对您有帮助吗?