使用Schemaless Query方式读取湖上Parquet数据

本文以E-MapReduceServerless Spark集群为例,为您介绍在MaxCompute中如何使用Schemaless Query的方式读取Spark SQL生成的Parquet文件,并在计算完成后将结果通过UNLOAD命令传回至OSS。

前提条件

步骤一:基于Serverless Spark生成Parquet数据

  1. 登录E-MapReduce控制台。在左侧导航栏,选择EMR Serverless > Spark

  2. Spark页面单击已创建的工作空间名称,进入EMR Serverless Spark页面后,单击左侧的数据开发

  3. 新建Spark SQL任务,输入如下SQL命令以创建Parquet格式表并写入数据,然后单击运行

    重要

    执行下述命令前,请确保页面右上角已选择的数据目录和数据库所关联的路径为已创建的OSS Bucket路径。

    CREATE TABLE example_table01 (
        id INT,
        name STRING,
        age INT
    ) USING PARQUET;
    
    INSERT INTO example_table01 VALUES
    (1, 'Alice', 30),
    (2, 'Bob', 25),
    (3, 'Charlie', 35),
    (4, 'David', 40),
    (5, 'Eve', 32),
    (6, 'Frank', 28),
    (7, 'Grace', 33),
    (8, 'Hannah', 29),
    (9, 'Ian', 36),
    (10, 'Julia', 31);
    
    SELECT * FROM example_table01;

    返回结果如下:

    image

  4. 运行成功后,您可在DLF数据湖构建控制台元数据管理页面查看已生成的example_table01表。image

    也可在OSS Bucket目录下查看生成的Parquet文件。image

步骤二:通过Schemaless Query读取数据

MaxCompute中读取Parquet文件,详情请参见特色功能:Schemaless Query

由于在写入Parquet表时会生成一个名为_SUCCESS的文件,您需要配合file_pattern_blacklist参数,将_SUCCESS文件添加至黑名单中不读取该文件,如未添加该参数,则会报错。

SELECT * FROM 
LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/example_table01/' 
(
  'file_format'='parquet',
  'file_pattern_blacklist'='.*_SUCCESS.*'
);

返回结果如下:

+------------+------------+------------+
| id         | name       | age        |
+------------+------------+------------+
| 1          | Alice      | 30         |
| 2          | Bob        | 25         |
| 3          | Charlie    | 35         |
| 4          | David      | 40         |
| 5          | Eve        | 32         |
| 6          | Frank      | 28         |
| 7          | Grace      | 33         |
| 8          | Hannah     | 29         |
| 9          | Ian        | 36         |
| 10         | Julia      | 31         |
+------------+------------+------------+

步骤三:使用SQL进行计算

查询年龄大于30岁的总人数。

SELECT COUNT(*) FROM 
LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/example_table01/' 
(
  'file_format'='parquet',
  'file_pattern_blacklist'='.*_SUCCESS.*'
)
WHERE age>30;

返回结果如下:

+------------+
| _c0        |
+------------+
| 6          |
+------------+

步骤四:将计算结果通过UNLOAD命令回传至OSS

MaxCompute支持您将MaxCompute项目中的数据导出至外部存储OSS,以供其他计算引擎使用。具体使用请参见UNLOAD

  1. MaxCompute中执行如下命令,将步骤三:使用SQL进行计算的计算结果以Parquet格式导出至OSS。

    执行下述代码前,您需在OSS Bucket oss-mc-test下创建unload_schemaless目录。

    UNLOAD FROM  
    (
      SELECT COUNT(*) FROM 
      LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/example_table01/' 
      ('file_format'='parquet','file_pattern_blacklist'='.*_SUCCESS.*')
      WHERE age>30
    )
    INTO
    LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/oss-mc-test/unload_schemaless/'
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    WITH SERDEPROPERTIES ('odps.properties.rolearn'='acs:ram::<uid>:role/AliyunODPSDefaultRole') 
    STORED AS PARQUET 
    PROPERTIES('mcfed.parquet.compression'='SNAPPY')
    ;
  2. 登录OSS控制台,查看是否UNLOAD成功。image