本文为您介绍在EMR Serverless Spark集群中如何以Iceberg REST对接 DLF Catalog。
前提条件
使用限制
目前支持以下类型的任务:
SQL会话:管理SQL会话。
Thrift Server:管理Spark Thrift Server会话。
批任务:批任务开发。
步骤一:Catalog授权
登录数据湖构建控制台。
在Catalog列表页面,单击Catalog名称,进入Catalog详情页。
单击权限页签,单击授权。
在授权页面,配置以下信息,单击确定。
用户/角色:选择RAM用户/RAM角色。
选择授权对象:在下拉列表中选择AliyunECSInstanceForEMRRole。
说明如果用户下拉列表中未找到AliyunECSInstanceForEMRRole,可以在用户管理页面单击同步。
预置权限类型:选择Data Editor。
步骤二:读写数据
连接Catalog
创建SQL会话,请参见管理SQL会话。在自定义配置下的Spark 配置中添加以下配置。
重要以下配置示例中,
iceberg_catalog为自定义的 Catalog 名称,用于在 Spark 中注册一个基于 Iceberg REST Catalog 的 Iceberg 表管理服务。该Catalog 通过 REST API 与阿里云 DLF服务对接,您可根据实际环境修改 Catalog 名称及相关参数。${regionID}:替换为实际的地域ID,例如cn-hangzhou,请参见服务接入点。${catalogName}:替换为在DLF中创建的Catalog 名称。${access_key_id},${access_key_secret}:阿里云账户AK/SK。
# 启用Iceberg Spark扩展 spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions # 注册名为 iceberg_catalog 的 Spark Catalog spark.sql.catalog.iceberg_catalog org.apache.iceberg.spark.SparkCatalog # 指定底层Catalog实现为Iceberg REST Catalog spark.sql.catalog.iceberg_catalog.catalog-impl org.apache.iceberg.rest.RESTCatalog # DLF Iceberg 服务的 REST API 地址(需替换 ${regionID} 为实际地域 ID) spark.sql.catalog.iceberg_catalog.uri http://${regionID}-vpc.dlf.aliyuncs.com/iceberg # 指定关联的 DLF Catalog 名称 spark.sql.catalog.iceberg_catalog.warehouse ${catalogName} # 使用 DLF 定制的 FileIO 实现 spark.sql.catalog.iceberg_catalog.io-impl org.apache.iceberg.rest.DlfFileIO # 启用 SigV4 签名认证 spark.sql.catalog.iceberg_catalog.rest.auth.type sigv4 spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type none spark.sql.catalog.iceberg_catalog.rest.signing-region ${regionID} spark.sql.catalog.iceberg_catalog.rest.signing-name DlfNext # 访问凭证 spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id} spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}读写数据
SQL任务完整的开发流程示例,请参见SparkSQL开发快速入门。
说明不指定数据库时,创建数据表会默认建在Catalog下的
default数据库中,也可创建并指定其他数据库。-- 创建数据库 CREATE DATABASE IF NOT EXISTS db; -- 创建非分区表 CREATE TABLE iceberg_catalog.db.tbl ( id BIGINT NOT NULL COMMENT 'unique id', data STRING ) USING iceberg; -- 插入非分区表数据 INSERT INTO iceberg_catalog.db.tbl VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'); -- 查询非分区表所有数据 SELECT * FROM iceberg_catalog.db.tbl; -- 按条件查询非分区表 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2; -- 更新非分区表数据 UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3; -- 再次查询确认更新 SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3; -- 删除非分区表数据 DELETE FROM iceberg_catalog.db.tbl WHERE id = 1; -- 再次查询确认删除 SELECT * FROM iceberg_catalog.db.tbl; -- 创建分区表 CREATE TABLE iceberg_catalog.db.part_tbl ( id BIGINT, data STRING, category STRING, ts TIMESTAMP ) USING iceberg PARTITIONED BY (bucket(16, id), days(ts), category); -- 插入分区表数据 INSERT INTO iceberg_catalog.db.part_tbl VALUES (100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')), (200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')), (300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')), (400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00')); -- 查询分区表所有数据 SELECT * FROM iceberg_catalog.db.part_tbl; -- 查询 bucket(16, id) = 0 的数据 SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0; -- 查询 day(ts) = '2025-01-01' 的数据 SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01'; -- 查询某个 category 的数据 SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A'; -- 多条件组合查询(bucket + day + category) SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0 AND days(ts) = '2025-01-01' AND category = 'A'; -- 聚合统计每个分类的数据数量 SELECT category, COUNT(*) AS count FROM iceberg_catalog.db.part_tbl GROUP BY category; -- 删除数据库(谨慎操作),删除前需确保该db下tbl为空 -- DROP DATABASE iceberg_catalog.db;