通过Spark SQL读写Iceberg外表
本文主要介绍如何在云原生数据仓库 AnalyticDB MySQL 版中使用Spark SQL读写Iceberg外表。
前提条件
集群的产品系列为企业版、基础版或湖仓版。
企业版和基础版预留资源需大于0 ACU。
湖仓版集群存储预留资源需大于0 ACU。
集群内核版本需为3.2.5.0及以上版本。
说明请在云原生数据仓库AnalyticDB MySQL控制台集群信息页面的配置信息区域,查看和升级内核版本。
已创建数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。
集群与OSS存储空间位于相同地域。
已开通湖存储。
说明仅读写湖存储表时满足该条件。
步骤一:进入数据开发
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。
在左侧导航栏,单击
。在SQLConsole窗口,选择Spark引擎和资源组(Job型资源组或Spark引擎的Interactive型资源组)。
步骤二:创建外库与Iceberg外表
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。
湖存储表
创建数据库。
CREATE DATABASE adb_external_db_iceberg WITH DBPROPERTIES ('adb_lake_bucket' = 'adb-lake-cn-shanghai-6gml****');
参数说明:
参数
说明
adb_lake_bucket
指定湖存储表数据的存储位置。
在数据库建表语句中指定湖存储后,数据库中的所有表将统一存储于该湖存储中。如果您不希望将数据库下的所有表都存储到湖存储中,可以在建表时再设置该参数。
创建Iceberg外表。
SET spark.adb.lakehouse.enabled=true; ----开启湖存储 CREATE TABLE adb_external_db_iceberg.test_iceberg_tbl ( `id` int, `name` string, `age` int ) USING iceberg PARTITIONED BY (age) TBLPROPERTIES ( 'adb_lake_bucket' = 'adb-lake-cn-shanghai-6gml****' );
参数说明:
参数
说明
adb_lake_bucket
指定湖存储表数据的存储位置。
若您已在创建数据库时指定湖存储,数据库中的所有表将统一存储于该湖存储中,在建表时您无需再次指定。
若您在创建数据库时未指定湖存储,此处必须显式指定,否则建表会报错。指定后,该表的数据会存储到该湖存储中。
若在创建数据库和创建表时均显式指定湖存储,则该表的数据会存储在建表时指定的湖存储中;而数据库下的其他表则会存储在建库语句指定的湖存储中。
非湖存储表
检查已有数据库中是否可以创建Iceberg外表,或新建数据库。
使用
SHOW CREATE DATABASE
语句查看数据库的DDL语句,满足以下两种情况,可以使用已有数据库,否则需新建数据库。DDL语句未指定
Location
。DDL语句指定了
Location
,且Catalog
参数的值为mix
。
新建数据库。
CREATE DATABASE adb_external_db_iceberg;
创建Iceberg外表。
重要如果数据库指定了
Location
,需要确保创建、读写Iceberg外表时spark.iceberg.warehouse
参数指定的OSS路径前缀和Location
一致。SET spark.adb.version=3.5; --指定Spark版本,必须指定为3.5 SET spark.iceberg.warehouse=oss://testBucketName/iceberg/; --Iceberg外表元数据与数据文件的存储路径 CREATE TABLE adb_external_db_iceberg.test_iceberg_tbl ( `id` int, `name` string, `age` int ) USING iceberg PARTITIONED BY (age);
步骤三:写入或删除Iceberg外表数据
湖存储表
写入数据
在执行写入操作时,需要在SQL语句前添加如下参数,否则写入会失败:
SET spark.adb.lakehouse.enabled=true; ----开启湖存储
INSERT INTO写入
INSERT INTO adb_external_db_iceberg.test_iceberg_tbl VALUES (1, 'lisa', 10), (2, 'jams', 20);
INSERT OVERWRITE写入
INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl VALUES (1, 'lisa', 10), (2, 'jams', 30);
INSERT OVERWRITE静态分区写入
INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl PARTITION(age=10) VALUES (1, 'anna');
INSERT OVERWRITE动态分区写入
SET spark.sql.sources.partitionOverwriteMode=dynamic; --仅覆盖动态分区,若未配置则会覆盖全表数据 INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl PARTITION(age) VALUES (1, 'bom', 10);
UPDATE写入
UPDATE adb_external_db_iceberg.test_iceberg_tbl SET name = 'box' WHERE id = 2;
删除数据
SET spark.adb.lakehouse.enabled=true; ----开启湖存储
DELETE FROM adb_external_db_iceberg.test_iceberg_tbl WHERE id = 1;
DELETE FROM adb_external_db_iceberg.test_iceberg_tbl WHERE age = 20;
非湖存储表
在执行写入和删除操作时,需要在SQL语句前添加如下参数,否则写入和删除会失败:
SET spark.adb.version=3.5; --指定Spark版本,必须指定为3.5
SET spark.iceberg.warehouse=oss://testBucketName/iceberg/; --Iceberg外表元数据与数据文件的存储路径
写入数据
INSERT INTO写入
INSERT INTO adb_external_db_iceberg.test_iceberg_tbl VALUES (1, 'Frank', 10), (2, 'Amy', 10);
INSERT OVERWRITE写入
INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl VALUES (1, 'Frank', 10), (2, 'Amy', 20);
INSERT OVERWRITE静态分区写入
INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl PARTITION(age=10) VALUES (1, 'Frank');
INSERT OVERWRITE动态分区写入
SET spark.sql.sources.partitionOverwriteMode=dynamic; --仅覆盖动态分区,若未配置则会覆盖全表数据 INSERT OVERWRITE adb_external_db_iceberg.test_iceberg_tbl PARTITION(age) VALUES (1, 'Bom', 10);
UPDATE写入
UPDATE adb_external_db_iceberg.test_iceberg_tbl SET name = 'box' WHERE id = 2;
删除数据
SET spark.adb.version=3.5; --指定Spark版本,必须指定为3.5
SET spark.iceberg.warehouse=oss://testBucketName/iceberg/; --Iceberg外表元数据与数据文件的存储路径
DELETE FROM adb_external_db_iceberg.test_iceberg_tbl WHERE id = 1;
DELETE FROM adb_external_db_iceberg.test_iceberg_tbl WHERE age = 20;
步骤四:查询Iceberg外表数据
湖存储表
SET spark.adb.lakehouse.enabled=true; ----开启湖存储
SELECT * FROM adb_external_db_iceberg.test_iceberg_tbl;
返回结果如下:
+---+----+---+
|id |name|age|
+---+----+---+
|1 |anna|10 |
|2 |jams|20 |
+---+----+---+
非湖存储表
SET spark.adb.version=3.5; --指定Spark版本,必须指定为3.5
SET spark.iceberg.warehouse=oss://testBucketName/iceberg/; --Iceberg外表元数据与数据文件的存储路径
SELECT * FROM adb_external_db_iceberg.test_iceberg_tbl;
返回结果如下:
+---+----+---+
|id |name|age|
+---+----+---+
|1 |anna|10 |
|2 |jams|20 |
+---+----+---+
步骤五:删除Iceberg外表
湖存储表
删除AnalyticDB for MySQL中的Iceberg托管湖存储表元数据及数据:
SET spark.adb.lakehouse.enabled=true;
DROP TABLE adb_external_db_iceberg.test_iceberg_tbl;
非湖存储表
仅在AnalyticDB for MySQL中删除Iceberg外表,OSS中的Iceberg外表数据不删除:
DROP TABLE adb_external_db_iceberg.test_iceberg_tbl;
同时删除AnalyticDB for MySQL中的Iceberg外表和OSS中Iceberg外表的数据:
DROP TABLE adb_external_db_iceberg.test_iceberg_tbl purge;