本文为您介绍如何通过Spark Structured Streaming流式写入Iceberg表。
前提条件
- 已在E-MapReduce控制台上,创建Hadoop集群,详情请参见创建集群。
说明 此文档仅适用于EMR-3.38.0及后续版本与EMR-5.4.0及后续版本的Hadoop集群。
- 已在E-MapReduce控制台上,创建Kafka集群,详情请参见创建集群。
使用限制
Hadoop集群和Kafka集群需要在同一VPC和交换机下,不支持跨VPC。
流式写入方式
Spark Structured Streaming通过DataStreamWriter接口流式写数据到Iceberg表,代码如下。
val tableIdentifier: String = ...
data.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", tableIdentifier)
.option("checkpointLocation", checkpointPath)
.start()
说明 代码中的
tableIdentifier是元数据表名或者表路径。流式写入支持以下两种方式:
- append:追加每个批次的数据到Iceberg表,相当于insert into。
- complete:使用最新批次的数据完全覆盖Iceberg,相当于insert overwrite。
示例
本示例是从上游Kafka中读取数据,写入Iceberg表,打包放到EMR集群上通过spark-submit提交执行。
- 通过Kafka脚本创建测试使用的topic并准备测试数据。
- 使用SSH方式登录到Kafka集群,详情信息请参见登录集群。
- 执行以下命令,创建名为iceberg_test的topic。
kafka-topics.sh --zookeeper emr-header-1:2181,emr-worker-1:2181,emr-worker-2:2181 --topic iceberg_test --partitions 3 --replication-factor 2 --create
- 执行以下命令,生产测试数据。
kafka-console-producer.sh --broker-list emr-header-1:9092,emr-worker-1:9092,emr-worker-2:9092 --topic iceberg_test
- 通过Spark SQL创建测试使用的数据库iceberg_db和表iceberg_table,详细操作请参见基础使用。
- 编写Spark代码。
以Scala版代码为例,代码示例如下。
注意 示例中数据湖元数据的配置参数,根据集群版本不同,配置的参数不同,Catalog名称也不同。本示例以EMR-5.3.0版本为列,其中
dlf_catalog
为Catalog名称。具体版本对应的配置请参见
数据湖元数据配置。
def main(args: Array[String]): Unit = {
// 配置使用数据湖元数据。
val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
val spark = SparkSession
.builder()
.config(sparkConf)
.appName("StructuredSinkIceberg")
.getOrCreate()
val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
val bootstrapServers = "192.168.XX.XX:9092"
val topic = "iceberg_test"
// 从上游Kafka读取数据
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.load()
import spark.implicits._
val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)].toDF("id", "data")
// 流式写入Iceberg表
val query = resDF.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", "dlf_catalog.iceberg_db.iceberg_table")
.option("checkpointLocation", checkpointPath)
.start()
query.awaitTermination()
}
请您根据集群的实际情况,修改如下参数。
参数 |
描述 |
checkpointPath |
Spark流式写数据的Checkpoint路径。 |
bootstrapServers |
Kafka集群中任一Kafka Broker组件的内网IP地址。 |
topic |
Topic名称。 |
- 打包程序并部署到EMR集群。
- 检查编译Scala代码的Maven插件,可以在pom.xml中配置如下插件。
<build>
<plugins>
<!-- the Maven Scala plugin will compile Scala source files -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 本地调试完成后,通过以下命令打包。
- 使用SSH方式登录到集群,详情信息请参见登录集群。
- 上传JAR包至EMR集群。
本示例是上传到EMR集群的根目录下。
- 提交运行Spark作业。
- 执行以下命令,通过spark-submit提交Spark作业。
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-cores 2 \
--executor-memory 3g \
--num-executors 1 \
--class com.aliyun.iceberg.StructuredSinkIceberg \
iceberg-demos.jar
说明 iceberg-demos.jar为您打包好的JAR包。--class和JAR包请根据您实际信息修改。
- 通过Spark SQL查询数据的变化,详细操作请参见基础使用。