本文为您介绍如何通过Spark Structured Streaming流式写入Iceberg表。
前提条件
使用限制
创建的DataLake集群或Custom集群需要与Kafka集群在同一VPC和交换机下,不支持跨VPC。
流式写入方式
Spark Structured Streaming通过DataStreamWriter接口流式写数据到Iceberg表,代码如下。
val tableIdentifier: String = ...
data.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", tableIdentifier)
.option("checkpointLocation", checkpointPath)
.start()
代码中的tableIdentifier是元数据表名或者表路径。流式写入支持以下两种方式:
append:追加每个批次的数据到Iceberg表,相当于insert into。
complete:使用最新批次的数据完全覆盖Iceberg,相当于insert overwrite。
示例
本示例是从上游Kafka中读取数据,写入Iceberg表,打包放到EMR集群上通过spark-submit提交执行。
通过Kafka脚本创建测试使用的topic并准备测试数据。
使用SSH方式登录到Kafka集群,详情信息请参见登录集群。
执行以下命令,创建名为iceberg_test的topic。
kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test --partitions 3 --replication-factor 2 --create
执行以下命令,生产测试数据。
kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 --topic iceberg_test
通过Spark SQL创建测试使用的数据库iceberg_db和表iceberg_table,详细操作请参见基础使用。
新建Maven项目,引入Spark的依赖和检查编译Scala代码的Maven插件,可以在pom.xml中添加如下配置。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.2</version> </dependency> </dependencies> <build> <plugins> <!-- the Maven Scala plugin will compile Scala source files --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
编写Spark代码。
以Scala版代码为例,代码示例如下。
重要示例中数据湖元数据的配置参数,根据集群版本不同,配置的参数不同,Catalog名称也不同。本示例以EMR-5.3.0版本为列,其中
dlf_catalog
为Catalog名称。具体版本对应的配置请参见数据湖元数据配置。def main(args: Array[String]): Unit = { // 配置使用数据湖元数据。 val sparkConf = new SparkConf() sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>") val spark = SparkSession .builder() .config(sparkConf) .appName("StructuredSinkIceberg") .getOrCreate() val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint" val bootstrapServers = "192.168.XX.XX:9092" val topic = "iceberg_test" // 从上游Kafka读取数据 val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option("subscribe", topic) .load() val resDF = df.selectExpr("CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // 假设key是以Base64编码的字符串,先解码为普通字符串 "CAST(value AS STRING) AS data") .select( col("strKey").cast(LongType).alias("id"), // 现在可以安全地将解码后的字符串转换为Long col("data") ) // 流式写入Iceberg表 val query = resDF.writeStream .format("iceberg") .outputMode("append") .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)) .option("path", "dlf_catalog.iceberg_db.iceberg_table") .option("checkpointLocation", checkpointPath) .start() query.awaitTermination() }
请您根据集群的实际情况,修改如下参数。
参数
描述
checkpointPath
Spark流式写数据的Checkpoint路径。
bootstrapServers
Kafka集群中任一Kafka Broker组件的内网IP地址。
topic
Topic名称。
打包程序并部署到EMR集群。
本地调试完成后,通过以下命令打包。
mvn clean install
使用SSH方式登录到集群,详情信息请参见登录集群。
上传JAR包至EMR集群。
本示例是上传到EMR集群的根目录下。
提交运行Spark作业。
执行以下命令,通过spark-submit提交Spark作业。
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 2 \ --executor-memory 3g \ --num-executors 1 \ --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \ --class com.aliyun.iceberg.StructuredSinkIceberg \ iceberg-demos.jar
说明应替换为具体的版本号,且版本号需与您的Spark和Kafka版本兼容。
iceberg-demos.jar为您打包好的JAR包。--class和JAR包请根据您实际信息修改。
通过Spark SQL查询数据的变化,详细操作请参见基础使用。