Iceberg是一种开放的数据湖表格式。您可以借助Iceberg快速地在HDFS或者阿里云OSS上构建自己的数据湖存储服务。本文为您介绍如何在EMR Serverless Spark中实现Iceberg表的读取与写入操作。
前提条件
已创建工作空间,详情请参见创建工作空间。
操作流程
SparkSQL与Notebook均支持对Iceberg表的读写操作。本文将以SparkSQL任务为例进行介绍。
步骤一:创建会话资源
进入会话管理页面。
在左侧导航栏,选择
。在Spark页面,单击目标工作空间名称。
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
在SQL会话页面,单击创建SQL会话。
在创建SQL会话页面的Spark配置区域,配置以下信息,单击创建。详情请参见管理SQL会话。
Spark对Iceberg的读写基于Catalog,您可以根据具体场景进行选择。
Catalog类型介绍
类型
描述
Iceberg Catalog
用于管理Iceberg格式的元数据,只能用于查询和写入Iceberg表。
支持自定义元数据类型(DLF 1.0、Hive MetaStore、FileSystem)。
访问Iceberg表时应使用格式
<catalogName>.<数据库名>.<表名>
。重要本文配置中的
<catalogName>
为Catalog名称,您可以自定义。如无特殊需求时建议保持默认Catalog名称iceberg
。
spark_catalog
元数据为当前工作空间默认的Catalog,可以用于查询Iceberg表和非Iceberg表。
元数据是当前工作空间的默认Catalog。
如果您希望将默认Catalog修改为外部的Hive Metastore,可以参见EMR Serverless Spark连接外部Hive Metastore。
访问表时可直接使用格式
<数据库名>.<表名>
。
Catalog配置
使用Iceberg Catalog
DLF 1.0
元数据保存在DLF 1.0中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.aliyun.dlf.hive.DlfCatalog
Hive MetaStore
元数据保存在指定的Hive MetaStore中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.catalog-impl org.apache.iceberg.hive.HiveCatalog spark.sql.catalog.<catalogName>.uri thrift://<yourHMSUri>:<port>
参数
说明
thrift://<yourHMSUri>:<port>
Hive MetaStore的URI。格式为
thrift://<Hive metastore的IP地址>:9083
。<Hive metastore的IP地址>
为HMS服务的内网IP地址。如果您需要指定外部Metastore服务,请参见EMR Serverless Spark连接外部Hive Metastore。FileSystem
元数据保存在文件系统中。
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.<catalogName> org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.<catalogName>.type hadoop spark.sql.catalog.<catalogName>.warehouse oss://<yourBucketName>/warehouse
使用spark_catalog
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.spark_catalog org.apache.iceberg.spark.SparkSessionCatalog
单击新建SQL会话实例操作列的启动。
步骤二:读写Iceberg表
进入SQL开发页面。
在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
在开发目录页签下,单击
图标。
在新建对话框中,输入名称(例如users_task),类型使用默认的
,然后单击确定。拷贝如下代码到新增的SparkSQL页签(users_task)中。
CREATE DATABASE IF NOT EXISTS iceberg.ss_iceberg_db; CREATE TABLE iceberg.ss_iceberg_db.iceberg_tbl (id INT, name STRING) USING iceberg; INSERT INTO iceberg.ss_iceberg_db.iceberg_tbl VALUES (1, "a"), (2, "b"); SELECT id, name FROM iceberg.ss_iceberg_db.iceberg_tbl ORDER BY id;
当您需要进行删除操作时,可以删除相应的表和数据库信息。
DROP TABLE iceberg.ss_iceberg_db.iceberg_tbl; DROP DATABASE iceberg.ss_iceberg_db;
在数据库下拉列表中选择一个数据库,在会话下拉列表中选择刚刚创建的SQL会话实例。
单击运行,执行任务。返回信息如下所示。
相关文档
SQL任务和任务编排完整的开发流程示例,请参见SparkSQL开发快速入门。
更多Iceberg相关用法和配置,请参见Apache Iceberg。
创建SQL会话资源的具体操作,请参见管理SQL会话。
创建Notebook会话资源的具体操作,请参见管理Notebook会话。