Kafka
更新时间:
本文介绍如何通过DLA Serverless Spark访问消息队列Kafka版。
重要
云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见访问Kafka数据源。
前提条件
在DLA Serverless Spark中访问消息队列Kafka版前,您需要正确配置Kafka数据源网络。具体如下:
授权DLA Serverless Spark访问用户VPC的功能,具体内容,请参见配置数据源网络。
访问用户VPC所需要的交换机可使用Kafka服务中的交换机。在Kafka服务控制台的实例详情中您可以找到Kafka服务中的交换机。
访问用户VPC所需要的安全组可以使用Kafka服务中的安全组,在安全组列表页面,按照安全组名称搜索Kafka消息队列的实例ID即可找到对应的安全组。
往kafka白名单中添加第一步中选中的VswitchID的网段。
新版Kafka界面加白名单
旧版Kafka UI加白名单
操作步骤
在页面左上角,选择Kafka服务所在地域。
单击左侧导航栏中的
。在作业编辑页面,单击创建作业。
在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
单击Spark作业名,在Spark作业编辑框中输入Spark Streaming任务内容。
{ "file": "oss://path/to/xxx.jar", "name": "Kafka", "className": "com.alibabacloud.cwchan.SparkKafkaSub", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 5, "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs", "spark.dla.eni.vswitch.id": "{vswitch-id}", "spark.dla.eni.security.group.id": "{security-group-id}", "spark.dla.eni.enable": "true" } }
编译打包过程中需要打包
Spark-Kafka
的相关依赖,如下所示。<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.2</version> </dependency>
重要如果是使用子账号提交作业,需要配置子账号权限,请参见快速配置子账号权限。
示例代码
以下是连接Kafka的核心代码片段,完整代码参考。
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkKafkaSub")
val sparkSessoin = SparkSession
.builder()
.config(sparkConf)
.getOrCreate()
val df = sparkSessoin
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topicName)
.option("group.id", groupId)
.load()
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
说明
该代码消费Kafka的消息,并打印Key-Value对。
文档内容是否对您有帮助?