MaxCompute支持通过创建Paimon外部表与存储于OSS上的Paimon表目录建立映射关系,并访问其中的数据。本文介绍如何创建Paimon外部表以及如何通过MaxCompute访问Paimon外部表。
功能介绍
Apache Paimon是一种流批一体的湖存储格式,具备高吞吐的写入和低延迟的查询能力。实时计算Flink版、E-MapReduce的常见计算引擎(如Spark、Hive或Trino)都与Paimon有完善的集成。借助Apache Paimon,可以在存储服务OSS上快速构建数据湖,并能接入MaxCompute实现数据湖分析。元数据过滤功能通过减少读取处理任务中不需要的OSS目录文件,进一步优化查询性能。
适用范围
-
Schema限制
Paimon外部表不支持自动跟随Paimon文件Schema变更而调整Schema。
-
表属性限制
-
不支持对Paimon外部表设置cluster属性;
-
不支持设置主键;
-
Paimon外部表暂不支持Paimon表属性透传。
-
-
数据写入、修改及查询
-
可以使用INSERT INTO或INSERT OVERWRITE语句将数据写入Paimon外部表。
-
暂不支持写入Dynamic Bucket表和Cross Partition表。
-
不支持对Paimon外部表执行UPDATE/DELETE操作。
-
Paimon外部表暂不支持查询回溯历史版本的数据等特性。
-
不建议直接将数据写入Paimon外部表,建议使用UNLOAD等方式导出数据到OSS。
-
-
MaxCompute需要与OSS部署在同一地域。
创建Paimon外部表
语法结构
各格式的外部表语法结构详情,请参见OSS外部表。
CREATE EXTERNAL TABLE [if NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH serdeproperties (
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole'
)
LOCATION '<oss_location>';
公共参数
公共参数说明请参见基础语法参数说明。
写入数据
MaxCompute写入语法详情,请参见写入语法说明。
查询分析
-
Paimon表数据分片(Split)逻辑与MaxCompute原生表不同。Paimon格式拥有独立的内部文件组织和分片机制,不完全参照MaxCompute参数。
-
SELECT语法详情,请参见查询语法说明。
-
优化查询计划详情,请参见查询优化。
-
BadRowSkipping详情,请参见BadRowSkipping。
使用示例
步骤一:前置准备
-
已准备好OSS存储空间(Bucket)、OSS目录。具体操作请参见创建存储空间、管理目录。
由于MaxCompute只在部分地域部署,跨地域的数据连通性可能存在问题,因此建议Bucket与MaxCompute项目所在地域保持一致。
-
授权
-
具备访问OSS的权限。阿里云账号(主账号)、RAM用户或RAMRole身份可以访问OSS外部表,授权信息请参见OSS的STS模式授权。
-
已具备在MaxCompute项目中创建表(CreateTable)的权限。表操作的权限信息请参见MaxCompute权限。
-
步骤二:在Flink中准备数据
创建Paimon Catalog和Paimon表,并在表中插入数据,示例如下。
如果在OSS中已有Paimon表数据,可忽略此步骤。
-
创建Paimon Filesystem Catalog
-
登录Flink控制台,在左上角选择地域。
-
单击目标工作空间名称,然后在左侧导航栏,选择数据管理。
-
在右侧Catalog列表 界面,单击创建Catalog 。在弹出的创建 Catalog 对话框里,选择Apache Paimon,单击下一步 并配置如下参数:
参数
是否必填
说明
metastore
必填
元数据存储类型。本示例中选择
filesystem。catalog name
必填
自定义catalog名称,例如
paimon-catalog。warehouse
必填
OSS服务中所指定的数仓目录。本示例中
oss://paimon-fs/paimon-test/。fs.oss.endpoint
必填
OSS服务的endpoint,例如杭州地域为
oss-cn-hangzhou-internal.aliyuncs.com。fs.oss.accessKeyId
必填
访问OSS服务所需的Access Key ID。
fs.oss.accessKeySecret
必填
访问OSS服务所需的Access Key Secret。
-
-
创建Paimon表
-
登录Flink控制台,在左上角选择地域。
-
单击目标工作空间名称,然后在左侧导航栏,选择。
-
在查询脚本页签,单击
,新建查询脚本。输入如下命令后,单击运行。
CREATE TABLE `paimon_catalog`.`default`.test_tbl ( id BIGINT, data STRING, dt STRING, PRIMARY KEY (dt, id) NOT ENFORCED ) PARTITIONED BY (dt); INSERT INTO `paimon-catalog`.`default`.test_tbl VALUES (1,'CCC','2024-07-18'), (2,'DDD','2024-07-18');
-
-
若SQL作业有限流作业(例如执行
INSERT INTO ... VALUES ...语句),需要执行以下操作:-
单击目标工作空间名称,然后在左侧导航栏,选择。
-
在作业运维页面,单击目标作业名称,进入作业部署详情页面。
-
在运行参数配置区域,单击编辑,在其他配置中设置
execution.checkpointing.checkpoints-after-tasks-finish.enabled: true代码并保存。配置作业的运行参数详情,请参见配置作业部署信息。
-
步骤三:通过MaxCompute创建Paimon外表
在MaxCompute中执行以下SQL代码,创建MaxCompute Paimon外部表。
CREATE EXTERNAL TABLE oss_extable_paimon_pt
(
id BIGINT,
data STRING
)
PARTITIONED BY (dt STRING )
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH serdeproperties (
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole'
)
LOCATION 'oss://oss-cn-<your region>-internal.aliyuncs.com/<table_path>'
;
上述代码中table_path为Flink中创建的Paimon表路径,例如paimon-fs/paimon-test/default.db/test_tbl。获取方法如下:
-
登录Flink控制台,在左上角选择地域。
-
单击目标工作空间名称,然后在左侧导航栏,选择数据管理。
-
在元数据页面单击目标Catalog下的default,并在default页面中单击目标表操作列的查看。
-
在表结构详情页签的表属性区域获取path参数值,table_path中仅填写
oss://之后的路径。
步骤四:引入分区数据
当创建的OSS外部表为分区表时,需要额外执行引入分区数据的操作,详情请参见OSS外部表。
MSCK REPAIR TABLE oss_extable_paimon_pt ADD PARTITIONS;
步骤五:通过MaxCompute读取Paimon外部表
在MaxCompute中执行以下命令,查询MaxCompute Paimon外部表oss_extable_paimon_pt。
SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT * FROM oss_extable_paimon_pt WHERE dt='2024-07-18';
返回结果如下:
+------------+------------+------------+
| id | data | dt |
+------------+------------+------------+
| 1 | CCC | 2024-07-18 |
| 2 | DDD | 2024-07-18 |
+------------+------------+------------+
当Paimon文件中的Schema与外表Schema不一致时:
-
列数不一致:如果Paimon文件中的列数小于外表DDL的列数,则读取Paimon数据时,系统会将缺少的列值补充为NULL。反之(大于时),会丢弃超出的列数据。
-
列类型不一致:MaxCompute不支持使用INT类型接收Paimon文件中的STRING类型数据,支持使用STRING类型接收INT类型数据(不推荐)。
支持数据类型
MaxCompute数据类型请参见1.0数据类型版本和2.0数据类型版本。
|
开源Paimon数据类型 |
MaxCompute 2.0数据类型 |
是否支持读写 |
说明 |
|
TINYINT |
TINYINT |
|
8位有符号整型。 |
|
SMALLINT |
SMALLINT |
|
16位有符号整型。 |
|
INT |
INT |
|
32位有符号整型。 |
|
BIGINT |
BIGINT |
|
64位有符号整型。 |
|
BINARY(MAX_LENGTH) |
BINARY |
|
二进制数据类型,目前长度限制为8 MB。 |
|
FLOAT |
FLOAT |
|
32位二进制浮点型。 |
|
DOUBLE |
DOUBLE |
|
64位二进制浮点型。 |
|
DECIMAL(precision,scale) |
DECIMAL(precision,scale) |
|
10进制精确数字类型,默认为
|
|
VARCHAR(n) |
VARCHAR(n) |
|
变长字符类型。n为长度,[1,65535]。 |
|
CHAR(n) |
CHAR(n) |
|
固定长度字符类型。n为长度,[1,255]。 |
|
VARCHAR(MAX_LENGTH) |
STRING |
|
字符串类型目前长度限制为8MB。 |
|
DATE |
DATE |
|
日期类型格式为 |
|
TIME、TIME(p) |
不支持 |
|
Paimon数据类型TIME,不带时区的时间类型,由时分秒组成,精度可到纳秒。 TIME(p)表示小数位的精度,0-9之间,默认为0。 MaxCompute侧没有映射的类型。 |
|
TIMESTAMP、TIMESTAMP(p) |
TIMESTAMP_NTZ |
|
无时区时间戳类型,精确到纳秒。 读表需打开Native开关 |
|
TIMESTAMP WITH LOCAL TIME_ZONE(9) |
TIMESTAMP |
|
|
|
TIMESTAMP WITH LOCAL TIME_ZONE(9) |
DATETIME |
|
时间戳类型,精确到纳秒 格式为 |
|
BOOLEAN |
BOOLEAN |
|
BOOLEAN类型。 |
|
ARRAY |
ARRAY |
|
复杂类型。 |
|
MAP |
MAP |
|
复杂类型。 |
|
ROW |
STRUCT |
|
复杂类型。 |
|
MULTISET<t> |
不支持 |
|
MaxCompute侧没有映射的类型。 |
|
VARBINARY、VARBINARY(n)、BYTES |
BINARY |
|
可变长度二进制字符串的数据类型。 |
常见问题
读Paimon外部表报错kSIGABRT
-
报错信息:
ODPS-0123144: Fuxi job failed - kSIGABRT(errCode:6) at Odps/*****_SQL_0_1_0_job_0/M1@f01b17437.cloud.eo166#3. Detail error msg: CRASH_CORE, maybe caused by jvm crash, please check your java udf/udaf/udtf. | fatalInstance: Odps/*****_SQL_0_1_0_job_0/M1#0_0 -
错误原因:
JNI模式下读取TIMESTAMP_NTZ类型会产生此报错。
-
解决方案:
读表前加上打开Native开关
SET odps.sql.common.table.jni.disable.native=true;。
相关文档
可以在Flink中以自定义Catalog的方式创建MaxCompute Paimon外部表,并在写入数据后,通过MaxCompute查询并消费Paimon数据,详情请参见基于Flink创建MaxCompute Paimon外部表。