本文介绍DLA Serverless Spark如何提交Spark Streaming作业以及Spark Streaming作业重试的最佳实践。
前提条件
在DLA Serverless Spark中运行Spark Streaming作业前,您需要完成以下准备工作:
- 授权DLA Serverless Spark访问用户VPC网络的权限。具体操请参见配置数据源网络。
- 在Data Lake Analytics管理控制台的虚拟集群管理页面中,确认您的虚拟集群使用的版本是
spark_2_4_5-dla_1_2_0
及以上。
创建Spark Streaming作业
以DLA Serverless Spark访问用户VPC网络中的阿里云消息队列Kafka为例。
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择Kafka服务所在地域。
- 单击左侧导航栏中的 。
- 在作业编辑页面,单击创建作业模板。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入Spark Streaming作业内容。
编译打包过程中,需要打包{ "file": "oss://path/to/xxx.jar", "name": "Kafka", "className": "com.alibabacloud.cwchan.SparkKafkaSub", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 5, "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs", "spark.dla.eni.vswitch.id": "{vswitch-id}", "spark.dla.eni.security.group.id": "{security-group-id}", "spark.dla.eni.enable": "true" } }
Spark-Kafka
的相关依赖,如下所示:<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.2</version> </dependency>
注意 如果是使用RAM用户提交作业,需要配置RAM用户权限,详情请参见快速配置子账号权限。
示例代码
以下是连接Kafka的核心代码片段:
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkKafkaSub")
val sparkSessoin = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val df = sparkSessoin
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topicName)
.option("group.id", groupId)
.load()
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
说明 该代码消费Kafka的消息,并打印Key-Value对。
Spark Streaming作业重试的最佳实践
对于流应用,如果想要配置作业失败之后进行自动重试,您可以在conf中配置如下参数。
#代表作业的尝试次数,默认为1代表不重试,5代表作业失败将会尝试五次
spark.dla.job.maxAttempts 5
#代表作业尝试的计数有效时间,1h代表有效时间为一个小时,超时将不会计入作业尝试总数,该值默认为-1,代表作业失败计数永不超时
spark.dla.job.attemptFailuresValidityInterval 1h
#以上两个参数组合起来的含义是:在任意1个1小时的时间区间内,作业尝试次数超过五次后,作业将停止尝试,否则作业会继续进行尝试
说明 关于作业重试功能的具体配置说明,请参见作业重试相关参数。
对于流应用而言,在重新提交作业前,通常希望能够从上一次中断消费的位置继续消费。下面介绍如何才能使得作业在重试的时候从上一次停止的消费位点继续消费。
-
Spark Structured Streaming(推荐)
对于Structured Streaming,您只需要在启动query的时候,指定checkpoint location即可,location指定为OSS路径。示例代码如下:val query = df .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] .writeStream .format("csv") .option("path", outputPath) .outputMode("Append") .option("checkpointLocation", "oss://path/to/checkpoint/dir") .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) .start()
说明 关于Spark Structured Streaming的checkpoint的更多内容,请参见Structured Streaming Programming Guide。 -
Spark Streaming(DStreams)
对于DStreams,您需要按照一定的编程模式,才可以正确的从checkpoint处恢复程序,示例代码如下:// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
说明 关于DStreams的checkpoint的更多内容,请参见Spark Streaming Programming Guide。