本文介绍了如何使用DLA Spark Streaming访问LogHub。

前提条件

  • 已经创建了Spark虚拟集群。具体操作请参见创建虚拟集群
  • 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参见开通OSS服务

操作步骤

  1. 准备以下测试代码来连接LogHub,并将测试代码打包成jar包上传至您的OSS。
    package com.aliyun.spark.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    object SparkLogHub {
      def main(args: Array[String]): Unit = {
        if (args.length < 8) {
          System.err.println(
            """Usage: LoghubSample <sls project> <sls logstore> <loghub group name> <sls endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <checkpoint dir>
            """.stripMargin)
          System.exit(1)
        }
    
        val loghubProject = args(0)
        val logStore = args(1)
        val loghubGroupName = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val checkPointDir = args(7)
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("LoghubSample")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            loghubProject,
            logStore,
            loghubGroupName,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => println(rdd.count()))
          ssc.checkpoint(checkPointDir) // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate(checkPointDir, functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
  2. 登录Data Lake Analytics管理控制台
  3. 在页面左上角,选择LogHub所在地域。
  4. 单击左侧导航栏中的Serverless Spark > 作业管理
  5. 作业编辑页面,单击创建作业
  6. 创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
    3
  7. 单击Spark作业名,在Spark作业编辑框中输入以下Spark Streaming任务内容。保存并提交Spark作业。
    {
        "args": [
            "<sls project>",  #sls的project名称。
            "<ls logstore>,   #sls的logstore名称。
            "<loghub group name>",  #sls的消费者组名称。
            "<sls endpoint>",   #sls的endpoint。
            "<access key id>",  #访问sls所需的accessKeyId。
            "<access key secret>",  #访问sls所需的accessKeySecret。
            "<batch interval seconds>", #Streaming的batch interval。
            "<checkpoint dir>" #checkpoint的oss路径。
        ],
        "name": "LogHub",
        "className": "com.aliyun.spark.streaming.SparkLogHub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.dla.connectors": "oss",
            "spark.executor.instances": 1,
            "spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>",  #存放Spark日志的路径。
            "spark.executor.resourceSpec": "medium"
        },
        "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar"
    }