Paimon外部表

更新时间:
复制为 MD 格式

MaxCompute支持通过创建Paimon外部表与存储于OSS上的Paimon表目录建立映射关系,并访问其中的数据。本文介绍如何创建Paimon外部表以及如何通过MaxCompute访问Paimon外部表。

功能介绍

Apache Paimon是一种流批一体的湖存储格式,具备高吞吐的写入和低延迟的查询能力。实时计算FlinkE-MapReduce的常见计算引擎(如Spark、HiveTrino)都与Paimon有完善的集成。借助Apache Paimon,可以在存储服务OSS上快速构建数据湖,并能接入MaxCompute实现数据湖分析。元数据过滤功能通过减少读取处理任务中不需要的OSS目录文件,进一步优化查询性能。

适用范围

  • Paimon外部表不支持自动跟随Paimon文件schema变更而调整schema。

  • 不支持对Paimon外部表设置cluster属性,不支持设置主键。

  • Paimon外部表暂不支持查询回溯历史版本的数据等特性。

  • 不建议直接将数据写入Paimon外部表,建议使用unload等方式导出数据到OSS。

  • 可以使用INSERT INTOINSERT OVERWRITE语句将数据写入Paimon外部表,暂不支持写入Dynamic Bucket表和Cross Partition表。

  • 不支持对Paimon外部表执行UPDATE/DELETE操作。

  • MaxCompute需要与OSS部署在同一地域。

  • 支持数据类型

创建Paimon外部表

语法结构

各格式的外部表语法结构详情,请参见OSS外部表

CREATE EXTERNAL TABLE  [if NOT EXISTS] <mc_oss_extable_name>
(
  <col_name> <data_type>,
  ...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH serdeproperties (
  'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole'
)
LOCATION '<oss_location>';

公共参数

公共参数说明请参见基础语法参数说明

写入数据

MaxCompute写入语法详情,请参见写入语法说明

查询分析

  • Paimon表数据分片(Split)逻辑与MaxCompute原生表不同。Paimon格式拥有独立的内部文件组织和分片机制,不完全参照MaxCompute参数。

  • SELECT语法详情,请参见查询语法说明

  • 优化查询计划详情,请参见查询优化

  • BadRowSkipping详情,请参见BadRowSkipping

使用示例

步骤一:前置准备

  1. 创建MaxCompute项目

  2. 已准备好OSS存储空间(Bucket)、OSS目录。具体操作请参见创建存储空间管理目录

    由于MaxCompute只在部分地域部署,跨地域的数据连通性可能存在问题,因此建议BucketMaxCompute项目所在地域保持一致。
  3. 授权

    1. 具备访问OSS的权限。阿里云账号(主账号)、RAM用户或RAMRole身份可以访问OSS外部表,授权信息请参见OSSSTS模式授权

    2. 已具备在MaxCompute项目中创建表(CreateTable)的权限。表操作的权限信息请参见MaxCompute权限

步骤二:在Flink中准备数据

创建Paimon CatalogPaimon表,并在表中插入数据,示例如下。

说明

如果在OSS中已有Paimon表数据,可忽略此步骤。

  1. 创建Paimon Filesystem Catalog

    1. 登录Flink控制台,在左上角选择地域。

    2. 单击目标工作空间名称,然后在左侧导航栏,选择数据管理

    3. 在右侧Catalog列表 界面,单击创建Catalog 。在弹出的创建 Catalog 对话框里,选择Apache Paimon,单击下一步 并配置如下参数:

      参数

      是否必填

      说明

      metastore

      必填

      元数据存储类型。本示例中选择filesystem

      catalog name

      必填

      自定义catalog名称,例如paimon-catalog

      warehouse

      必填

      OSS服务中所指定的数仓目录。本示例中oss://paimon-fs/paimon-test/

      fs.oss.endpoint

      必填

      OSS服务的endpoint,例如杭州地域为oss-cn-hangzhou-internal.aliyuncs.com

      fs.oss.accessKeyId

      必填

      访问OSS服务所需的Access Key ID。

      fs.oss.accessKeySecret

      必填

      访问OSS服务所需的Access Key Secret。

  2. 创建Paimon

    1. 登录Flink控制台,在左上角选择地域。

    2. 单击目标工作空间名称,然后在左侧导航栏,选择数据开发 > 数据查询

    3. 查询脚本页签,单击image,新建查询脚本。

      输入如下命令后,单击运行。

      CREATE TABLE `paimon_catalog`.`default`.test_tbl (
          id BIGINT,
          data STRING,
          dt STRING,
          PRIMARY KEY (dt, id) NOT ENFORCED
      ) PARTITIONED BY (dt);
      
      INSERT INTO `paimon-catalog`.`default`.test_tbl VALUES (1,'CCC','2024-07-18'), (2,'DDD','2024-07-18');
  3. SQL作业有限流作业(例如执行INSERT INTO ... VALUES ...语句),需要执行以下操作:

    1. 单击目标工作空间名称,然后在左侧导航栏,选择运维中心 > 作业运维

    2. 作业运维页面,单击目标作业名称,进入作业部署详情页面。

    3. 运行参数配置区域,单击编辑,在其他配置中设置execution.checkpointing.checkpoints-after-tasks-finish.enabled: true代码并保存。

      配置作业的运行参数详情,请参见配置作业部署信息

步骤三:通过MaxCompute创建Paimon外表

MaxCompute中执行以下SQL代码,创建MaxCompute Paimon外部表。

CREATE EXTERNAL TABLE oss_extable_paimon_pt
(
    id BIGINT,
    data STRING
)
PARTITIONED BY (dt STRING )
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
WITH serdeproperties (
    'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole'
)
LOCATION 'oss://oss-cn-<your region>-internal.aliyuncs.com/<table_path>'
;

上述代码中table_pathFlink中创建的Paimon表路径,例如paimon-fs/paimon-test/default.db/test_tbl。获取方法如下:

  1. 登录Flink控制台,在左上角选择地域。

  2. 单击目标工作空间名称,然后在左侧导航栏,选择数据管理

  3. 元数据页面单击目标Catalog下的default,并在default页面中单击目标表操作列的查看

  4. 表结构详情页签的表属性区域获取path参数值,table_path中仅填写oss://之后的路径。

步骤四:引入分区数据

当创建的OSS外部表为分区表时,需要额外执行引入分区数据的操作,详情请参见补全OSS外部表分区数据语法

MSCK REPAIR TABLE oss_extable_paimon_pt ADD PARTITIONS;

步骤五:通过MaxCompute读取Paimon外部表

MaxCompute中执行以下命令,查询MaxCompute Paimon外部表oss_extable_paimon_pt。

SET odps.sql.common.table.planner.ext.hive.bridge = true;
SET odps.sql.hive.compatible = true;
SELECT * FROM oss_extable_paimon_pt WHERE dt='2024-07-18';

返回结果如下:

+------------+------------+------------+
| id         | data       | dt         | 
+------------+------------+------------+
| 1          | CCC        | 2024-07-18 | 
| 2          | DDD        | 2024-07-18 | 
+------------+------------+------------+
说明

Paimon文件中的Schema与外表Schema不一致时:

  • 列数不一致:如果Paimon文件中的列数小于外表DDL的列数,则读取Paimon数据时,系统会将缺少的列值补充为NULL。反之(大于时),会丢弃超出的列数据。

  • 列类型不一致:MaxCompute不支持使用INT类型接收Paimon文件中的STRING类型数据,支持使用STRING类型接收INT类型数据(不推荐)。

支持数据类型

MaxCompute数据类型请参见1.0数据类型版本2.0数据类型版本

开源Paimon数据类型

MaxCompute 2.0数据类型

是否支持读写

说明

TINYINT

TINYINT

支持

8位有符号整型。

SMALLINT

SMALLINT

支持

16位有符号整型。

INT

INT

支持

32位有符号整型。

BIGINT

BIGINT

支持

64位有符号整型。

BINARY(MAX_LENGTH)

BINARY

支持

二进制数据类型,目前长度限制为8 MB。

FLOAT

FLOAT

支持

32位二进制浮点型。

DOUBLE

DOUBLE

支持

64位二进制浮点型。

DECIMAL(precision,scale)

DECIMAL(precision,scale)

支持

10进制精确数字类型,默认为decimal(38,18)。允许自定义precisionscale值。

  • precision:表示最多可以表示多少位的数字。默认取值范围:1 <= precision <= 38

  • scale:表示小数部分的位数。默认取值范围: 0 <= scale <= 18

VARCHAR(n)

VARCHAR(n)

支持

变长字符类型。n为长度,[1,65535]。

CHAR(n)

CHAR(n)

支持

固定长度字符类型。n为长度,[1,255]。

VARCHAR(MAX_LENGTH)

STRING

支持

字符串类型目前长度限制为8MB。

DATE

DATE

支持

日期类型格式为yyyy-mm-dd

TIME、TIME(p)

不支持

不支持

Paimon数据类型TIME,不带时区的时间类型,由时分秒组成,精度可到纳秒。

TIME(p)表示小数位的精度,0-9之间,默认为0。

MaxCompute侧没有映射的类型。

TIMESTAMP、TIMESTAMP(p)

TIMESTAMP_NTZ

支持

无时区时间戳类型,精确到纳秒。

读表需打开Native开关SET odps.sql.common.table.jni.disable.native=true;

TIMESTAMP WITH LOCAL TIME_ZONE(9)

TIMESTAMP

支持

  • 时间戳类型,精确到纳秒格式为yyyy-mm-dd hh:mm:ss.xxxxxxxxx

  • 针对Paimon源表TIMESTAMP低精度类型,在写入时会截断处理。0~3按照3位截断,4~6按照6位截断,7~9按照9位截断。

TIMESTAMP WITH LOCAL TIME_ZONE(9)

DATETIME

不支持

时间戳类型,精确到纳秒

格式为yyyy-mm-dd hh:mm:ss.xxxxxxxxx

BOOLEAN

BOOLEAN

支持

BOOLEAN类型。

ARRAY

ARRAY

支持

复杂类型。

MAP

MAP

支持

复杂类型。

ROW

STRUCT

支持

复杂类型。

MULTISET<t>

不支持

不支持

MaxCompute侧没有映射的类型。

VARBINARY、VARBINARY(n)、BYTES

BINARY

支持

可变长度二进制字符串的数据类型。

常见问题

Paimon外部表报错kSIGABRT

  • 报错信息

    ODPS-0123144: Fuxi job failed - kSIGABRT(errCode:6) at Odps/*****_SQL_0_1_0_job_0/M1@f01b17437.cloud.eo166#3. 
      Detail error msg: CRASH_CORE, maybe caused by jvm crash, please check your java udf/udaf/udtf. 
      | fatalInstance: Odps/*****_SQL_0_1_0_job_0/M1#0_0 
  • 错误原因

    JNI模式下读取TIMESTAMP_NTZ类型会产生此报错。

  • 解决方案

    读表前加上打开Native开关SET odps.sql.common.table.jni.disable.native=true;

相关文档

可以在Flink中以自定义Catalog的方式创建MaxCompute Paimon外部表,并在写入数据后,通过MaxCompute查询并消费Paimon数据,详情请参见基于Flink创建MaxCompute Paimon外部表