本文介绍了如何使用DLA Spark Streaming访问LogHub。
前提条件
- 已经创建了Spark虚拟集群。具体操作请参见创建虚拟集群。
- 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参见开通OSS服务。
操作步骤
- 准备以下测试代码来连接LogHub,并将测试代码打包成jar包上传至您的OSS。
package com.aliyun.spark.streaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
object SparkLogHub {
def main(args: Array[String]): Unit = {
if (args.length < 8) {
System.err.println(
"""Usage: LoghubSample <sls project> <sls logstore> <loghub group name> <sls endpoint>
| <access key id> <access key secret> <batch interval seconds> <checkpoint dir>
""".stripMargin)
System.exit(1)
}
val loghubProject = args(0)
val logStore = args(1)
val loghubGroupName = args(2)
val endpoint = args(3)
val accessKeyId = args(4)
val accessKeySecret = args(5)
val batchInterval = Milliseconds(args(6).toInt * 1000)
val checkPointDir = args(7)
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("LoghubSample")
val ssc = new StreamingContext(conf, batchInterval)
val loghubStream = LoghubUtils.createStream(
ssc,
loghubProject,
logStore,
loghubGroupName,
endpoint,
accessKeyId,
accessKeySecret,
StorageLevel.MEMORY_AND_DISK)
loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => println(rdd.count()))
ssc.checkpoint(checkPointDir) // set checkpoint directory
ssc
}
val ssc = StreamingContext.getOrCreate(checkPointDir, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()
}
}
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择LogHub所在地域。
- 单击左侧导航栏中的。
- 在作业编辑页面,单击创建作业。
- 在创建作业模板页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
- 单击Spark作业名,在Spark作业编辑框中输入以下Spark Streaming任务内容。保存并提交Spark作业。
{
"args": [
"<sls project>", #sls的project名称。
"<ls logstore>, #sls的logstore名称。
"<loghub group name>", #sls的消费者组名称。
"<sls endpoint>", #sls的endpoint。
"<access key id>", #访问sls所需的accessKeyId。
"<access key secret>", #访问sls所需的accessKeySecret。
"<batch interval seconds>", #Streaming的batch interval。
"<checkpoint dir>" #checkpoint的oss路径。
],
"name": "LogHub",
"className": "com.aliyun.spark.streaming.SparkLogHub",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.dla.connectors": "oss",
"spark.executor.instances": 1,
"spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>", #存放Spark日志的路径。
"spark.executor.resourceSpec": "medium"
},
"file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar"
}