本文为您介绍如何通过Spark Structured Streaming流式写入Iceberg表。

前提条件

  • 已在E-MapReduce控制台上,创建Hadoop集群,详情请参见创建集群
    说明 此文档仅适用于EMR-3.38.0及后续版本与EMR-5.4.0及后续版本的Hadoop集群。
  • 已在E-MapReduce控制台上,创建Kafka集群,详情请参见创建集群

使用限制

Hadoop集群和Kafka集群需要在同一VPC和交换机下,不支持跨VPC。

流式写入方式

Spark Structured Streaming通过DataStreamWriter接口流式写数据到Iceberg表,代码如下。
val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()
说明 代码中的tableIdentifier是元数据表名或者表路径。流式写入支持以下两种方式:
  • append:追加每个批次的数据到Iceberg表,相当于insert into。
  • complete:使用最新批次的数据完全覆盖Iceberg,相当于insert overwrite。

示例

本示例是从上游Kafka中读取数据,写入Iceberg表,打包放到EMR集群上通过spark-submit提交执行。

  1. 通过Kafka脚本创建测试使用的topic并准备测试数据。
    1. 使用SSH方式登录到Kafka集群,详情信息请参见登录集群
    2. 执行以下命令,创建名为iceberg_test的topic。
      kafka-topics.sh --zookeeper emr-header-1:2181,emr-worker-1:2181,emr-worker-2:2181 --topic iceberg_test --partitions 3 --replication-factor 2 --create
    3. 执行以下命令,生产测试数据。
      kafka-console-producer.sh --broker-list emr-header-1:9092,emr-worker-1:9092,emr-worker-2:9092 --topic iceberg_test
  2. 通过Spark SQL创建测试使用的数据库iceberg_db和表iceberg_table,详细操作请参见基础使用
  3. 编写Spark代码。
    以Scala版代码为例,代码示例如下。
    注意 示例中数据湖元数据的配置参数,根据集群版本不同,配置的参数不同,Catalog名称也不同。本示例以EMR-5.3.0版本为列,其中dlf_catalog为Catalog名称。具体版本对应的配置请参见数据湖元数据配置
    def main(args: Array[String]): Unit = {
    
      // 配置使用数据湖元数据。
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
        .builder()
        .config(sparkConf)
        .appName("StructuredSinkIceberg")
        .getOrCreate()
    
      val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
      val bootstrapServers = "192.168.XX.XX:9092"
      val topic = "iceberg_test"
    
      // 从上游Kafka读取数据
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topic)
        .load()
    
      import spark.implicits._
      val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .as[(String, String)].toDF("id", "data")
    
      // 流式写入Iceberg表
      val query = resDF.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", "dlf_catalog.iceberg_db.iceberg_table")
        .option("checkpointLocation", checkpointPath)
        .start()
    
      query.awaitTermination()
    }
    请您根据集群的实际情况,修改如下参数。
    参数 描述
    checkpointPath Spark流式写数据的Checkpoint路径。
    bootstrapServers Kafka集群中任一Kafka Broker组件的内网IP地址。
    topic Topic名称。
  4. 打包程序并部署到EMR集群。
    1. 检查编译Scala代码的Maven插件,可以在pom.xml中配置如下插件。
      <build>
          <plugins>
              <!-- the Maven Scala plugin will compile Scala source files -->
              <plugin>
                  <groupId>net.alchim31.maven</groupId>
                  <artifactId>scala-maven-plugin</artifactId>
                  <version>3.2.2</version>
                  <executions>
                      <execution>
                          <goals>
                              <goal>compile</goal>
                              <goal>testCompile</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
    2. 本地调试完成后,通过以下命令打包。
      mvn clean install
    3. 使用SSH方式登录到集群,详情信息请参见登录集群
    4. 上传JAR包至EMR集群。
      本示例是上传到EMR集群的根目录下。
  5. 提交运行Spark作业。
    1. 执行以下命令,通过spark-submit提交Spark作业。
      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --class com.aliyun.iceberg.StructuredSinkIceberg \
       iceberg-demos.jar
      说明 iceberg-demos.jar为您打包好的JAR包。--class和JAR包请根据您实际信息修改。
    2. 通过Spark SQL查询数据的变化,详细操作请参见基础使用