本文介绍DLA Serverless Spark如何提交Spark Streaming作业

背景信息

文中示例将使用DLA Serverless Spark内网连接阿里云消息队列Kafka

前提条件

在DLA Serverless Spark中运行Spark Streaming任务前,您需要完成以下准备工作。
  • 授权DLA Serverless Spark访问用户VPC的功能。
  • DLA控制台的虚拟集群管理中确认您的vc使用的版本是spark_2_4_5-dla_1_2_0及以上版本。
  • 访问用户VPC所需要的交换机可使用Kafka服务中的交换机,在Kafak服务控制台,实例详情中可以找到。1
  • 访问用户VPC所需要的安全组可以使用Kafka服务中的安全组,在安全组列表页面,按照安全组名称搜索Kafka消息队列的实例ID即可找到对应的安全组。安全组

操作步骤

  1. 登录Data Lake Analytics管理控制台
  2. 在页面左上角,选择Kafka服务所在地域。
  3. 单击左侧导航栏中的Serverless Spark > 作业管理
  4. 作业编辑页面,单击创建作业
  5. 创建作业页面,按照页面提示进行参数配置后,单击确定创建Spark作业。3
  6. 单击Spark作业名,在Spark作业编辑框中输入Spark Streaming任务内容。
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }
    注意 如果是使用子账号提交作业,需要配置spark.dla.roleArn参数,详情请参见使用RAM子账号开发Spark作业。

示例代码

以下是连接Kafka的核心代码片段:
        val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSessoin = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    val df = sparkSessoin
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
说明 该代码消费Kafka的消息,并打印Key-Value对。