本文介绍Spark Streaming如何消费Log Service中的日志数据和统计日志条数。
Spark接入Log Service
方法一:Receiver Based DStream
val logServiceProject = args(0) // LogService中的project名。 val logStoreName = args(1) // LogService中的logstore名。 val loghubConsumerGroupName = args(2) // loghubGroupName相同的作业将共同消费logstore的数据。 val loghubEndpoint = args(3) // 阿里云日志服务数据类API Endpoint。 val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") // 访问日志服务的AccessKey Id。 val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // 访问日志服务的AccessKey Secret。 val numReceivers = args(4).toInt // 启动多少个Receiver来读取logstore中的数据。 val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming中每次处理批次时间间隔。 val conf = new SparkConf().setAppName("Test Loghub Streaming") val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, logServiceProject, logStoreName, loghubConsumerGroupName, loghubEndpoint, numReceivers, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.foreachRDD(rdd => println(rdd.count())) ssc.start() ssc.awaitTermination()
说明运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量。
方法二: Direct API Based DStream
val logServiceProject = args(0) val logStoreName = args(1) val loghubConsumerGroupName = args(2) val loghubEndpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val zkConnect = args(7) val checkpointPath = args(8) def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Direct Loghub Streaming") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false") val loghubStream = LoghubUtils.createDirectStream( ssc, logServiceProject, logStoreName, loghubConsumerGroupName, accessKeyId, accessKeySecret, loghubEndpoint, zkParas, LogHubCursorPosition.END_CURSOR) ssc.checkpoint(checkpointPath) val stream = loghubStream.checkpoint(batchInterval) stream.foreachRDD(rdd => { println(rdd.count()) loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc } val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _) ssc.start() ssc.awaitTermination()
从E-MapReduce SDK 1.4.0版本开始,提供基于Direct API的实现方式。此种方式可以避免将Loghub数据重复存储到Write Ahead Log中,即无需开启Spark Streaming的WAL特性即可实现数据的at least once。目前Direct API实现方式处于experimental状态,需要注意以下几点:
在DStream的action中,必须做一次commit操作。
一个Spark Streaming中,不支持对LogStore数据源做多个action操作。
Direct API方式需要Zookeeper服务的支持。
支持MetaService
上面的例子中是显式地将AccessKey传入到接口中,但是从E-MapReduce SDK 1.3.2版本开始,Spark Streaming可以基于MetaService实现免AccessKey处理LogService数据,具体可以参见E-MapReduce SDK中的LoghubUtils类说明。
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
E-MapReduce SDK支持Log Service的三种消费模式,即BEGIN_CURSOR、END_CURSOR和SPECIAL_TIMER_CURSOR,默认是 END_CURSOR。
BEGIN_CURSOR:从日志头开始消费,如果有checkpoint记录,则从checkpoint处开始消费。
END_CURSOR:从日志尾开始消费,如果有checkpoint记录,则从checkpoint处开始消费。
SPECIAL_TIMER_CURSOR:从指定时间点开始消费,如果有checkpoint记录,则从checkpoint处开始消费,单位为秒。
以上三种消费模式都受到checkpoint记录的影响,如果存在checkpoint记录,则从checkpoint处开始消费,不管指定的是什么消费模式。E-MapReduce SDK基于“SPECIAL_TIMER_CURSOR”模式支持用户强制在指定时间点开始消费,在LoghubUtils#createStream接口中,以下参数需要组合使用:
cursorPosition:LogHubCursorPosition.SPECIAL_TIMER_CURSOR
forceSpecial:true
E-MapReduce的服务器(除了Master节点)无法连接公网。配置LogService endpoint时,请注意使用Log Service提供的内网 endpoint,否则无法请求到Log Service。
附录
完整示例代码,请参见Spark接入LogService。