日志采集到日志服务之后,可以使用日志服务提供的Spark SDK在Spark Streaming中对数据进行处理。

日志服务提供的Spark SDK实现了Receiver和 Direct两种消费模式。

Maven依赖:
<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

Receiver模式

Receiver模式通过消费组从日志服务消费数据并暂存在Spark Executor,Spark Streaming Job启动之后从Executor读取并处理数据。每条数据以JSON字符串的形式返回。消费组自动定时保存checkpoint到服务端,无需手动更新checkpoint。
  • 使用示例。
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
  • 参数说明。
    参数名称 类型 说明
    project String 日志服务Project。
    logstore String 日志服务Logstore。
    consumerGroup String 消费组名称。
    endpoint String 日志服务Project所在region的endpoint。
    accessKeyId String 访问日志服务的Access Key ID。
    accessKeySecret String 访问日志服务的Access Key Secret。
  • 注意事项。

    默认配置下,这种方式在异常情况下可能导致数据丢失,为了避免此类情况发生,可以开启Write-Ahead Logs开关(Spark 1.2 之后支持)。请参见Spark部署章节了解更多关于Write-Ahead Logs的细节。

Direct 模式

Direct模式与Receiver相比不再需要消费组,而是直接使用API在任务运行时从服务端直接请求数据。与Receiver模式相比,Direct模式具有如下优势。
  • 简化并行:Spark partition个数与Logstore Shard总数一致。只需分裂Shard即可提高任务的并行度。
  • 更加高效:不再需要Write-Ahead Logs来保证数据不丢失。
  • Exactly-once 语义:数据直接从服务端按需获取,任务成功之后再提交checkpoint。极端情况下由于Spark异常退出或者其他原因,出现任务未正常结束的情况,可能会导致部分数据被重复消费。

Direct模式需要依赖ZooKeeper环境,用于临时保存中间状态。同时,必须设置checkpoint目录。中间状态数据保存在ZooKeeper内对应的checkpoint目录内。如果任务重启之后希望重新消费,需要在ZooKeeper内删除该目录,并更换消费组名称。

  • 使用示例。
    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logStore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }
  • 参数说明。
    参数名称 类型 说明
    project String 日志服务Project。
    logstore String 日志服务logstore。
    consumerGroup String 消费组名称(仅用于保存消费位置)。
    endpoint String 日志服务Project所在region的endpoint。
    accessKeyId String 访问日志服务的Access Key ID。
    accessKeySecret String 访问日志服务的Access Key Secret。
    zkAddress String ZooKeeper连接地址。
  • 参数设置。
    使用Direct模式消费时,需要指定每个批次每个shard消费的数据行数,否则数据读取过程无法结束。可以通过如下两个配置对单个批次进行限流。
    配置 描述 默认值
    spark.loghub.batchGet.step 单次请求返回LogGroup个数。 100
    spark.streaming.loghub.maxRatePerShard 单个shard每秒希望处理的日志条数。 10000
    每个batch实际处理的日志条数为:shard个数 * max(spark.loghub.batchGet.step * n, spark.streaming.loghub.maxRatePerShard * duration)
    • n:返回总行数达到spark.streaming.loghub.maxRatePerShard * duration所需的请求次数。
    • duration:批处理间隔,单位为秒。

    如果需要Union多个DStream,那Shard个数就是所有Logstore的Shard个数之和。

    • 举例说明。
      假设Shard个数为100,平均每个LogGroup包含50条日志,批处理间隔为2秒。希望每批次处理20000条日志,可以按照如下方式配置。
      • spark.loghub.batchGet.step: 4
      • spark.streaming.loghub.maxRatePerShard: 200

      如果每个LogGroup包含60条日志,希望每批次处理20000条日志,在上述配置下,实际会处理105*2*100 = 24000行数据。

    • 精确限流。

      spark.loghub.batchGet.step越小限流越精确,但是可能造成请求次数增加。建议先统计平均每次写入请求(一个LogGroup)内包含多少行日志,然后合理设置如上两个配置。

单击Spark SDK下载源码。

单击Spark和KafKa集成了解更多Receiver和Direct的区别。