EMR Serverless Spark以Iceberg REST访问DLF Catalog

本文为您介绍在EMR Serverless Spark集群中如何以Iceberg REST对接 DLF Catalog。

前提条件

  • 已创建与DLF实例同地域的Serverless Spark工作空间,详情请参见创建工作空间

  • 使用相关镜像:esr-4.5.0 (Spark 3.5.2, Scala 2.12)及以上版本。

    说明

    如果是RAM用户,在进行数据操作之前,需要先授予相应的资源权限。详情请参见数据授权管理

使用限制

目前支持以下类型的任务:

步骤一:Catalog授权

  1. 登录数据湖构建控制台

  2. Catalog列表页面,单击Catalog名称,进入Catalog详情页。

  3. 单击权限页签,单击授权

  4. 在授权页面,配置以下信息,单击确定

    • 用户/角色:选择RAM用户/RAM角色

    • 选择授权对象:在下拉列表中选择AliyunECSInstanceForEMRRole

      说明

      如果用户下拉列表中未找到AliyunECSInstanceForEMRRole,可以在用户管理页面单击同步。

    • 预置权限类型:选择Data Editor。

步骤二:读写数据

  1. 连接Catalog

    创建SQL会话,请参见管理SQL会话。在自定义配置下的Spark 配置中添加以下配置。

    重要

    以下配置示例中,iceberg_catalog为自定义的 Catalog 名称,用于在 Spark 中注册一个基于 Iceberg REST Catalog 的 Iceberg 表管理服务。该Catalog 通过 REST API 与阿里云 DLF服务对接,您可根据实际环境修改 Catalog 名称及相关参数。

    • ${regionID}:替换为实际的地域ID,例如 cn-hangzhou,请参见服务接入点

    • ${catalogName}:替换为在DLF中创建的Catalog 名称。

    • ${access_key_id}${access_key_secret}:阿里云账户AK/SK。

    # 启用Iceberg Spark扩展
    spark.sql.extensions                                org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
    # 注册名为 iceberg_catalog 的 Spark Catalog
    spark.sql.catalog.iceberg_catalog                   org.apache.iceberg.spark.SparkCatalog
    # 指定底层Catalog实现为Iceberg REST Catalog
    spark.sql.catalog.iceberg_catalog.catalog-impl      org.apache.iceberg.rest.RESTCatalog
    # DLF Iceberg 服务的 REST API 地址(需替换 ${regionID} 为实际地域 ID)
    spark.sql.catalog.iceberg_catalog.uri               http://${regionID}-vpc.dlf.aliyuncs.com/iceberg
    # 指定关联的 DLF Catalog 名称
    spark.sql.catalog.iceberg_catalog.warehouse         ${catalogName}
    # 使用 DLF 定制的 FileIO 实现
    spark.sql.catalog.iceberg_catalog.io-impl           org.apache.iceberg.rest.DlfFileIO
    # 启用 SigV4 签名认证
    spark.sql.catalog.iceberg_catalog.rest.auth.type    sigv4
    spark.sql.catalog.iceberg_catalog.rest.auth.sigv4.delegate-auth-type  none
    spark.sql.catalog.iceberg_catalog.rest.signing-region  ${regionID}
    spark.sql.catalog.iceberg_catalog.rest.signing-name  DlfNext
    # 访问凭证
    spark.sql.catalog.iceberg_catalog.rest.access-key-id ${access_key_id}
    spark.sql.catalog.iceberg_catalog.rest.secret-access-key ${access_key_secret}
  2. 读写数据

    SQL任务完整的开发流程示例,请参见SparkSQL开发快速入门

    说明

    不指定数据库时,创建数据表会默认建在Catalog下的default数据库中,也可创建并指定其他数据库。

    -- 创建数据库
    CREATE DATABASE IF NOT EXISTS db;
    
    -- 创建非分区表
    CREATE TABLE iceberg_catalog.db.tbl (
        id BIGINT NOT NULL COMMENT 'unique id',
        data STRING
    )
    USING iceberg;
    
    -- 插入非分区表数据
    INSERT INTO iceberg_catalog.db.tbl VALUES
    (1, 'Alice'),
    (2, 'Bob'),
    (3, 'Charlie');
    
    -- 查询非分区表所有数据
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 按条件查询非分区表
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 2;
    
    -- 更新非分区表数据
    UPDATE iceberg_catalog.db.tbl SET data = 'David' WHERE id = 3;
    
    -- 再次查询确认更新
    SELECT * FROM iceberg_catalog.db.tbl WHERE id = 3;
    
    -- 删除非分区表数据
    DELETE FROM iceberg_catalog.db.tbl WHERE id = 1;
    
    -- 再次查询确认删除
    SELECT * FROM iceberg_catalog.db.tbl;
    
    -- 创建分区表
    CREATE TABLE iceberg_catalog.db.part_tbl (
        id BIGINT,
        data STRING,
        category STRING,
        ts TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (bucket(16, id), days(ts), category);
    
    -- 插入分区表数据
    INSERT INTO iceberg_catalog.db.part_tbl VALUES
    (100, 'Data1', 'A', to_timestamp('2025-01-01 12:00:00')),
    (200, 'Data2', 'B', to_timestamp('2025-01-02 14:00:00')),
    (300, 'Data3', 'A', to_timestamp('2025-01-01 15:00:00')),
    (400, 'Data4', 'C', to_timestamp('2025-01-03 10:00:00'));
    
    
    -- 查询分区表所有数据
    SELECT * FROM iceberg_catalog.db.part_tbl;
    
    -- 查询 bucket(16, id) = 0 的数据
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE bucket(16, id) = 0;
    
    -- 查询 day(ts) = '2025-01-01' 的数据
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE days(ts) = '2025-01-01';
    
    -- 查询某个 category 的数据
    SELECT * FROM iceberg_catalog.db.part_tbl WHERE category = 'A';
    
    -- 多条件组合查询(bucket + day + category)
    SELECT * FROM iceberg_catalog.db.part_tbl 
    WHERE bucket(16, id) = 0 
      AND days(ts) = '2025-01-01'
      AND category = 'A';
    
    -- 聚合统计每个分类的数据数量
    SELECT category, COUNT(*) AS count 
    FROM iceberg_catalog.db.part_tbl 
    GROUP BY category;
    
    -- 删除数据库(谨慎操作),删除前需确保该dbtbl为空
    -- DROP DATABASE iceberg_catalog.db;