访问Kafka数据源

更新时间:

本文介绍如何使用云原生数据仓库 AnalyticDB MySQL 版Spark通过ENI网络访问消息队列Kafka版。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版或湖仓版

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建AnalyticDB for MySQL集群的数据库账号。

  • Kafka实例与AnalyticDB for MySQL集群位于同一地域。具体操作,请参见公网和VPC接入

  • 已在Kafka实例中创建Topic和Group。具体操作,请参见创建资源

  • 已开通OSS服务,并创建与AnalyticDB for MySQL集群位于相同地域的存储空间。具体操作,请参见开通OSS服务创建存储空间

准备工作

  1. 云消息队列 Kafka 版控制台实例详情页面,获取Kafka实例的交换机ID。

  2. ECS管理控制台安全组页面,搜索Kafka实例ID来获取安全组ID。

  3. 云消息队列 Kafka 版控制台白名单管理页面,查看Kafka实例的白名单是否为交换机ID的网段。

操作步骤

  1. 分别下载与Kafka和AnalyticDB for MySQL Spark实例版本对应的JAR包。下载链接,请参见Kafka-clientsSpark-sql-kafka-0-10

  2. 在pom.xml文件的dependencies中添加依赖项。

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.2.2</version>
        <scope>test</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
  3. 编写Spark Streaming示例程序来读取Kafka中的消息,并进行编译打包。本文生成的JAR包名称为spark-example.jar

    package com.aliyun.spark.streaming
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object SparkKafka {
      def main(args: Array[String]): Unit = {
        if(args.length < 3){
          System.err.println(
            """
              |args0: groupId
              |args1: topicName
              |args2: bootstrapServers
              |""".stripMargin)
          System.exit(1)
        }
        val groupId = args(0)
        val topicName = args(1)
        val bootstrapServers = args(2)
    
    
        val sparkConf: SparkConf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setAppName("SparkKafkaSub")
        sparkConf.registerKryoClasses(Array(classOf[ConsumerRecord[_,_]]))
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        val df = sparkSession
          .readStream
          .format("kafka")
         //Kafka实例的域名接入点。
          .option("kafka.bootstrap.servers", alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092)
         //Kafka实例的Topic名称。
          .option("subscribe", kafka_test)
         //Kafka实例的Group ID。
          .option("group.id", kafka_groupId)
          .load()
    
        val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .outputMode("append")
          .format("console")
          .start()
        query.awaitTermination()
    
      }
    }
  4. 将下载的JAR包和Spark Streaming示例程序上传至OSS。具体操作,请参见上传文件

  5. 进入Spark开发编辑器。

    1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版湖仓版页签下,单击目标集群ID。

    2. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  6. 在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。

  7. 在编辑器中执行以下作业内容。

    {
        "args": [
          "kafka_groupId",
          "kafka_test",
          "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092"
        ],
        "file": "oss://<bucket_name>/spark-example.jar",
        "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar,oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar",
        "name": "Kafka Example",
        "className": "com.aliyun.spark.streaming.SparkKafka",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****"
        }
    }

    参数说明如下。

    参数名称

    参数说明

    args

    Spark作业传入的参数,多个参数之间以英文逗号(,)分隔。

    file

    Spark作业主文件的存储位置。主文件是入口类所在的JAR包或者Python的入口执行文件。

    说明

    Spark作业主文件目前只支持存储在OSS中。

    jars

    Spark作业依赖的JAR包,多个JAR包之间以英文逗号(,)分隔。

    name

    Spark作业名称。

    className

    Java或者Scala程序入口类。Python不需要指定入口类。

    spark.adb.eni.enabled

    是否开启ENI访问。使用企业版或湖仓版Spark访问kafka数据源时,需要开启ENI访问。

    spark.adb.eni.vswitchId

    准备工作中获取的交换机ID。

    spark.adb.eni.securityGroupId

    准备工作中获取的安全组ID。

    conf其他参数

    与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB for MySQL特有的配置参数,请参见Spark应用配置参数说明

  8. 单击立即执行