本文介绍Parquet格式的OSS外部表的创建、读取及写入方法。
适用范围
OSS外部表不支持cluster属性。
单个文件大小不能超过2GB,如果文件过大,建议拆分。
MaxCompute需要与OSS部署在同一地域。
支持数据类型
MaxCompute数据类型详情请参见1.0数据类型版本、2.0数据类型版本。
JNI模式:
set odps.ext.parquet.native=false,表示读外部表解析Parquet数据文件时,使用原有基于Java的开源社区实现,支持读和写。Native模式:
set odps.ext.parquet.native=true,表示读外部表解析Parquet数据文件时,使用新的基于C++的Native实现,仅支持读。模式
Java模式(读写)
Native模式(只读)
TINYINT


SMALLINT


INT


BIGINT


BINARY


FLOAT


DOUBLE


DECIMAL(precision,scale)


VARCHAR(n)


CHAR(n)


STRING


DATE


DATETIME


TIMESTAMP


TIMESTAMP_NTZ


BOOLEAN


ARRAY


MAP


STRUCT


JSON


支持压缩格式
当读写压缩属性的OSS文件时,需要在建表语句中添加
with serdeproperties属性配置,详情请参见with serdeproperties属性参数。支持读写的数据文件格式:以ZSTD、SNAPPY、GZIP方式压缩的Parquet。
创建外部表
语法结构
当Parquet文件中的Schema与外表Schema不一致时:
列数不一致:如果Parquet文件中的列数小于外表DDL的列数,则读取Parquet数据时,系统会将缺少的列值补充为NULL。反之(大于时),会丢弃超出的列数据。
列类型不一致:如果Parquet文件中的列类型与外表DDL中对应的列类型不一致,则读取Parquet数据时会报错。例如:使用STRING(或INT)类型接收Parquet文件中INT(或STRING)类型的数据,报错
ODPS-0123131:User defined function exception - Traceback:xxx。
精简语法结构
CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
STORED AS parquet
LOCATION '<oss_location>'
[tblproperties ('<tbproperty_name>'='<tbproperty_value>',...)];详细语法结构
CREATE EXTERNAL TABLE [IF NOT EXISTS] <mc_oss_extable_name>
(
<col_name> <data_type>,
...
)
[COMMENT <table_comment>]
[PARTITIONED BY (<col_name> <data_type>, ...)]
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH serdeproperties(
'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole',
'mcfed.parquet.compression'='ZSTD/SNAPPY/GZIP'
)
STORED AS parquet
LOCATION '<oss_location>'
;公共参数
公共参数说明请参见基础语法参数说明。
独有参数
with serdeproperties属性参数
property_name | 使用场景 | 说明 | property_value | 默认值 |
mcfed.parquet.compression | 当需要将Parquet数据以压缩方式写入OSS时,请添加该属性。 | Parquet压缩属性。Parquet数据默认不压缩。 |
| 无 |
mcfed.parquet.compression.codec.zstd.level | 当 | level值越大,压缩比越高,实测取值高时,写出数据的减少量非常有限,但时间和资源消耗快速增加,性价比明显降低,因此对于大数据读写压缩Parquet文件的场景,低level(level3~level5)的zstd压缩效果最好。例如: | 取值范围为1~22。 | 3 |
parquet.file.cache.size | 在处理Parquet数据场景中,如果需要提升读OSS数据文件性能,请添加该属性。 | 指定读OSS数据文件时,可缓存的数据量,单位为KB。 | 1024 | 无 |
parquet.io.buffer.size | 在处理Parquet数据场景中,如果需要提升读OSS数据文件性能,请添加该属性。 | 指定OSS数据文件大小超过1024 KB时,可缓存的数据量,单位为KB。 | 4096 | 无 |
tblproperties属性参数
property_name | 使用场景 | 说明 | property_value | 默认值 |
io.compression.codecs | 当OSS数据文件为Raw-Snappy格式时,请添加该属性。 | 内置的开源数据解析器SNAPPY格式场景。 配置该参数值为True时,MaxCompute才可以正常读取压缩数据,否则MaxCompute无法成功读取数据。 | com.aliyun.odps.io.compress.SnappyRawCodec。 | 无 |
odps.external.data.output.prefix (兼容odps.external.data.prefix) | 当需要添加输出文件的自定义前缀名时,请添加该属性。 |
| 符合条件的字符组合,例如'mc_'。 | 无 |
odps.external.data.enable.extension | 当需要显示输出文件的扩展名时,请添加该属性。 | True表示显示输出文件的扩展名,反之不显示扩展名。 |
| False |
odps.external.data.output.suffix | 当需要添加输出文件的自定义后缀名时,请添加该属性。 | 仅包含数字,字母,下划线(a-z, A-Z, 0-9, _)。 | 符合条件的字符组合,例如'_hangzhou'。 | 无 |
odps.external.data.output.explicit.extension | 当需要添加输出文件的自定义扩展名时,请添加该属性。 |
| 符合条件的字符组合,例如"jsonl"。 | 无 |
mcfed.parquet.compression | 当需要将Parquet数据以压缩方式写入OSS时,请添加该属性。 | Parquet压缩属性。Parquet数据默认不压缩。 |
| 无 |
mcfed.parquet.block.size | 控制Parquet文件的块大小,影响存储效率和读取性能。 | Parquet调优属性。定义Parquet块大小,以字节为单位。 | 非负整数 | 134217728 (128MB) |
mcfed.parquet.block.row.count.limit | 当向Parquet外部表写入数据时,限制每个行组中的记录数,避免内存溢出。 | Parquet调优属性。控制每行组(row group)的最大记录数。如果出现OOM的情况,可将参数适当调小。 参数使用建议:
| 非负整数 | 2147483647 (Integer.MAX_VALUE) |
mcfed.parquet.page.size.row.check.min | 当向Parquet外部表写入数据时,控制内存检查的频率,防止内存溢出。 | Parquet调优属性。限制内存检查之间的最小记录数。如果出现OOM的情况,可将参数适当调小。 | 非负整数 | 100 |
mcfed.parquet.page.size.row.check.max | 当向Parquet外部表写入数据时,控制内存检查的频率,防止内存溢出。 | Parquet调优属性。限制内存检查之间的最小记录数。如果出现OOM的情况,可将参数适当调小。 由于需要对内存使用情况进行频繁计算,该参数调整可能会带来额外开销。 参数使用建议:
| 非负整数 | 1000 |
mcfed.parquet.compression.codec.zstd.level | 当需要将Parquet数据以ZSTD的压缩形式写入OSS,指定ZSTD压缩算法的压缩级别时,请添加该属性。 | Parquet压缩属性。指ZSTD压缩算法的压缩级别。取值范围:1~22。 | 非负整数 | 3 |
写入数据
MaxCompute写入语法详情,请参见写入语法说明。
查询分析
SELECT语法详情,请参见查询语法说明。
优化查询计划详情,请参见查询优化。
若需要直读LOCATION文件,请参见特色功能:Schemaless Query。
查询优化:Parquet外部表支持通过开启PPD(即Predicate Push Down)实现查询优化。优化性能结果参见支持Predicate Push Down(Parquet PPD)。
在SQL前添加如下参数开启PPD:
-- PPD参数需在Native模式下使用,即Native开关需为true。 -- 开启parquet native reader。 SET odps.ext.parquet.native = true; -- 开启parquet ppd。 SET odps.sql.parquet.use.predicate.pushdown = true;
支持Predicate Push Down(Parquet PPD)
Parquet外部表本身不支持Predicate Push Down(Parquet PPD),执行带有WHERE过滤条件的查询时,MaxCompute默认会扫描所有数据,导致不必要的I/O、资源消耗和查询延迟。因此,MaxCompute新增了ParquetPDD参数,通过开启Parquet PDD,可在数据扫描阶段利用Parquet文件自身的元数据特性实现Parquet RowGroup级别的过滤,从而提升查询性能、降低资源消耗与成本。
使用方式
开启Predicate Push Down(Parquet PPD)
执行SQL查询前,通过
set命令设置以下两个Session级别的参数以开启Parquet PDD。-- 开启parquet native reader。 set odps.ext.parquet.native = true; -- 开启parquet ppd。 set odps.sql.parquet.use.predicate.pushdown = true;使用示例
基于1TB的TPCDS测试数据集,以
tpcds_1t_store_salesParquet外部表为例,开启PPD并执行过滤查询,总数据量为2879987999。-- 创建外部表tpcds_1t_store_sales。 CREATE EXTERNAL TABLE IF NOT EXISTS tpcds_1t_store_sales ( ss_sold_date_sk BIGINT, ss_sold_time_sk BIGINT, ss_item_sk BIGINT, ss_customer_sk BIGINT, ss_cdemo_sk BIGINT, ss_hdemo_sk BIGINT, ss_addr_sk BIGINT, ss_store_sk BIGINT, ss_promo_sk BIGINT, ss_ticket_number BIGINT, ss_quantity BIGINT, ss_wholesale_cost DECIMAL(7,2), ss_list_price DECIMAL(7,2), ss_sales_price DECIMAL(7,2), ss_ext_discount_amt DECIMAL(7,2), ss_ext_sales_price DECIMAL(7,2), ss_ext_wholesale_cost DECIMAL(7,2), ss_ext_list_price DECIMAL(7,2), ss_ext_tax DECIMAL(7,2), ss_coupon_amt DECIMAL(7,2), ss_net_paid DECIMAL(7,2), ss_net_paid_inc_tax DECIMAL(7,2), ss_net_profit DECIMAL(7,2) ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' WITH serdeproperties( 'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole', 'mcfed.parquet.compression'='zstd' ) STORED AS parquet LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss_bucket_path/'; -- 使用1TB的TPCDS测试数据集。 INSERT OVERWRITE TABLE tpcds_1t_store_sales SELECT ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, ss_quantity, ss_wholesale_cost, ss_list_price, ss_sales_price, ss_ext_discount_amt, ss_ext_sales_price, ss_ext_wholesale_cost, ss_ext_list_price, ss_ext_tax, ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, ss_net_profit FROM bigdata_public_dataset.tpcds_1t.store_sales; -- 执行查询。 SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales WHERE ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;
性能对比结果
开启PPD功能可减少数据扫描量,从而降低查询延迟和资源消耗。
模式 | 表总数据量 | 扫描条数 | 扫描Bytes | Mapper耗时 | 总资源消耗 | 说明 |
Parquet外表+未开启PPD | 2879987999 | 2879987999(100%) | 19386793984(100%) | 18s | cpu 19.25 Core * Min, memory 24.07 GB * Min 100% | |
Parquet外表+开启PPD | 2879987999 | 762366649(26.47%) | 3339386880(17.22%) | 12s | cpu 11.47 Core * Min, memory 14.33 GB * Min ~59.58% | 降低扫描数据量后,延迟和资源消耗降低很明显 |
内表+开启PPD | 2879987999 | 32830000(1.14%) | 1633880386(8.43%) | 9s | cpu 5.62 Core * Min, memory 7.02 GB * Min ~29.19% | 内表数据有排序,因此PPD效果更加极致 |
测试详情
Parquet外表+未开启PPD
SET odps.ext.parquet.native = true; SET odps.sql.parquet.use.predicate.pushdown = false; SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;


Parquet外表+开启PPD
SET odps.ext.parquet.native = true; SET odps.sql.parquet.use.predicate.pushdown = true; SELECT SUM(ss_sold_date_sk) FROM tpcds_1t_store_sales WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;

存在很多空Mapper,无需读数据:

实际裁减RowGroup的日志:

内表+开启PPD
内表数据有排序属性,因此裁减效果更明显。
SELECT SUM(ss_sold_date_sk) FROM bigdata_public_dataset.tpcds_1t.store_sales WHERE ss_store_sk = 2 AND ss_sold_date_sk >= 2451871 AND ss_sold_date_sk <= 2451880;


场景示例
本示例将创建以ZSTD压缩的Parquet格式外表,并进行读取和写入操作。
前置准备
已准备好OSS存储空间(Bucket)、OSS目录。具体操作请参见创建存储空间、管理目录。
由于MaxCompute只在部分地域部署,跨地域的数据连通性可能存在问题,因此建议Bucket与MaxCompute项目所在地域保持一致。
授权
具备访问OSS的权限。阿里云账号(主账号)、RAM用户或RAMRole身份可以访问OSS外部表,授权信息请参见OSS的STS模式授权。
已具备在MaxCompute项目中创建表(CreateTable)的权限。表操作的权限信息请参见MaxCompute权限。
准备ZSTD格式数据文件。
在示例数据的
oss-mc-testBucket中创建parquet_zstd_jni/dt=20230418目录层级,并将存放在分区目录dt=20230418下。创建ZSTD压缩格式的Parquet外部表。
CREATE EXTERNAL TABLE IF NOT EXISTS mc_oss_parquet_data_type_zstd ( vehicleId INT, recordId INT, patientId INT, calls INT, locationLatitute DOUBLE, locationLongtitue DOUBLE, recordTime STRING, direction STRING ) PARTITIONED BY (dt STRING ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' WITH serdeproperties( 'odps.properties.rolearn'='acs:ram::<uid>:role/aliyunodpsdefaultrole', 'mcfed.parquet.compression'='zstd' ) STORED AS parquet LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/parquet_zstd_jni/';引入分区数据。当创建的OSS外部表为分区表时,需要额外执行引入分区数据的操作,更多操作请参见补全OSS外部表分区数据语法。
-- 引入分区数据。 MSCK REPAIR TABLE mc_oss_parquet_data_type_zstd ADD PARTITIONS;读取Parquet外表数据。
SELECT * FROM mc_oss_parquet_data_type_zstd WHERE dt='20230418' LIMIT 10;部分返回结果如下:
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+ | vehicleid | recordid | patientid | calls | locationlatitute | locationlongtitue | recordtime | direction | dt | +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+ | 1 | 12 | 76 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:10 | SW | 20230418 | | 1 | 1 | 51 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:00 | S | 20230418 | | 1 | 2 | 13 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:01 | NE | 20230418 | | 1 | 3 | 48 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:02 | NE | 20230418 | | 1 | 4 | 30 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:03 | W | 20230418 | | 1 | 5 | 47 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:04 | S | 20230418 | | 1 | 6 | 9 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:05 | S | 20230418 | | 1 | 7 | 53 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:06 | N | 20230418 | | 1 | 8 | 63 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:07 | SW | 20230418 | | 1 | 9 | 4 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:08 | NE | 20230418 | | 1 | 10 | 31 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:09 | N | 20230418 | +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+写入数据至Parquet外表。
INSERT INTO mc_oss_parquet_data_type_zstd PARTITION ( dt = '20230418') VALUES (1,16,76,1,46.81006,-92.08174,'9/14/2014 0:10','SW'); -- 查询新写入的数据 SELECT * FROM mc_oss_parquet_data_type_zstd WHERE dt = '20230418' AND recordid=16;返回结果如下:
+------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+ | vehicleid | recordid | patientid | calls | locationlatitute | locationlongtitue | recordtime | direction | dt | +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+ | 1 | 16 | 76 | 1 | 46.81006 | -92.08174 | 9/14/2014 0:10 | SW | 20230418 | +------------+------------+------------+------------+------------------+-------------------+----------------+------------+------------+
常见问题
Parquet文件列类型与外表DDL类型不一致
报错信息
ODPS-0123131:User defined function exception - Traceback: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.IntWritable at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector.getPrimitiveJavaObject(WritableIntObjectInspector.java:46)错误描述
Parquet文件的LongWritable字段类型与外表DDL的INT类型不一致。
解决方案
外部表DDL中的INT类型需要改为BIGINT类型。
写外部表时报错java.lang.OutOfMemoryError
报错信息
ODPS-0123131:User defined function exception - Traceback: java.lang.OutOfMemoryError: Java heap space at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77) at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:175) at org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:173) at org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:161)错误描述
大量数据写入Parquet外表时出现OOM报错。
解决方案
建议创建外部表时,先调小
mcfed.parquet.block.row.count.limit参数,如果还会发生OOM,或者输出文件太大,可以调小mcfed.parquet.page.size.row.check.max参数,更加频繁地检查内存。详情请参见独有参数。在向Parquet外表写入数据前,加上如下参数。
-- 设置UDF JVM Heap使用的最大内存。 SET odps.sql.udf.jvm.memory=12288; -- 控制Runtime侧batch size。 SET odps.sql.executionengine.batch.rowcount =64; -- 设置每个Map Worker的内存大小。 SET odps.stage.mapper.mem=12288; -- 修改每个Map Worker的输入数据量,即输入文件的分片大小,从而间接控制每个Map阶段下Worker的数量。 SET odps.stage.mapper.split.size=64;