本文介绍DLA Serverless Spark如何提交Spark Streaming作业以及Spark Streaming作业重试的最佳实践。

前提条件

在DLA Serverless Spark中运行Spark Streaming作业前,您需要完成以下准备工作:

创建Spark Streaming作业

以DLA Serverless Spark访问用户VPC网络中的阿里云消息队列Kafka为例。

  1. 登录Data Lake Analytics管理控制台
  2. 在页面左上角,选择Kafka服务所在地域。
  3. 单击左侧导航栏中的Serverless Spark > 作业管理
  4. 作业编辑页面,单击创建作业模板
  5. 创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。3
  6. 单击Spark作业名,在Spark作业编辑框中输入Spark Streaming作业内容。
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }
    编译打包过程中,需要打包Spark-Kafka的相关依赖,如下所示:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
                        
    注意 如果是使用RAM用户提交作业,需要配置RAM用户权限,详情请参见快速配置子账号权限

示例代码

以下是连接Kafka的核心代码片段:
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSessoin = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    val df = sparkSessoin
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
说明 该代码消费Kafka的消息,并打印Key-Value对。

Spark Streaming作业重试的最佳实践

对于流应用,如果想要配置作业失败之后进行自动重试,您可以在conf中配置如下参数。
#代表作业的尝试次数,默认为1代表不重试,5代表作业失败将会尝试五次
spark.dla.job.maxAttempts  5

#代表作业尝试的计数有效时间,1h代表有效时间为一个小时,超时将不会计入作业尝试总数,该值默认为-1,代表作业失败计数永不超时
spark.dla.job.attemptFailuresValidityInterval 1h 

#以上两个参数组合起来的含义是:在任意1个1小时的时间区间内,作业尝试次数超过五次后,作业将停止尝试,否则作业会继续进行尝试
说明 关于作业重试功能的具体配置说明,请参见作业重试相关参数

对于流应用而言,在重新提交作业前,通常希望能够从上一次中断消费的位置继续消费。下面介绍如何才能使得作业在重试的时候从上一次停止的消费位点继续消费。

  • Spark Structured Streaming(推荐)

    对于Structured Streaming,您只需要在启动query的时候,指定checkpoint location即可,location指定为OSS路径。示例代码如下:
        val query = df
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .as[(String, String)]
          .writeStream
          .format("csv")
          .option("path", outputPath)
          .outputMode("Append")
          .option("checkpointLocation", "oss://path/to/checkpoint/dir")
          .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
          .start()
    说明 关于Spark Structured Streaming的checkpoint的更多内容,请参见Structured Streaming Programming Guide
  • Spark Streaming(DStreams)

    对于DStreams,您需要按照一定的编程模式,才可以正确的从checkpoint处恢复程序,示例代码如下:
    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }
    
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...
    
    // Start the context
    context.start()
    context.awaitTermination()
    说明 关于DStreams的checkpoint的更多内容,请参见Spark Streaming Programming Guide

Spark Streaming作业的监控与报警

对于流作业而言,DLA Spark默认为作业开启了监控与报警功能。
  • 您可以通过监控查看流作业的运行状态,例如作业处理延迟,数据处理速率等,具体请参见查看Spark监控
  • 您可以通过配置报警规则实现对流作业的实时报警通知,具体请参见管理报警