本文介绍如何使用AnalyticDB MySQL湖仓版(3.0)Spark通过ENI网络访问消息队列Kafka版。

前提条件

准备工作

  1. 消息队列Kafka版控制台实例详情页面,获取Kafka实例的交换机ID。
  2. ECS管理控制台安全组页面,搜索Kafka实例ID来获取安全组ID。
  3. 消息队列Kafka版控制台白名单管理页面,查看Kafka实例的白名单是否为交换机ID的网段。

操作步骤

  1. 分别下载与Kafka和AnalyticDB MySQL Spark实例版本对应的JAR包。下载链接,请参见Kafka-clientsSpark-sql-kafka-0-10
  2. 在pom.xml文件的dependencies中添加依赖项。
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.2.2</version>
        <scope>test</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
  3. 编写Spark Streaming示例程序来读取Kafka中的消息,并进行编译打包。本文生成的JAR包名称为spark-example.jar
    package com.aliyun.spark.streaming
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object SparkKafka {
      def main(args: Array[String]): Unit = {
        if(args.length < 3){
          System.err.println(
            """
              |args0: groupId
              |args1: topicName
              |args2: bootstrapServers
              |""".stripMargin)
          System.exit(1)
        }
        val groupId = args(0)
        val topicName = args(1)
        val bootstrapServers = args(2)
    
    
        val sparkConf: SparkConf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setAppName("SparkKafkaSub")
        sparkConf.registerKryoClasses(Array(classOf[ConsumerRecord[_,_]]))
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        val df = sparkSession
          .readStream
          .format("kafka")
         //Kafka实例的域名接入点。
          .option("kafka.bootstrap.servers", alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092)
         //Kafka实例的Topic名称。
          .option("subscribe", kafka_test)
         //Kafka实例的Group ID。
          .option("group.id", kafka_groupId)
          .load()
    
        val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .outputMode("append")
          .format("console")
          .start()
        query.awaitTermination()
    
      }
    }
  4. 将下载的JAR包和Spark Streaming示例程序上传至OSS。具体操作,请参见上传文件
  5. 进入Spark开发编辑器。
    1. 登录云原生数据仓库AnalyticDB MySQL控制台
    2. 在页面左上角,选择集群所在地域。
    3. 在左侧导航栏,单击集群列表
    4. 湖仓版(3.0)页签下,单击目标集群ID
    5. 在左侧导航栏,单击作业开发 > Spark Jar 开发
  6. 在编辑器窗口上方,选择Job型资源组和Spark作业类型。本文以Batch类型为例。
  7. 在编辑器中执行以下作业内容。
    {
        "args": [
          "kafka_groupId",
          "kafka_test",
          "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092"
        ],
        "file": "oss://<bucket_name>/spark-example.jar",
        "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar,oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar",
        "name": "Kafka Example",
        "className": "com.aliyun.spark.streaming.SparkKafka",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****"
        }
    }
    参数说明如下。
    参数名称参数说明
    argsSpark作业传入的参数,多个参数之间以英文逗号(,)分隔。
    fileSpark作业主文件的存储位置。主文件是入口类所在的JAR包或者Python的入口执行文件。
    说明 Spark作业主文件目前只支持存储在OSS中。
    jarsSpark作业依赖的JAR包,多个JAR包之间以英文逗号(,)分隔。
    nameSpark作业名称。
    classNameJava或者Scala程序入口类。Python不需要指定入口类。
    spark.adb.eni.enabled是否开启ENI访问。使用湖仓版(3.0)Spark访问kafka数据源时,需要开启ENI访问。
    spark.adb.eni.vswitchId准备工作中获取的交换机ID。
    spark.adb.eni.securityGroupId准备工作中获取的安全组ID。
    conf其他参数与开源Spark中的配置项基本一致,参数格式为key:value形式,多个参数之间以英文逗号(,)分隔。与开源Spark用法不一致的配置参数及AnalyticDB MySQL特有的配置参数,请参见Conf配置参数
  8. 单击立即执行