Java/Scala

This topic uses spark-3.x for the code demonstrations.

Download the template project

If you have already downloaded the template project, you can skip this step. For the download procedure, see Use the Spark client in local mode.

The template project provides sample code for spark-1.x, spark-2.x, and spark-3.x. The examples in this topic are based on the spark-3.x samples.

Example 1: WordCount

  • Detailed code

    package com.aliyun.odps.spark.examples
    
    import org.apache.spark.sql.SparkSession
    
    object WordCount {
      def main(args: Array[String]) {
        val spark = SparkSession
          .builder()
          .appName("WordCount")
          .getOrCreate()
        val sc = spark.sparkContext
    
        try {
          // Generate 100 numbers as synthetic "words", count each, and print the (word, count) pairs.
          sc.parallelize(1 to 100, 10).map(word => (word, 1)).reduceByKey(_ + _, 10).take(100).foreach(println)
        } finally {
          sc.stop()
        }
      }
    }
  • Submission method

    cd PATH_TO_SPARK/MaxCompute-Spark/spark-3.x
    mvn clean package
    
    cd $SPARK_HOME
    bin/spark-submit --class \
        com.aliyun.odps.spark.examples.WordCount \
        PATH_TO_PROJECT/MaxCompute-Spark/spark-3.x/target/spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
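
The WordCount job above does not read or write any MaxCompute table, so it can also be debugged locally without spark-submit. The following is a minimal sketch of a local-mode variant; the local[4] master setting and the WordCountLocal object name are assumptions for illustration and are not part of the template project.

    package com.aliyun.odps.spark.examples

    import org.apache.spark.sql.SparkSession

    // Sketch only: local-mode variant of the WordCount example for IDE debugging.
    object WordCountLocal {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("WordCountLocal")
          .master("local[4]") // assumption: run on 4 local threads instead of submitting to the cluster
          .getOrCreate()
        val sc = spark.sparkContext

        try {
          // Same word-count logic as the example above.
          sc.parallelize(1 to 100, 10).map(word => (word, 1)).reduceByKey(_ + _, 10).take(100).foreach(println)
        } finally {
          sc.stop()
        }
      }
    }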

Example 2: GraphX PageRank

  • Detailed code

    package com.aliyun.odps.spark.examples.graphx
    
    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object PageRank {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("PageRank")
          .getOrCreate()
        val sc = spark.sparkContext
    
        // build vertices
        val users: RDD[(VertexId, Array[String])] = sc.parallelize(List(
          "1,BarackObama,Barack Obama",
          "2,ladygaga,Goddess of Love",
          "3,jeresig,John Resig",
          "4,justinbieber,Justin Bieber",
          "6,matei_zaharia,Matei Zaharia",
          "7,odersky,Martin Odersky",
          "8,anonsys"
        ).map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail)))
    
        // build edges
        val followers: RDD[Edge[Double]] = sc.parallelize(Array(
          Edge(2L, 1L, 1.0),
          Edge(4L, 1L, 1.0),
          Edge(1L, 2L, 1.0),
          Edge(6L, 3L, 1.0),
          Edge(7L, 3L, 1.0),
          Edge(7L, 6L, 1.0),
          Edge(6L, 7L, 1.0),
          Edge(3L, 7L, 1.0)
        ))
    
        // build graph
        val followerGraph: Graph[Array[String], Double] = Graph(users, followers)
    
        // restrict the graph to users with usernames and names
        val subgraph = followerGraph.subgraph(vpred = (vid, attr) => attr.size == 2)
    
        // compute PageRank
        val pageRankGraph = subgraph.pageRank(0.001)
    
        // get attributes of the top pagerank users
        val userInfoWithPageRank = subgraph.outerJoinVertices(pageRankGraph.vertices) {
          case (uid, attrList, Some(pr)) => (pr, attrList.toList)
          case (uid, attrList, None) => (0.0, attrList.toList)
        }
    
        println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
      }
    }
  • Submission method

    cd PATH_TO_SPARK/MaxCompute-Spark/spark-3.x
    mvn clean package
    
    cd $SPARK_HOME
    bin/spark-submit --class \
        com.aliyun.odps.spark.examples.graphx.PageRank \
        PATH_TO_PROJECT/MaxCompute-Spark/spark-3.x/target/spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
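
In the example above, pageRank(0.001) iterates until the ranks converge within the given tolerance. GraphX also provides a fixed-iteration variant, staticPageRank. The sketch below compares the two calls on a graph shaped like the subgraph built above; it is only an illustration, and the iteration count of 10 is an arbitrary value.

    package com.aliyun.odps.spark.examples.graphx

    import org.apache.spark.graphx.Graph

    // Sketch only: compares the tolerance-based PageRank used above with GraphX's
    // fixed-iteration variant. Assumes a graph like the subgraph value in Example 2.
    object PageRankVariants {
      def compare(subgraph: Graph[Array[String], Double]): Unit = {
        // Iterates until rank updates fall below the tolerance (same call as the example above).
        val byTolerance = subgraph.pageRank(0.001)
        // Runs exactly 10 iterations regardless of convergence; 10 is an arbitrary choice.
        val byIterations = subgraph.staticPageRank(10)

        println(byTolerance.vertices.top(5)(Ordering.by(_._2)).mkString("\n"))
        println(byIterations.vertices.top(5)(Ordering.by(_._2)).mkString("\n"))
      }
    }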

Example 3: Read and write MaxCompute tables

  • Detailed code

    package com.aliyun.odps.spark.examples.sparksql
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object SparkSQL {
      def main(args: Array[String]) {
        val spark = SparkSession
          .builder()
          .appName("SparkSQL-on-MaxCompute")
          .config("spark.sql.broadcastTimeout", 20 * 60)
          .config("spark.sql.crossJoin.enabled", true)
          .config("spark.sql.defaultCatalog","odps")
          .config("spark.sql.catalog.odps", "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
          .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
          .config("spark.sql.extensions", "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
          .config("spark.sql.catalogImplementation","hive")
          .getOrCreate()
    
        import spark._
        import sqlContext.implicits._
        val tableName = "mc_test_table"
        val ptTableName = "mc_test_pt_table"
        // Drop and recreate the test tables
        sql(s"DROP TABLE IF EXISTS ${tableName}")
        sql(s"DROP TABLE IF EXISTS ${ptTableName}")
    
        sql(s"CREATE TABLE ${tableName} (name STRING, num BIGINT)")
        sql(s"CREATE TABLE ${ptTableName} (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING)")
    
        val df = spark.sparkContext.parallelize(0 to 99, 2).map(f => {
          (s"name-$f", f)
        }).toDF("name", "num")
    
        val ptDf = spark.sparkContext.parallelize(0 to 99, 2).map(f => {
          (s"name-$f", f, "2018", "0601")
        }).toDF("name", "num", "pt1", "pt2")
    
        // Write to the non-partitioned table
        df.write.insertInto(tableName) // insertInto semantics
        df.writeTo(tableName).overwritePartitions() // insertOverwrite semantics via DataSourceV2

        // Write to the partitioned table
        // DataFrameWriter cannot target a specific static partition; register a temporary view and write to the partition with SQL
        df.createOrReplaceTempView(s"${ptTableName}_tmp_view")
        sql(s"insert into table ${ptTableName} partition (pt1='2018', pt2='0601') select * from ${ptTableName}_tmp_view")
        sql(s"insert overwrite table ${ptTableName} partition (pt1='2018', pt2='0601') select * from ${ptTableName}_tmp_view")

        ptDf.write.insertInto(ptTableName) // dynamic partitioning, insertInto semantics
        ptDf.write.mode("overwrite").insertInto(ptTableName) // dynamic partitioning, insertOverwrite semantics
    
        // Read the non-partitioned table
        val rdf = sql(s"select name, num from $tableName")
        println(s"rdf show, ${rdf.count()}")
        rdf.show()
        rdf.printSchema()
    
        // Read the partitioned table
        val rptdf = sql(s"select name, num, pt1, pt2 from $ptTableName where pt1 = '2018' and pt2 = '0601'")
        println(s"rptdf show, ${rptdf.count()}")
        rptdf.show()
        rptdf.printSchema()
      }
    }
  • Submission method

    cd PATH_TO_SPARK/MaxCompute-Spark/spark-3.x
    mvn clean package
    
    cd $SPARK_HOME
    bin/spark-submit --class \
        com.aliyun.odps.spark.examples.sparksql.SparkSQL \
        PATH_TO_PROJECT/MaxCompute-Spark/spark-3.x/target/spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
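
Because the session above sets spark.sql.defaultCatalog to odps and registers the OdpsTableCatalog, the same tables can also be read through the DataFrame API instead of SQL strings. The following is a minimal sketch under that assumption; it expects the catalog settings to be supplied exactly as in the example (via the builder or spark-defaults.conf), and the SparkSQLRead object name is only for illustration.

    package com.aliyun.odps.spark.examples.sparksql

    import org.apache.spark.sql.SparkSession

    // Sketch only: reads the tables created in Example 3 through the DataFrame API.
    // Assumes the odps catalog settings shown above are supplied via the builder or spark-defaults.conf.
    object SparkSQLRead {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("SparkSQLRead")
          .getOrCreate()

        // Non-partitioned table: equivalent to "select name, num from mc_test_table".
        val df = spark.table("mc_test_table").select("name", "num")
        df.show()

        // Partitioned table: filtering on the partition columns selects the
        // pt1='2018'/pt2='0601' partition written in the example above.
        val ptDf = spark.table("mc_test_pt_table")
          .where("pt1 = '2018' and pt2 = '0601'")
          .select("name", "num", "pt1", "pt2")
        ptDf.show()

        spark.stop()
      }
    }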

Example 4: Access Alibaba Cloud OSS

For information about how to add the OSS dependency, see Access Alibaba Cloud OSS.

  • Detailed code

    package com.aliyun.odps.spark.examples.oss
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object JindoFsDemo {
      def main(args: Array[String]): Unit = {
        val bucket : String = args(0)
        val ossPath : String = args(1)
    
        // Authenticate with an AccessKey ID and AccessKey secret
        val conf = new SparkConf()
          .setAppName("jindo-fs-demo")
          .set("spark.hadoop.fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS")
          .set("spark.hadoop.fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem")
          .set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
          .set("spark.hadoop.fs.oss.accessKeyId", "xxx")
          .set("spark.hadoop.fs.oss.accessKeySecret", "xxx")
    
        val sc = new SparkContext(conf)
    
        try {
          read_oss_dir(sc, "demo", s"oss://${bucket}/${ossPath}")
        } finally {
          sc.stop()
        }
      }
    
      /**
        * compute cost time using jindo sdk
        */
      def read_oss_dir(sc: SparkContext, job_des:String, ossPath: String): Unit = {
        val startTime: Long = System.currentTimeMillis()
        val inputData = sc.textFile(ossPath, 20)
        val cnt = inputData.count
        val endTime:Long = System.currentTimeMillis()
        val cost:Long = endTime - startTime
        println(s"job:$job_des, count:$cnt, consume:$cost")
      }
    }
  • Submission method

    cd PATH_TO_SPARK/MaxCompute-Spark/spark-3.x
    mvn clean package
    
    cd $SPARK_HOME
    bin/spark-submit --class \
        com.aliyun.odps.spark.examples.oss.JindoFsDemo \
        PATH_TO_PROJECT/MaxCompute-Spark/spark-3.x/target/spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar \
        OSS_BUCKET \
        OSS_PATH
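
The demo above hard-codes the AccessKey pair as xxx placeholders. In practice the credentials should not live in source code; the sketch below builds the same configuration but reads them from environment variables. The variable names OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET and the JindoFsConf object name are assumptions chosen for this example; passing the same spark.hadoop.fs.oss.* settings through spark-defaults.conf or --conf would work as well.

    package com.aliyun.odps.spark.examples.oss

    import org.apache.spark.SparkConf

    // Sketch only: builds the same JindoFS configuration as JindoFsDemo without
    // hard-coding credentials. The environment variable names are assumptions.
    object JindoFsConf {
      def build(endpoint: String): SparkConf = {
        new SparkConf()
          .setAppName("jindo-fs-demo")
          .set("spark.hadoop.fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS")
          .set("spark.hadoop.fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem")
          .set("spark.hadoop.fs.oss.endpoint", endpoint)
          // Read the AccessKey pair from the environment instead of the source file;
          // sys.env(...) throws if the variable is not set.
          .set("spark.hadoop.fs.oss.accessKeyId", sys.env("OSS_ACCESS_KEY_ID"))
          .set("spark.hadoop.fs.oss.accessKeySecret", sys.env("OSS_ACCESS_KEY_SECRET"))
      }
    }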