本文介绍如何通过DLA Serverless Spark访问消息队列Kafka版。
前提条件
在DLA Serverless Spark中访问消息队列Kafka版前,您需要正确配置Kafka数据源网络。具体如下:
- 授权DLA Serverless Spark访问用户VPC的功能,具体内容,请参见配置数据源网络。
- 访问用户VPC所需要的交换机可使用Kafka服务中的交换机。在Kafka服务控制台的实例详情中您可以找到Kafka服务中的交换机。
- 访问用户VPC所需要的安全组可以使用Kafka服务中的安全组,在安全组列表页面,按照安全组名称搜索Kafka消息队列的实例ID即可找到对应的安全组。
- 往kafka白名单中添加第一步中选中的VswitchID的网段。
新版Kafka界面加白名单
旧版Kafka UI加白名单
- 访问用户VPC所需要的交换机可使用Kafka服务中的交换机。在Kafka服务控制台的实例详情中您可以找到Kafka服务中的交换机。
操作步骤
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择Kafka服务所在地域。
- 单击左侧导航栏中的 。
- 在作业编辑页面,单击创建作业。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入Spark Streaming任务内容。
编译打包过程中需要打包{ "file": "oss://path/to/xxx.jar", "name": "Kafka", "className": "com.alibabacloud.cwchan.SparkKafkaSub", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 5, "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs", "spark.dla.eni.vswitch.id": "{vswitch-id}", "spark.dla.eni.security.group.id": "{security-group-id}", "spark.dla.eni.enable": "true" } }
Spark-Kafka
的相关依赖,如下所示。<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.2</version> </dependency>
注意 如果是使用子账号提交作业,需要配置子账号权限,请参见快速配置子账号权限。
示例代码
以下是连接Kafka的核心代码片段,完整代码参考。
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkKafkaSub")
val sparkSessoin = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val df = sparkSessoin
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topicName)
.option("group.id", groupId)
.load()
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
说明 该代码消费Kafka的消息,并打印Key-Value对。