MaxCompute支持通过创建Paimon外部表与存储于OSS上的Paimon表目录建立映射关系,并访问其中的数据。本文介绍如何创建Paimon外部表以及如何通过MaxCompute访问Paimon外部表。
功能介绍
Apache Paimon是一种流批一体的湖存储格式,具备高吞吐的写入和低延迟的查询能力。实时计算Flink版、E-MapReduce的常见计算引擎(如Spark、Hive或Trino)都与Paimon有完善的集成。借助Apache Paimon,可以在存储服务OSS上快速构建数据湖,并能接入MaxCompute实现数据湖分析。元数据过滤功能通过减少读取处理任务中不需要的OSS目录文件,进一步优化查询性能。
适用范围
Paimon外部表不支持自动跟随Paimon文件schema变更而调整schema。
不支持对Paimon外部表设置cluster属性,不支持设置主键。
Paimon外部表暂不支持查询回溯历史版本的数据等特性。
不建议直接将数据写入Paimon外部表,建议使用unload等方式导出数据到OSS。
可以使用INSERT INTO或INSERT OVERWRITE语句将数据写入Paimon外部表,暂不支持写入Dynamic Bucket表和Cross Partition表。
不支持对Paimon外部表执行UPDATE/DELETE操作。
MaxCompute需要与OSS部署在同一地域。
创建Paimon外部表
语法结构
各格式的外部表语法结构详情,请参见OSS外部表。
CREATE EXTERNAL TABLE [if NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH serdeproperties (
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole'
)
LOCATION '<oss_location>';公共参数
公共参数说明请参见基础语法参数说明。
写入数据
MaxCompute写入语法详情,请参见写入语法说明。
查询分析
Paimon表数据分片(Split)逻辑与MaxCompute原生表不同。Paimon格式拥有独立的内部文件组织和分片机制,不完全参照MaxCompute参数。
SELECT语法详情,请参见查询语法说明。
优化查询计划详情,请参见查询优化。
BadRowSkipping详情,请参见BadRowSkipping。
使用示例
步骤一:前置准备
已准备好OSS存储空间(Bucket)、OSS目录。具体操作请参见创建存储空间、管理目录。
由于MaxCompute只在部分地域部署,跨地域的数据连通性可能存在问题,因此建议Bucket与MaxCompute项目所在地域保持一致。
授权
具备访问OSS的权限。阿里云账号(主账号)、RAM用户或RAMRole身份可以访问OSS外部表,授权信息请参见OSS的STS模式授权。
已具备在MaxCompute项目中创建表(CreateTable)的权限。表操作的权限信息请参见MaxCompute权限。
步骤二:在Flink中准备数据
创建Paimon Catalog和Paimon表,并在表中插入数据,示例如下。
如果在OSS中已有Paimon表数据,可忽略此步骤。
创建Paimon Filesystem Catalog
登录Flink控制台,在左上角选择地域。
单击目标工作空间名称,然后在左侧导航栏,选择数据管理。
在右侧Catalog列表 界面,单击创建Catalog 。在弹出的创建 Catalog 对话框里,选择Apache Paimon,单击下一步 并配置如下参数:
参数
是否必填
说明
metastore
必填
元数据存储类型。本示例中选择
filesystem。catalog name
必填
自定义catalog名称,例如
paimon-catalog。warehouse
必填
OSS服务中所指定的数仓目录。本示例中
oss://paimon-fs/paimon-test/。fs.oss.endpoint
必填
OSS服务的endpoint,例如杭州地域为
oss-cn-hangzhou-internal.aliyuncs.com。fs.oss.accessKeyId
必填
访问OSS服务所需的Access Key ID。
fs.oss.accessKeySecret
必填
访问OSS服务所需的Access Key Secret。
创建Paimon表
登录Flink控制台,在左上角选择地域。
单击目标工作空间名称,然后在左侧导航栏,选择。
在查询脚本页签,单击
,新建查询脚本。输入如下命令后,单击运行。
CREATE TABLE `paimon_catalog`.`default`.test_tbl ( id BIGINT, data STRING, dt STRING, PRIMARY KEY (dt, id) NOT ENFORCED ) PARTITIONED BY (dt); INSERT INTO `paimon-catalog`.`default`.test_tbl VALUES (1,'CCC','2024-07-18'), (2,'DDD','2024-07-18');
若SQL作业有限流作业(例如执行
INSERT INTO ... VALUES ...语句),需要执行以下操作:单击目标工作空间名称,然后在左侧导航栏,选择。
在作业运维页面,单击目标作业名称,进入作业部署详情页面。
在运行参数配置区域,单击编辑,在其他配置中设置
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true代码并保存。配置作业的运行参数详情,请参见配置作业部署信息。
步骤三:通过MaxCompute创建Paimon外表
在MaxCompute中执行以下SQL代码,创建MaxCompute Paimon外部表。
CREATE EXTERNAL TABLE oss_extable_paimon_pt
(
id BIGINT,
data STRING
)
PARTITIONED BY (dt STRING )
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH serdeproperties (
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole'
)
LOCATION 'oss://oss-cn-<your region>-internal.aliyuncs.com/<table_path>'
;上述代码中table_path为Flink中创建的Paimon表路径,例如paimon-fs/paimon-test/default.db/test_tbl。获取方法如下:
登录Flink控制台,在左上角选择地域。
单击目标工作空间名称,然后在左侧导航栏,选择数据管理。
在元数据页面单击目标Catalog下的default,并在default页面中单击目标表操作列的查看。
在表结构详情页签的表属性区域获取path参数值,table_path中仅填写
oss://之后的路径。
步骤四:引入分区数据
当创建的OSS外部表为分区表时,需要额外执行引入分区数据的操作,详情请参见补全OSS外部表分区数据语法。
MSCK REPAIR TABLE oss_extable_paimon_pt ADD PARTITIONS;步骤五:通过MaxCompute读取Paimon外部表
在MaxCompute中执行以下命令,查询MaxCompute Paimon外部表oss_extable_paimon_pt。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT * FROM oss_extable_paimon_pt WHERE dt='2024-07-18';返回结果如下:
+------------+------------+------------+
| id | data | dt |
+------------+------------+------------+
| 1 | CCC | 2024-07-18 |
| 2 | DDD | 2024-07-18 |
+------------+------------+------------+当Paimon文件中的Schema与外表Schema不一致时:
列数不一致:如果Paimon文件中的列数小于外表DDL的列数,则读取Paimon数据时,系统会将缺少的列值补充为NULL。反之(大于时),会丢弃超出的列数据。
列类型不一致:MaxCompute不支持使用INT类型接收Paimon文件中的STRING类型数据,支持使用STRING类型接收INT类型数据(不推荐)。
支持数据类型
MaxCompute数据类型请参见1.0数据类型版本和2.0数据类型版本。
开源Paimon数据类型 | MaxCompute 2.0数据类型 | 是否支持读写 | 说明 |
TINYINT | TINYINT | 8位有符号整型。 | |
SMALLINT | SMALLINT | 16位有符号整型。 | |
INT | INT | 32位有符号整型。 | |
BIGINT | BIGINT | 64位有符号整型。 | |
BINARY(MAX_LENGTH) | BINARY | 二进制数据类型,目前长度限制为8 MB。 | |
FLOAT | FLOAT | 32位二进制浮点型。 | |
DOUBLE | DOUBLE | 64位二进制浮点型。 | |
DECIMAL(precision,scale) | DECIMAL(precision,scale) | 10进制精确数字类型,默认为
| |
VARCHAR(n) | VARCHAR(n) | 变长字符类型。n为长度,[1,65535]。 | |
CHAR(n) | CHAR(n) | 固定长度字符类型。n为长度,[1,255]。 | |
VARCHAR(MAX_LENGTH) | STRING | 字符串类型目前长度限制为8MB。 | |
DATE | DATE | 日期类型格式为 | |
TIME、TIME(p) | 不支持 | Paimon数据类型TIME,不带时区的时间类型,由时分秒组成,精度可到纳秒。 TIME(p)表示小数位的精度,0-9之间,默认为0。 MaxCompute侧没有映射的类型。 | |
TIMESTAMP、TIMESTAMP(p) | TIMESTAMP_NTZ | 无时区时间戳类型,精确到纳秒。 读表需打开Native开关 | |
TIMESTAMP WITH LOCAL TIME_ZONE(9) | TIMESTAMP |
| |
TIMESTAMP WITH LOCAL TIME_ZONE(9) | DATETIME | 时间戳类型,精确到纳秒 格式为 | |
BOOLEAN | BOOLEAN | BOOLEAN类型。 | |
ARRAY | ARRAY | 复杂类型。 | |
MAP | MAP | 复杂类型。 | |
ROW | STRUCT | 复杂类型。 | |
MULTISET<t> | 不支持 | MaxCompute侧没有映射的类型。 | |
VARBINARY、VARBINARY(n)、BYTES | BINARY | 可变长度二进制字符串的数据类型。 |
常见问题
读Paimon外部表报错kSIGABRT
报错信息:
ODPS-0123144: Fuxi job failed - kSIGABRT(errCode:6) at Odps/*****_SQL_0_1_0_job_0/M1@f01b17437.cloud.eo166#3. Detail error msg: CRASH_CORE, maybe caused by jvm crash, please check your java udf/udaf/udtf. | fatalInstance: Odps/*****_SQL_0_1_0_job_0/M1#0_0错误原因:
JNI模式下读取TIMESTAMP_NTZ类型会产生此报错。
解决方案:
读表前加上打开Native开关
SET odps.sql.common.table.jni.disable.native=true;。
相关文档
可以在Flink中以自定义Catalog的方式创建MaxCompute Paimon外部表,并在写入数据后,通过MaxCompute查询并消费Paimon数据,详情请参见基于Flink创建MaxCompute Paimon外部表。