Spark Streaming消费

日志服务采集到日志数据后,您可以通过运行Spark Streaming任务消费日志数据。

日志服务提供的Spark SDK实现了Receiver模式和Direct模式两种消费模式。Maven依赖如下:

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

Receiver模式

Receiver模式通过消费组消费日志数据并暂存在Spark Executor,Spark Streaming任务启动之后从Executor读取并处理数据。每条数据以JSON字符串的形式返回。消费组自动定时保存Checkpoint到服务端,无需手动更新Checkpoint。更多信息,请参见通过消费组消费日志数据

  • 参数说明

    参数

    类型

    说明

    project

    String

    日志服务Project名称。

    logstore

    String

    日志服务Logstore名称。

    consumerGroup

    String

    消费组名称。

    endpoint

    String

    日志服务Project所在地域的服务入口。更多信息,请参见服务接入点

    accessKeyId

    String

    访问日志服务的AccessKey ID。

    accessKeySecret

    String

    访问日志服务的AccessKey Secret。

  • 示例

    说明

    默认配置下,Receiver模式在异常情况下可能导致数据丢失。为了避免此类情况发生,建议开启Write-Ahead Logs开关(Spark 1.2以上版本支持)。更多关于Write-Ahead Logs的细节请参见Spark

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

Direct模式

Direct模式不需要消费组,使用API在任务运行时直接从服务端请求数据。Direct模式具有如下优势:

  • 简化并行:Spark partition个数与Logstore Shard总数一致。只需分裂Shard即可提高任务的并行度。

  • 高效:不需要开启Write-Ahead Logs来保证数据不丢失。

  • Exactly-once语义:直接从服务端按需获取数据,任务成功之后再提交Checkpoint。

    由于Spark异常退出或者其他原因导致任务未正常结束,可能会导致部分数据被重复消费。

Direct模式需要依赖ZooKeeper环境,用于临时保存中间状态。同时,必须设置Checkpoint目录。中间状态数据保存在ZooKeeper内对应的Checkpoint目录内。如果任务重启之后希望重新消费,需要在ZooKeeper内删除该目录,并更改消费组名称。

  • 参数说明

    参数

    类型

    说明

    project

    String

    日志服务Project名称。

    logstore

    String

    日志服务Logstore名称。

    consumerGroup

    String

    消费组名称,仅用于保存消费位置。

    endpoint

    String

    日志服务Project所在地域的服务入口。更多信息,请参见服务接入点

    accessKeyId

    String

    访问日志服务的Access Key ID。

    accessKeySecret

    String

    访问日志服务的Access Key Secret。

    zkAddress

    String

    ZooKeeper的连接地址。

  • 限流配置

    Spark Streaming流式消费是将数据分成微小的批次进行处理,因此日志服务开始消费时,需预知每个批次的边界,即每个批次需要获取的数据条数。

    日志服务底层的存储模型以LogGroup为单位,正常情况下每个LogGroup对应一次写入请求,例如一次写入请求可能包含数千条日志,这些日志作为一个LogGroup进行存储和消费。而通过WebTracking方式写入日志时,每次写入请求仅包含一条日志,即一个LogGroup只有一条日志。为了能够应对不同写入场景的消费需求,SDK提供如下两个参数进行限流配置。

    参数

    说明

    默认值

    spark.loghub.batchGet.step

    限制单次消费请求获取的最大LogGroup个数。

    100

    spark.streaming.loghub.maxRatePerShard

    限制单批次内每个Shard消费的日志条数。

    10000

    通过spark.streaming.loghub.maxRatePerShard可指定每个批次每个Shard期望消费的最大日志条数。Spark SDK的实现原理是每次从服务端获取spark.loghub.batchGet.step中的LogGroup个数并累计其中的日志条数,直到达到或超过spark.streaming.loghub.maxRatePerShard。因此spark.streaming.loghub.maxRatePerShard并非一个精确控制单批次消费日志条数的参数,实际上每个批次消费的日志条数与spark.loghub.batchGet.step以及每个LogGroup中包含的日志条数相关。

  • 示例

    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

更多信息,请参见GitHub