文档

开发入门

更新时间:

本文介绍Spark Streaming如何消费Log Service中的日志数据和统计日志条数。

Spark接入Log Service

  • 方法一:Receiver Based DStream

    val logServiceProject = args(0)    // LogService中的project名。
        val logStoreName = args(1)     // LogService中的logstore名。
        val loghubConsumerGroupName = args(2)  // loghubGroupName相同的作业将共同消费logstore的数据。
        val loghubEndpoint = args(3)  // 阿里云日志服务数据类API Endpoint。
        val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")     // 访问日志服务的AccessKey Id。
        val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // 访问日志服务的AccessKey Secret。
        val numReceivers = args(4).toInt  // 启动多少个Receiver来读取logstore中的数据。
        val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming中每次处理批次时间间隔。
        val conf = new SparkConf().setAppName("Test Loghub Streaming")
        val ssc = new StreamingContext(conf, batchInterval)
        val loghubStream = LoghubUtils.createStream(
          ssc,
          logServiceProject,
          logStoreName,
          loghubConsumerGroupName,
          loghubEndpoint,
          numReceivers,
          accessKeyId,
          accessKeySecret,
          StorageLevel.MEMORY_AND_DISK)
        loghubStream.foreachRDD(rdd => println(rdd.count()))
        ssc.start()
        ssc.awaitTermination()
    说明

    运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量

  • 方法二: Direct API Based DStream

    val logServiceProject = args(0)
        val logStoreName = args(1)
        val loghubConsumerGroupName = args(2)
        val loghubEndpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkConnect = args(7)
        val checkpointPath = args(8)
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            logServiceProject,
            logStoreName,
            loghubConsumerGroupName,
            accessKeyId,
            accessKeySecret,
            loghubEndpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
          ssc.checkpoint(checkpointPath)
          val stream = loghubStream.checkpoint(batchInterval)
          stream.foreachRDD(rdd => {
            println(rdd.count())
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()

    从E-MapReduce SDK 1.4.0版本开始,提供基于Direct API的实现方式。此种方式可以避免将Loghub数据重复存储到Write Ahead Log中,即无需开启Spark Streaming的WAL特性即可实现数据的at least once。目前Direct API实现方式处于experimental状态,需要注意以下几点:

    • 在DStream的action中,必须做一次commit操作。

    • 一个Spark Streaming中,不支持对LogStore数据源做多个action操作。

    • Direct API方式需要Zookeeper服务的支持。

支持MetaService

上面的例子中是显式地将AccessKey传入到接口中,但是从E-MapReduce SDK 1.3.2版本开始,Spark Streaming可以基于MetaService实现免AccessKey处理LogService数据,具体可以参见E-MapReduce SDK中的LoghubUtils类说明。

LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
说明
  • E-MapReduce SDK支持Log Service的三种消费模式,即BEGIN_CURSOR、END_CURSOR和SPECIAL_TIMER_CURSOR,默认是 END_CURSOR。

    • BEGIN_CURSOR:从日志头开始消费,如果有checkpoint记录,则从checkpoint处开始消费。

    • END_CURSOR:从日志尾开始消费,如果有checkpoint记录,则从checkpoint处开始消费。

    • SPECIAL_TIMER_CURSOR:从指定时间点开始消费,如果有checkpoint记录,则从checkpoint处开始消费,单位为秒。

    以上三种消费模式都受到checkpoint记录的影响,如果存在checkpoint记录,则从checkpoint处开始消费,不管指定的是什么消费模式。E-MapReduce SDK基于“SPECIAL_TIMER_CURSOR”模式支持用户强制在指定时间点开始消费,在LoghubUtils#createStream接口中,以下参数需要组合使用:

    • cursorPosition:LogHubCursorPosition.SPECIAL_TIMER_CURSOR

    • forceSpecial:true

  • E-MapReduce的服务器(除了Master节点)无法连接公网。配置LogService endpoint时,请注意使用Log Service提供的内网 endpoint,否则无法请求到Log Service。

附录

完整示例代码,请参见Spark接入LogService