本文通过示例为您介绍如何在已经创建好的E-MapReduce(简称EMR)集群中使用Iceberg。
背景信息
本文以数据湖元数据为例,详细配置请参见数据湖元数据配置。
前提条件
已在E-MapReduce控制台上,创建Hadoop的EMR-5.3.0及后续版本的集群,详情请参见创建集群。
使用限制
由于Iceberg的Spark SQL Extensions不适用于Spark 2.4,因此对于EMR-3.38.x及后续版本的Spark只能使用DataFrame API操作Iceberg。本文介绍EMR-5.3.0及后续版本以Spark SQL方式操作Iceberg。
操作步骤
使用SSH方式登录到集群,详情信息请参见登录集群。
执行以下命令,通过Spark SQL读写Iceberg配置。
在Spark SQL中操作Iceberg,首先需要配置Catalog。Catalog的配置以spark.sql.catalog.<catalog_name>作为前缀。
以下是在Spark SQL中使用数据湖元数据的配置,集群版本不同默认的Catalog名称不同,需要配置的参数也不同,具体请参见数据湖元数据配置。
EMR-5.6.0及后续版本
spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
EMR-5.5.x版本
spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.dlf=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.dlf.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \ --conf spark.sql.catalog.dlf.warehouse=<yourOSSWarehousePath> \
说明spark.sql.catalog.dlf.warehouse参数可以不设置。如果不设置spark.sql.catalog.dlf.warehouse,则使用默认的warehouse路径。
EMR-5.3.x~EMR-5.4.x版本(包含)
spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \ --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \ --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \ --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \ --conf spark.sql.catalog.dlf_catalog.access.key.id=<yourAccessKeyId> \ --conf spark.sql.catalog.dlf_catalog.access.key.secret=<yourAccessKeySecret> \ --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \ --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \ --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>
当返回信息中包含如下信息时,表示已进入spark-sql命令行。
spark-sql>
基础操作。
重要以下示例中的
<yourCatalogName>
为您Catalog的名称,请您根据实际信息修改Catalog名称。创建库
CREATE DATABASE IF NOT EXISTS <yourCatalogName>.iceberg_db;
创建表
CREATE TABLE IF NOT EXISTS <yourCatalogName>.iceberg_db.sample( id BIGINT COMMENT 'unique id', data STRING ) USING iceberg;
Iceberg表支持COMMENT、PARTITIONED BY、LOCATION和TBLPROPERTIES等语法。如果通过TBLPROPERTIES设置表级别属性,代码示例如下。
CREATE TABLE IF NOT EXISTS <yourCatalogName>.iceberg_db.sample( id BIGINT COMMENT 'unique id', data STRING ) USING iceberg TBLPROPERTIES ( 'write.format.default'='parquet' );
写入数据
INSERT INTO <yourCatalogName>.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
查询数据
SELECT * FROM <yourCatalogName>.iceberg_db.sample; SELECT count(1) AS count, data FROM <yourCatalogName>.iceberg_db.sample GROUP BY data;
更新数据
UPDATE <yourCatalogName>.iceberg_db.sample SET data = 'x' WHERE id = 3;
删除数据
DELETE FROM <yourCatalogName>.iceberg_db.sample WHERE id = 3;
- 本页导读 (1)