Kafka

本文介绍如何通过DLA Serverless Spark访问消息队列Kafka版。

重要

云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见访问Kafka数据源

前提条件

在DLA Serverless Spark中访问消息队列Kafka版前,您需要正确配置Kafka数据源网络。具体如下:

  • 授权DLA Serverless Spark访问用户VPC的功能,具体内容,请参见配置数据源网络

    1. 访问用户VPC所需要的交换机可使用Kafka服务中的交换机。在Kafka服务控制台的实例详情中您可以找到Kafka服务中的交换机。1

    2. 访问用户VPC所需要的安全组可以使用Kafka服务中的安全组,在安全组列表页面,按照安全组名称搜索Kafka消息队列的实例ID即可找到对应的安全组。安全组

    3. 往kafka白名单中添加第一步中选中的VswitchID的网段。

      新版Kafka界面加白名单

      新版kafka界面白名单

      旧版Kafka UI加白名单

      旧版Kafka加白名单

操作步骤

  1. 登录Data Lake Analytics管理控制台

  2. 在页面左上角,选择Kafka服务所在地域。

  3. 单击左侧导航栏中的Serverless Spark > 作业管理

  4. 作业编辑页面,单击创建作业

  5. 创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。3

  6. 单击Spark作业名,在Spark作业编辑框中输入Spark Streaming任务内容。

    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }

    编译打包过程中需要打包Spark-Kafka的相关依赖,如下所示。

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
                        
    重要

    如果是使用子账号提交作业,需要配置子账号权限,请参见快速配置子账号权限

示例代码

以下是连接Kafka的核心代码片段,完整代码参考

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSessoin = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    val df = sparkSessoin
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
说明

该代码消费Kafka的消息,并打印Key-Value对。