本文介绍DLA Serverless Spark如何提交Spark Streaming作业。
背景信息
文中示例将使用DLA Serverless Spar访问用户VPC中的阿里云消息队列Kafka
前提条件
操作步骤
- 登录Data Lake Analytics管理控制台
- 在页面左上角,选择Kafka服务所在地域。
- 单击左侧导航栏中的
- 在作业编辑页面,单击创建作业。
- 在创建作业页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入Spark Streaming任务内容。
编译打包过程中,需要打包{ "file": "oss://path/to/xxx.jar", "name": "Kafka", "className": "com.alibabacloud.cwchan.SparkKafkaSub", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 5, "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs", "spark.dla.eni.vswitch.id": "{vswitch-id}", "spark.dla.eni.security.group.id": "{security-group-id}", "spark.dla.eni.enable": "true" } }
Spark-Kafka
的相关依赖,如下所示<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.2</version> </dependency>
注意 如果是使用子账号提交作业,需要配置子账号权限,详情请参见快速权限配置(精简版)。
示例代码
以下是连接Kafka的核心代码片段:
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkKafkaSub")
val sparkSessoin = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val df = sparkSessoin
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topicName)
.option("group.id", groupId)
.load()
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
说明 该代码消费Kafka的消息,并打印Key-Value对。
在文档使用中是否遇到以下问题
更多建议
匿名提交