访问Kafka数据源
本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark通过ENI网络访问消息队列Kafka版。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版或湖仓版。
已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组。
已创建AnalyticDB for MySQL集群的数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
Kafka实例与AnalyticDB for MySQL集群位于同一地域。具体操作,请参见公网和VPC接入。
已在Kafka实例中创建Topic和Group。具体操作,请参见创建资源。
已开通OSS服务,并创建与AnalyticDB for MySQL集群位于相同地域的存储空间。具体操作,请参见开通OSS服务和创建存储空间。
准备工作
在云消息队列 Kafka 版控制台的实例详情页面,获取Kafka实例的交换机ID。
在ECS管理控制台的安全组页面,搜索Kafka实例ID来获取安全组ID。
在云消息队列 Kafka 版控制台的白名单管理页面,查看Kafka实例的白名单是否为交换机ID的网段。
操作步骤
分别下载与Kafka和AnalyticDB for MySQL Spark实例版本对应的JAR包。下载链接,请参见Kafka-clients和Spark-sql-kafka-0-10。
在pom.xml文件的dependencies中添加依赖项。
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.2.2</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.1</version> </dependency>
编写
Spark Streaming
示例程序来读取Kafka中的消息,并进行编译打包。本文生成的JAR包名称为spark-example.jar
。package com.aliyun.spark.streaming import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object SparkKafka { def main(args: Array[String]): Unit = { if(args.length < 3){ System.err.println( """ |args0: groupId |args1: topicName |args2: bootstrapServers |""".stripMargin) System.exit(1) } val groupId = args(0) val topicName = args(1) val bootstrapServers = args(2) val sparkConf: SparkConf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .setAppName("SparkKafkaSub") sparkConf.registerKryoClasses(Array(classOf[ConsumerRecord[_,_]])) val sparkSession = SparkSession .builder() .config(sparkConf) .getOrCreate() val df = sparkSession .readStream .format("kafka") //Kafka实例的域名接入点。 .option("kafka.bootstrap.servers", alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092) //Kafka实例的Topic名称。 .option("subscribe", kafka_test) //Kafka实例的Group ID。 .option("group.id", kafka_groupId) .load() val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .writeStream .outputMode("append") .format("console") .start() query.awaitTermination() } }
将下载的JAR包和
Spark Streaming
示例程序上传至OSS。具体操作,请参见上传文件。进入Spark开发编辑器。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击 。
在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。
在编辑器中执行以下作业内容。
{ "args": [ "kafka_groupId", "kafka_test", "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092" ], "file": "oss://<bucket_name>/spark-example.jar", "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar,oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar", "name": "Kafka Example", "className": "com.aliyun.spark.streaming.SparkKafka", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****" } }
参数说明如下。
参数名称
参数说明
args
Spark作业传入的参数,多个参数之间以英文逗号(,)分隔。
file
Spark作业主文件的存储位置。主文件是入口类所在的JAR包或者Python的入口执行文件。
说明Spark作业主文件目前只支持存储在OSS中。
jars
Spark作业依赖的JAR包,多个JAR包之间以英文逗号(,)分隔。
name
Spark作业名称。
className
Java或者Scala程序入口类。Python不需要指定入口类。
spark.adb.eni.enabled
是否开启ENI访问。使用企业版或湖仓版Spark访问kafka数据源时,需要开启ENI访问。
spark.adb.eni.vswitchId
准备工作中获取的交换机ID。
spark.adb.eni.securityGroupId
准备工作中获取的安全组ID。
conf
其他参数与开源Spark中的配置项基本一致,参数格式为
key:value
形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明。单击立即执行。