通过Spark SQL读写Iceberg外表

更新时间:

本文主要介绍如何在云原生数据仓库 AnalyticDB MySQL 版中使用Spark SQL读写Iceberg外表。

前提条件

步骤一:进入数据开发

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > SQL开发

  3. SQLConsole窗口,选择Spark引擎和资源组(Job型资源组或Spark引擎的Interactive型资源组)。

步骤二:创建外库与Iceberg外表

说明

您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。

湖存储表

  1. 创建数据库。

    CREATE DATABASE adb_external_db_iceberg 
    WITH DBPROPERTIES ('adb_lake_bucket' = 'adb-lake-cn-shanghai-6gml****');

    参数说明:

    参数

    说明

    adb_lake_bucket

    指定湖存储表数据的存储位置。

    在数据库建表语句中指定湖存储后,数据库中的所有表将统一存储于该湖存储中。如果您不希望将数据库下的所有表都存储到湖存储中,可以在建表时再设置该参数。

  2. 创建Iceberg外表。

    SET spark.adb.lakehouse.enabled=true;                      ----开启湖存储  
    CREATE TABLE adb_external_db_iceberg.test_iceberg_tbl (
      `id` int,
      `name` string,
      `age` int
    ) USING iceberg
    PARTITIONED BY (age)
    TBLPROPERTIES ( 'adb_lake_bucket' = 'adb-lake-cn-shanghai-6gml****' );

    参数说明:

    参数

    说明

    adb_lake_bucket

    指定湖存储表数据的存储位置。

    • 若您已在创建数据库时指定湖存储,数据库中的所有表将统一存储于该湖存储中,在建表时您无需再次指定。

    • 若您在创建数据库时未指定湖存储,此处必须显式指定,否则建表会报错。指定后,该表的数据会存储到该湖存储中。

    • 若在创建数据库和创建表时均显式指定湖存储,则该表的数据会存储在建表时指定的湖存储中;而数据库下的其他表则会存储在建库语句指定的湖存储中。

非湖存储表

  1. 检查已有数据库中是否可以创建Iceberg外表,或新建数据库。

    • 使用SHOW CREATE DATABASE语句查看数据库的DDL语句,满足以下两种情况,可以使用已有数据库,否则需新建数据库。

      • DDL语句未指定Location

      • DDL语句指定了Location,且Catalog参数的值为mix

    • 新建数据库。

      CREATE DATABASE adb_external_db_iceberg;
  2. 创建Iceberg外表。

    重要

    如果数据库指定了Location,需要确保创建、读写Iceberg外表时spark.iceberg.warehouse参数指定的OSS路径前缀和Location一致。

    SET spark.adb.version=3.5;                                  --指定Spark版本,必须指定为3.5
    SET spark.iceberg.warehouse=oss://testBucketName/iceberg/;  --Iceberg外表元数据与数据文件的存储路径
    CREATE TABLE adb_external_db_iceberg.test_iceberg_tbl (
      `id` int,
      `name` string,
      `age` int
    ) USING iceberg
    PARTITIONED BY (age);

步骤三:写入或删除Iceberg外表数据

湖存储表

写入数据

在执行写入操作时,需要在SQL语句前添加如下参数,否则写入会失败:

SET spark.adb.lakehouse.enabled=true;                      ----开启湖存储
  • INSERT INTO写入

    INSERT INTO adb_external_db_iceberg.test_iceberg_tbl VALUES (1, 'lisa', 10), (2, 'jams', 20);
  • INSERT OVERWRITE写入

    INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl VALUES (1, 'lisa', 10), (2, 'jams', 30);
  • INSERT OVERWRITE静态分区写入

    INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl PARTITION(age=10) VALUES (1, 'anna');
  • INSERT OVERWRITE动态分区写入

    SET spark.sql.sources.partitionOverwriteMode=dynamic;    --仅覆盖动态分区,若未配置则会覆盖全表数据
    INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl PARTITION(age) VALUES (1, 'bom', 10);
  • UPDATE写入

    UPDATE adb_external_db_iceberg.test_iceberg_tbl SET name = 'box' WHERE id = 2;

删除数据

SET spark.adb.lakehouse.enabled=true;                      ----开启湖存储
DELETE FROM adb_external_db_iceberg.test_iceberg_tbl WHERE id = 1;
DELETE FROM adb_external_db_iceberg.test_iceberg_tbl WHERE age = 20;

非湖存储表

在执行写入和删除操作时,需要在SQL语句前添加如下参数,否则写入和删除会失败:

SET spark.adb.version=3.5;                                  --指定Spark版本,必须指定为3.5
SET spark.iceberg.warehouse=oss://testBucketName/iceberg/;  --Iceberg外表元数据与数据文件的存储路径

写入数据

  • INSERT INTO写入

    INSERT INTO adb_external_db_iceberg.test_iceberg_tbl VALUES (1, 'Frank', 10), (2, 'Amy', 10);
  • INSERT OVERWRITE写入

    INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl VALUES (1, 'Frank', 10), (2, 'Amy', 20);
  • INSERT OVERWRITE静态分区写入

    INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl PARTITION(age=10) VALUES (1, 'Frank');
  • INSERT OVERWRITE动态分区写入

    SET spark.sql.sources.partitionOverwriteMode=dynamic;    --仅覆盖动态分区,若未配置则会覆盖全表数据
    INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl PARTITION(age) VALUES (1, 'Bom', 10);
  • UPDATE写入

    UPDATE adb_external_db_iceberg.test_iceberg_tbl SET name = 'box' WHERE id = 2;

删除数据

SET spark.adb.version=3.5;                                  --指定Spark版本,必须指定为3.5
SET spark.iceberg.warehouse=oss://testBucketName/iceberg/;  --Iceberg外表元数据与数据文件的存储路径
DELETE FROM adb_external_db_iceberg.test_iceberg_tbl WHERE id = 1;
DELETE FROM adb_external_db_iceberg.test_iceberg_tbl WHERE age = 20;

步骤四:查询Iceberg外表数据

湖存储表

SET spark.adb.lakehouse.enabled=true;                      ----开启湖存储
SELECT * FROM adb_external_db_iceberg.test_iceberg_tbl;

返回结果如下:

+---+----+---+
|id |name|age|
+---+----+---+
|1  |anna|10 |
|2  |jams|20 |
+---+----+---+

非湖存储表

SET spark.adb.version=3.5;                                  --指定Spark版本,必须指定为3.5
SET spark.iceberg.warehouse=oss://testBucketName/iceberg/;  --Iceberg外表元数据与数据文件的存储路径
SELECT * FROM adb_external_db_iceberg.test_iceberg_tbl;

返回结果如下:

+---+----+---+
|id |name|age|
+---+----+---+
|1  |anna|10 |
|2  |jams|20 |
+---+----+---+

步骤五:删除Iceberg外表

湖存储表

删除AnalyticDB for MySQL中的Iceberg托管湖存储表元数据及数据:

SET spark.adb.lakehouse.enabled=true;
DROP TABLE adb_external_db_iceberg.test_iceberg_tbl;

非湖存储表

  • 仅在AnalyticDB for MySQL中删除Iceberg外表,OSS中的Iceberg外表数据不删除:

    DROP TABLE adb_external_db_iceberg.test_iceberg_tbl;
  • 同时删除AnalyticDB for MySQL中的Iceberg外表和OSSIceberg外表的数据:

    DROP TABLE adb_external_db_iceberg.test_iceberg_tbl purge;