基础使用

本文通过示例为您介绍如何在已经创建好的E-MapReduce(简称EMR)集群中使用Iceberg。

背景信息

本文以数据湖元数据为例,详细配置请参见数据湖元数据配置

前提条件

已在E-MapReduce控制台上,创建HadoopEMR-5.3.0及后续版本的集群,详情请参见创建集群

使用限制

由于IcebergSpark SQL Extensions不适用于Spark 2.4,因此对于EMR-3.38.x及后续版本的Spark只能使用DataFrame API操作Iceberg。本文介绍EMR-5.3.0及后续版本以Spark SQL方式操作Iceberg。

操作步骤

  1. 使用SSH方式登录到集群,详情信息请参见登录集群

  2. 执行以下命令,通过Spark SQL读写Iceberg配置。

    Spark SQL中操作Iceberg,首先需要配置Catalog。以下是在Spark SQL中使用数据湖元数据的配置,集群版本不同默认的Catalog名称不同,需要配置的参数也不同,具体请参见数据湖元数据配置

    说明

    Catalog的配置以spark.sql.catalog.<catalog_name>作为前缀,其中<catalog_name>Catalog名称。

    • EMR-5.6.0及后续版本

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
    • EMR-5.5.x版本

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
       --conf spark.sql.catalog.dlf.warehouse=<yourOSSWarehousePath> \
      说明

      spark.sql.catalog.dlf.warehouse参数可以不设置。如果不设置spark.sql.catalog.dlf.warehouse,则使用默认的warehouse路径。

    • EMR-5.3.x~EMR-5.4.x版本(包含)

      这些版本需要配置AccessKey信息。请确保代码运行环境设置了环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体配置方法,请参见Linux、macOSWindows系统配置环境变量

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \
       --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \
       --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \
       --conf spark.sql.catalog.dlf_catalog.access.key.id=<ALIBABA_CLOUD_ACCESS_KEY_ID> \
       --conf spark.sql.catalog.dlf_catalog.access.key.secret=<ALIBABA_CLOUD_ACCESS_KEY_SECRET> \
       --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \
       --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>

    当返回信息中包含如下信息时,表示已进入spark-sql命令行。

    spark-sql>
  3. 基础操作。

    重要

    以下示例中的<catalog_name>为您Catalog的名称,例如,EMR-5.6.0及后续版本的Catalog的名称为iceberg,其余版本请参见步骤2中配置Catalog时的信息,配置都是以spark.sql.catalog.<catalog_name>作为前缀的。

    • 创建库

      CREATE DATABASE IF NOT EXISTS <catalog_name>.iceberg_db;
    • 创建表

      CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg;

      Iceberg表支持COMMENT、PARTITIONED BY、LOCATIONTBLPROPERTIES等语法。如果通过TBLPROPERTIES设置表级别属性,代码示例如下。

      CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg
      TBLPROPERTIES (
          'write.format.default'='parquet'
      );
    • 写入数据

      INSERT INTO <catalog_name>.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
    • 查询数据

      SELECT * FROM <catalog_name>.iceberg_db.sample;
      SELECT count(1) AS count, data FROM <catalog_name>.iceberg_db.sample GROUP BY data;
    • 更新数据

      UPDATE <catalog_name>.iceberg_db.sample SET data = 'x' WHERE id = 3;
    • 删除数据

      DELETE FROM <catalog_name>.iceberg_db.sample WHERE id = 3;