Parquet外部表

本文介绍Parquet格式的OSS外部表的创建、读取及写入方法。

适用范围

  • OSS外部表不支持cluster属性。

  • 单个文件大小不能超过2GB,如果文件过大,建议拆分。

  • MaxCompute需要与OSS部署在同一地域。

支持数据类型

MaxCompute数据类型详情请参见1.0数据类型版本2.0数据类型版本

  • JNI模式:set odps.ext.parquet.native=false,表示读外部表解析Parquet数据文件时,使用原有基于Java的开源社区实现,支持读和写。

  • Native模式:set odps.ext.parquet.native=true,表示读外部表解析Parquet数据文件时,使用新的基于C++的Native实现,仅支持读。

    模式

    Java模式(读写)

    Native模式(只读)

    TINYINT

    已开通

    已开通

    SMALLINT

    已开通

    已开通

    INT

    已开通

    已开通

    BIGINT

    已开通

    已开通

    BINARY

    已开通

    已开通

    FLOAT

    已开通

    已开通

    DOUBLE

    已开通

    已开通

    DECIMAL(precision,scale)

    未开通

    已开通

    VARCHAR(n)

    已开通

    已开通

    CHAR(n)

    已开通

    已开通

    STRING

    已开通

    已开通

    DATE

    已开通

    已开通

    DATETIME

    已开通

    已开通

    TIMESTAMP

    已开通

    已开通

    TIMESTAMP_NTZ

    未开通

    未开通

    BOOLEAN

    已开通

    已开通

    ARRAY

    已开通

    已开通

    MAP

    已开通

    已开通

    STRUCT

    已开通

    已开通

    JSON

    未开通

    未开通

支持压缩格式

  • 当读写压缩属性的OSS文件时,需要在建表语句中添加with serdeproperties属性配置,详情请参见with serdeproperties属性参数

  • 支持读写的数据文件格式:以ZSTD、SNAPPY、GZIP方式压缩的Parquet。

创建外部表

语法结构

Parquet文件中的Schema与外表Schema不一致时:

  • 列数不一致:如果Parquet文件中的列数小于外表DDL的列数,则读取Parquet数据时,系统会将缺少的列值补充为NULL。反之(大于时),会丢弃超出的列数据。

  • 列类型不一致:如果Parquet文件中的列类型与外表DDL中对应的列类型不一致,则读取Parquet数据时会报错。例如:使用STRING(或INT)类型接收Parquet文件中INT(或STRING)类型的数据,报错ODPS-0123131:User defined function exception - Traceback:xxx

精简语法结构

CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
  <col_name> <data_type>,
  ...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
STORED AS parquet 
LOCATION '<oss_location>' 
[tblproperties ('<tbproperty_name>'='<tbproperty_value>',...)];

详细语法结构

CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
  <col_name> <data_type>,
  ...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH serdeproperties(
    'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
    'mcfed.parquet.compression'='ZSTD/SNAPPY/GZIP'
)
STORED AS parquet 
LOCATION '<oss_location>' 
;

公共参数

公共参数说明请参见基础语法参数说明

有参数

with serdeproperties属性参数

property_name

使用场景

说明

property_value

默认值

mcfed.parquet.compression

当需要将Parquet数据以压缩方式写入OSS时,请添加该属性。

Parquet压缩属性。Parquet数据默认不压缩。

  • ZSTD

  • SNAPPY

  • GZIP

mcfed.parquet.compression.codec.zstd.level

'mcfed.parquet.compression'='zstd'时,可以添加该属性。不填此属性时,以默认值3进行压缩。

level值越大,压缩比越高,实测取值高时,写出数据的减少量非常有限,但时间和资源消耗快速增加,性价比明显降低,因此对于大数据读写压缩Parquet文件的场景,level(level3~level5)的zstd压缩效果最好。例如:'mcfed.parquet.compression.codec.zstd.level'= '5'

取值范围为1~22。

3

parquet.file.cache.size

在处理Parquet数据场景中,如果需要提升读OSS数据文件性能,请添加该属性。

指定读OSS数据文件时,可缓存的数据量,单位为KB。

1024

parquet.io.buffer.size

在处理Parquet数据场景中,如果需要提升读OSS数据文件性能,请添加该属性。

指定OSS数据文件大小超过1024 KB时,可缓存的数据量,单位为KB。

4096

tblproperties属性参数

property_name

使用场景

说明

property_value

默认值

io.compression.codecs

OSS数据文件为Raw-Snappy格式时,请添加该属性。

内置的开源数据解析器SNAPPY格式场景。

配置该参数值为True时,MaxCompute才可以正常读取压缩数据,否则MaxCompute无法成功读取数据。

com.aliyun.odps.io.compress.SnappyRawCodec。

odps.external.data.output.prefix

(兼容odps.external.data.prefix)

当需要添加输出文件的自定义前缀名时,请添加该属性。

  • 仅包含数字,字母,下划线(a-z, A-Z, 0-9, _)。

  • 长度在1-10之间。

符合条件的字符组合,例如'mc_'。

odps.external.data.enable.extension

当需要显示输出文件的扩展名时,请添加该属性。

True表示显示输出文件的扩展名,反之不显示扩展名。

  • True

  • False

False

odps.external.data.output.suffix

当需要添加输出文件的自定义后缀名时,请添加该属性。

仅包含数字,字母,下划线(a-z, A-Z, 0-9, _)。

符合条件的字符组合,例如'_hangzhou'。

odps.external.data.output.explicit.extension

当需要添加输出文件的自定义扩展名时,请添加该属性。

  • 仅包含数字,字母,下划线(a-z, A-Z, 0-9, _)

  • 长度在1-10之间。

  • 优先级高于参数odps.external.data.enable.extension

符合条件的字符组合,例如"jsonl"。

mcfed.parquet.compression

当需要将Parquet数据以压缩方式写入OSS时,请添加该属性。

Parquet压缩属性。Parquet数据默认不压缩。

  • SNAPPY

  • GZIP

  • ZSTD

mcfed.parquet.block.size

控制Parquet文件的块大小,影响存储效率和读取性能。

Parquet调优属性。定义Parquet块大小,以字节为单位。

非负整数

134217728 (128MB)

mcfed.parquet.block.row.count.limit

当向Parquet外部表写入数据时,限制每个行组中的记录数,避免内存溢出。

Parquet调优属性。控制每行组(row group)的最大记录数。如果出现OOM的情况,可将参数适当调小。

参数使用建议:

  1. 假设JVM内存只有1GB,平均每条record1MB,那么该参数可以限制在100左右(默认一个row group128MB)。

  2. 该参数不能设置太小。

非负整数

2147483647

(Integer.MAX_VALUE)

mcfed.parquet.page.size.row.check.min

当向Parquet外部表写入数据时,控制内存检查的频率,防止内存溢出。

Parquet调优属性。限制内存检查之间的最小记录数。如果出现OOM的情况,可将参数适当调小。

非负整数

100

mcfed.parquet.page.size.row.check.max

当向Parquet外部表写入数据时,控制内存检查的频率,防止内存溢出。

Parquet调优属性。限制内存检查之间的最小记录数。如果出现OOM的情况,可将参数适当调小。

由于需要对内存使用情况进行频繁计算,该参数调整可能会带来额外开销。

参数使用建议:

  1. 默认配置为每10000record执行内存检查。在record size很小的情况下,可以将该参数设置小一些,比如1000,更频繁地执行内存检查,进而避免OOM。

  2. 建议先调小mcfed.parquet.block.row.count.limit,如果还会发生OOM,或者输出文件太大,那么可以调小mcfed.parquet.page.size.row.check.max,更加频繁地检查内存。

非负整数

1000

mcfed.parquet.compression.codec.zstd.level

当需要将Parquet数据以ZSTD的压缩形式写入OSS,指定ZSTD压缩算法的压缩级别时,请添加该属性。

Parquet压缩属性。指ZSTD压缩算法的压缩级别。取值范围:1~22。

非负整数

3

写入数据

MaxCompute写入语法详情,请参见写入语法说明

查询分析

  • SELECT语法详情,请参见查询语法说明

  • 优化查询计划详情,请参见查询优化

  • 若需要直读LOCATION文件,请参见特色功能:Schemaless Query

  • 查询优化:Parquet外部表支持通过开启PPD(即Predicate Push Down)实现查询优化。优化性能结果参见支持Predicate Push Down(Parquet PPD)

    SQL前添加如下参数开启PPD:

    -- PPD参数需在Native模式下使用,即Native开关需为true。
    -- 开启parquet native reader。
    SET odps.ext.parquet.native = true; 
    -- 开启parquet ppd。
    SET odps.sql.parquet.use.predicate.pushdown = true; 

支持Predicate Push Down(Parquet PPD)

Parquet外部表本身不支持Predicate Push Down(Parquet PPD),执行带有WHERE过滤条件的查询时,MaxCompute默认会扫描所有数据,导致不必要的I/O、资源消耗和查询延迟。因此,MaxCompute新增了ParquetPDD参数,通过开启Parquet PDD,可在数据扫描阶段利用Parquet文件自身的元数据特性实现Parquet RowGroup级别的过滤,从而提升查询性能、降低资源消耗与成本。

使用方式

  • 开启Predicate Push Down(Parquet PPD

    执行SQL查询前,通过set命令设置以下两个Session级别的参数以开启Parquet PDD。

    -- 开启parquet native reader。 
    set odps.ext.parquet.native = true; 
    -- 开启parquet ppd。
    set odps.sql.parquet.use.predicate.pushdown = true; 
  • 使用示例

    基于1TBTPCDS测试数据集,以tpcds_1t_store_sales Parquet外部表为例,开启PPD并执行过滤查询,总数据量为2879987999

    -- 创建外部表tpcds_1t_store_sales。
    CREATE EXTERNAL TABLE IF NOT EXISTS tpcds_1t_store_sales (
        ss_sold_date_sk         BIGINT,
        ss_sold_time_sk         BIGINT,
        ss_item_sk              BIGINT,
        ss_customer_sk          BIGINT,
        ss_cdemo_sk             BIGINT,
        ss_hdemo_sk             BIGINT,
        ss_addr_sk              BIGINT,
        ss_store_sk             BIGINT,
        ss_promo_sk             BIGINT,
        ss_ticket_number        BIGINT,
        ss_quantity             BIGINT,
        ss_wholesale_cost       DECIMAL(7,2),
        ss_list_price           DECIMAL(7,2),
        ss_sales_price          DECIMAL(7,2),
        ss_ext_discount_amt     DECIMAL(7,2),
        ss_ext_sales_price      DECIMAL(7,2),
        ss_ext_wholesale_cost   DECIMAL(7,2),
        ss_ext_list_price       DECIMAL(7,2),
        ss_ext_tax              DECIMAL(7,2),
        ss_coupon_amt           DECIMAL(7,2),
        ss_net_paid             DECIMAL(7,2),
        ss_net_paid_inc_tax     DECIMAL(7,2),
        ss_net_profit           DECIMAL(7,2)
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    WITH serdeproperties(
      'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
      'mcfed.parquet.compression'='zstd'
    )
    STORED AS parquet
    LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss_bucket_path/';
    
    -- 使用1TBTPCDS测试数据集。
    INSERT OVERWRITE TABLE tpcds_1t_store_sales
    SELECT 
        ss_sold_date_sk,
        ss_sold_time_sk,
        ss_item_sk,
        ss_customer_sk,
        ss_cdemo_sk,
        ss_hdemo_sk,
        ss_addr_sk,
        ss_store_sk,
        ss_promo_sk,
        ss_ticket_number,
        ss_quantity,
        ss_wholesale_cost,
        ss_list_price,
        ss_sales_price,
        ss_ext_discount_amt,
        ss_ext_sales_price,
        ss_ext_wholesale_cost,
        ss_ext_list_price,
        ss_ext_tax,
        ss_coupon_amt,
        ss_net_paid,
        ss_net_paid_inc_tax,
        ss_net_profit
    FROM 
        bigdata_public_dataset.tpcds_1t.store_sales;
    
    -- 执行查询。
    SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales
      WHERE ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;

性能对比结果

开启PPD功能可减少数据扫描量,从而降低查询延迟和资源消耗。

模式

表总数据量

扫描条数

扫描Bytes

Mapper耗时

总资源消耗

说明

Parquet外表+未开启PPD

2879987999

2879987999(100%)

19386793984(100%)

18s

cpu 19.25 Core * Min, memory 24.07 GB * Min

100%

Parquet外表+开启PPD

2879987999

762366649(26.47%)

3339386880(17.22%)

12s

cpu 11.47 Core * Min, memory 14.33 GB * Min

~59.58%

降低扫描数据量后,延迟和资源消耗降低很明显

内表+开启PPD

2879987999

32830000(1.14%)

1633880386(8.43%)

9s

cpu 5.62 Core * Min, memory 7.02 GB * Min

~29.19%

内表数据有排序,因此PPD效果更加极致

测试详情

  1. Parquet外表+未开启PPD

    SET odps.ext.parquet.native = true;
    SET odps.sql.parquet.use.predicate.pushdown = false;
    
    SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales
      WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;

    image

    image

    image

  2. Parquet外表+开启PPD

    SET odps.ext.parquet.native = true;
    SET odps.sql.parquet.use.predicate.pushdown = true;
    
    SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales
      WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;

    image

    image

    存在很多空Mapper,无需读数据:

    image.webp

    实际裁减RowGroup的日志:

    image.png

  3. 内表+开启PPD

    内表数据有排序属性,因此裁减效果更明显。

    SELECT SUM(ss_sold_date_sk) FROM bigdata_public_dataset.tpcds_1t.store_sales
      WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;

    image

    image

    image

场景示例

本示例将创建以ZSTD压缩的Parquet格式外表,并进行读取和写入操作。

  1. 前置准备

    1. 创建MaxCompute项目

    2. 已准备好OSS存储空间(Bucket)、OSS目录。具体操作请参见创建存储空间管理目录

      由于MaxCompute只在部分地域部署,跨地域的数据连通性可能存在问题,因此建议BucketMaxCompute项目所在地域保持一致。
    3. 授权

      1. 具备访问OSS的权限。阿里云账号(主账号)、RAM用户或RAMRole身份可以访问OSS外部表,授权信息请参见OSSSTS模式授权

      2. 已具备在MaxCompute项目中创建表(CreateTable)的权限。表操作的权限信息请参见MaxCompute权限

  2. 准备ZSTD格式数据文件。

    示例数据oss-mc-testBucket中创建parquet_zstd_jni/dt=20230418目录层级,并将存放在分区目录dt=20230418下。

  3. 创建ZSTD压缩格式的Parquet外部表。

    CREATE EXTERNAL TABLE IF NOT EXISTS mc_oss_parquet_data_type_zstd (
        vehicleId INT,
        recordId INT,
        patientId INT,
        calls INT,
        locationLatitute DOUBLE,
        locationLongtitue DOUBLE,
        recordTime STRING,
        direction STRING
    )
    PARTITIONED BY (dt STRING )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    WITH serdeproperties(
      'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
      'mcfed.parquet.compression'='zstd'
    )
    STORED AS parquet
    LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/parquet_zstd_jni/';
  4. 引入分区数据。当创建的OSS外部表为分区表时,需要额外执行引入分区数据的操作,更多操作请参见补全OSS外部表分区数据语法

    -- 引入分区数据。
    MSCK REPAIR TABLE mc_oss_parquet_data_type_zstd ADD PARTITIONS;
  5. 读取Parquet外表数据。

    SELECT * FROM mc_oss_parquet_data_type_zstd WHERE dt='20230418' LIMIT 10;

    部分返回结果如下:

    +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
    | vehicleid  | recordid   | patientid  | calls      | locationlatitute | locationlongtitue | recordtime     | direction  | dt         |
    +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
    | 1          | 12         | 76         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:10 | SW         | 20230418   |
    | 1          | 1          | 51         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:00 | S          | 20230418   |
    | 1          | 2          | 13         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:01 | NE         | 20230418   |
    | 1          | 3          | 48         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:02 | NE         | 20230418   |
    | 1          | 4          | 30         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:03 | W          | 20230418   |
    | 1          | 5          | 47         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:04 | S          | 20230418   |
    | 1          | 6          | 9          | 1          | 46.81006         | -92.08174         | 9/14/2014 0:05 | S          | 20230418   |
    | 1          | 7          | 53         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:06 | N          | 20230418   |
    | 1          | 8          | 63         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:07 | SW         | 20230418   |
    | 1          | 9          | 4          | 1          | 46.81006         | -92.08174         | 9/14/2014 0:08 | NE         | 20230418   |
    | 1          | 10         | 31         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:09 | N          | 20230418   |
    +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
  6. 写入数据至Parquet外表。

    INSERT INTO mc_oss_parquet_data_type_zstd PARTITION ( dt = '20230418') 
      VALUES  (1,16,76,1,46.81006,-92.08174,'9/14/2014 0:10','SW');
    
    -- 查询新写入的数据
    SELECT * FROM mc_oss_parquet_data_type_zstd WHERE dt = '20230418' AND recordid=16;

    返回结果如下:

    +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
    | vehicleid  | recordid   | patientid  | calls      | locationlatitute | locationlongtitue | recordtime     | direction  | dt         |
    +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
    | 1          | 16         | 76         | 1          | 46.81006         | -92.08174         | 9/14/2014 0:10 | SW         | 20230418   |
    +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+

常见问题

Parquet文件列类型与外表DDL类型不一致

  • 报错信息

    ODPS-0123131:User defined function exception - Traceback:
    java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable 
       at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.getPrimitiveJavaObject(WritableIntObjectInspector.java:46)
  • 错误描述

    Parquet文件的LongWritable字段类型与外表DDLINT类型不一致。

  • 解决方案

    外部表DDL中的INT类型需要改为BIGINT类型。

写外部表时报错java.lang.OutOfMemoryError

  • 报错信息

    ODPS-0123131:User defined function exception - Traceback:
    java.lang.OutOfMemoryError: Java heap space
    	at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77)
    	at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:175)
    	at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:173)
    	at org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:161)
  • 错误描述

    大量数据写入Parquet外表时出现OOM报错。

  • 解决方案

    建议创建外部表时,先调小mcfed.parquet.block.row.count.limit参数,如果还会发生OOM,或者输出文件太大,可以调小mcfed.parquet.page.size.row.check.max参数,更加频繁地检查内存。详情请参见独有参数

    在向Parquet外表写入数据前,加上如下参数。

    -- 设置UDF JVM Heap使用的最大内存。
    SET odps.sql.udf.jvm.memory=12288;
    -- 控制Runtimebatch size。
    SET odps.sql.executionengine.batch.rowcount =64;
    -- 设置每个Map Worker的内存大小。
    SET odps.stage.mapper.mem=12288;
    -- 修改每个Map Worker的输入数据量,即输入文件的分片大小,从而间接控制每个Map阶段下Worker的数量。
    SET odps.stage.mapper.split.size=64;