本文简单介绍如何使用 Spark RDD API 开发一个离线作业消费 LogService 数据。

示例代码

## TestBatchLoghub.Scala

object TestBatchLoghub {
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println(
        """Usage: TestBatchLoghub <sls project> <sls logstore> <sls endpoint>
          |  <access key id> <access key secret> <output path> <start time> <end time=now>
        """.stripMargin)
      System.exit(1)
    }

    val loghubProject = args(0)
    val logStore = args(1)
    val endpoint = args(2)
    val accessKeyId = args(3)
    val accessKeySecret = args(4)
    val outputPath = args(5)
    val startTime = args(6).toLong

    val sc = new SparkContext(new SparkConf().setAppName("test batch loghub"))
    var rdd:JavaRDD[String] = null
    if (args.length > 7) {
      rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime, args(7).toLong)
    } else {
      rdd = LoghubUtils.createRDD(sc, loghubProject, logStore, accessKeyId, accessKeySecret, endpoint, startTime)
    }

    rdd.saveAsTextFile(outputPath)
  }
}
说明 Maven pom 文件可以参见aliyun-emapreduce-demo

编译运行

## 编译命令
mvn clean package -DskipTests

## 编译完后,作业jar包位于target/shaded/下

## 提交执行

spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g 
--num-executors 2--class x.x.x.TestBatchLoghub xxx.jar <sls project> <sls logstore> 
<sls endpoint> <access key id> <access key secret> <output path> <start time> [<end time=now>]
注意
  • x.x.x.TestBatchLoghub 和 xxx.jar 需要替换成真实的类路径和包路径。
  • 作业资源需要根据实际数据规模和实际集群规模调整,如果集群太小,直接运行以上命令可能无法执行。