日志服务采集到日志数据后,您可以通过运行Spark Streaming任务消费日志数据。
使用限制
Spark Streaming消费仅支持Spark 2.x版本。
日志服务提供的Spark SDK实现了Receiver模式和Direct模式两种消费模式。Maven依赖如下:
<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>Receiver模式
Receiver模式通过消费组消费日志数据并暂存在Spark Executor,Spark Streaming任务启动之后从Executor读取并处理数据。每条数据以JSON字符串的形式返回。消费组自动定时保存Checkpoint到服务端,无需手动更新Checkpoint。更多信息,请参见通过消费组消费日志数据。
- 参数说明 - 参数 - 类型 - 说明 - project - String - 日志服务Project名称。 - logstore - String - 日志服务Logstore名称。 - consumerGroup - String - 消费组名称。 - endpoint - String - 日志服务Project所在地域的服务入口。更多信息,请参见服务接入点。 - accessKeyId - String - 访问日志服务的AccessKey ID。 - accessKeySecret - String - 访问日志服务的AccessKey Secret。 
- 示例 说明- 默认配置下,Receiver模式在异常情况下可能导致数据丢失。为了避免此类情况发生,建议开启Write-Ahead Logs开关(Spark 1.2以上版本支持)。更多关于Write-Ahead Logs的细节请参见Spark。 - import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.aliyun.logservice.LoghubUtils import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.SparkConf object TestLoghub { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint> | <access key id> <access key secret> <batch interval seconds> """.stripMargin) System.exit(1) } val project = args(0) val logstore = args(1) val consumerGroup = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Loghub") val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, project, logstore, consumerGroup, endpoint, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => rdd.map(bytes => new String(bytes)).top(10).foreach(println) ) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _) ssc.start() ssc.awaitTermination() } }
Direct模式
Direct模式不需要消费组,使用API在任务运行时直接从服务端请求数据。Direct模式具有如下优势:
- 简化并行:Spark partition个数与Logstore Shard总数一致。只需分裂Shard即可提高任务的并行度。 
- 高效:不需要开启Write-Ahead Logs来保证数据不丢失。 
- Exactly-once语义:直接从服务端按需获取数据,任务成功之后再提交Checkpoint。 - 由于Spark异常退出或者其他原因导致任务未正常结束,可能会导致部分数据被重复消费。 
Direct模式需要依赖ZooKeeper环境,用于临时保存中间状态。同时,必须设置Checkpoint目录。中间状态数据保存在ZooKeeper内对应的Checkpoint目录内。如果任务重启之后希望重新消费,需要在ZooKeeper内删除该目录,并更改消费组名称。
- 参数说明 - 参数 - 类型 - 说明 - project - String - 日志服务Project名称。 - logstore - String - 日志服务Logstore名称。 - consumerGroup - String - 消费组名称,仅用于保存消费位置。 - endpoint - String - 日志服务Project所在地域的服务入口。更多信息,请参见服务接入点。 - accessKeyId - String - 访问日志服务的Access Key ID。 - accessKeySecret - String - 访问日志服务的Access Key Secret。 - zkAddress - String - ZooKeeper的连接地址。 
- 限流配置 - Spark Streaming流式消费是将数据分成微小的批次进行处理,因此日志服务开始消费时,需预知每个批次的边界,即每个批次需要获取的数据条数。 - 日志服务底层的存储模型以LogGroup为单位,正常情况下每个LogGroup对应一次写入请求,例如一次写入请求可能包含数千条日志,这些日志作为一个LogGroup进行存储和消费。而通过WebTracking方式写入日志时,每次写入请求仅包含一条日志,即一个LogGroup只有一条日志。为了能够应对不同写入场景的消费需求,SDK提供如下两个参数进行限流配置。 - 参数 - 说明 - 默认值 - spark.loghub.batchGet.step - 限制单次消费请求获取的最大LogGroup个数。 - 100 - spark.streaming.loghub.maxRatePerShard - 限制单批次内每个Shard消费的日志条数。 - 10000 - 通过spark.streaming.loghub.maxRatePerShard可指定每个批次每个Shard期望消费的最大日志条数。Spark SDK的实现原理是每次从服务端获取spark.loghub.batchGet.step中的LogGroup个数并累计其中的日志条数,直到达到或超过spark.streaming.loghub.maxRatePerShard。因此spark.streaming.loghub.maxRatePerShard并非一个精确控制单批次消费日志条数的参数,实际上每个批次消费的日志条数与spark.loghub.batchGet.step以及每个LogGroup中包含的日志条数相关。 
- 示例 - import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition import org.apache.spark.SparkConf import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils} object TestDirectLoghub { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint> | <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181> """.stripMargin) System.exit(1) } val project = args(0) val logstore = args(1) val consumerGroup = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val zkAddress = if (args.length >= 8) args(7) else "localhost:2181" def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Direct Loghub") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkAddress, "enable.auto.commit" -> "false") val loghubStream = LoghubUtils.createDirectStream( ssc, project, logstore, consumerGroup, accessKeyId, accessKeySecret, endpoint, zkParas, LogHubCursorPosition.END_CURSOR) loghubStream.checkpoint(batchInterval).foreachRDD(rdd => { println(s"count by key: ${rdd.map(s => { s.sorted (s.length, s) }).countByKey().size}") loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _) ssc.start() ssc.awaitTermination() } }
更多信息,请参见GitHub。