This topic uses spark-3.x code samples for demonstration.
Download the template project
If you have already downloaded the template project, skip this step. For download instructions, see Using the Spark client in local mode.
The template project provides sample code for spark-1.x, spark-2.x, and spark-3.x; the examples below are taken from the spark-3.x samples.
Example 1: WordCount
package com.aliyun.odps.spark.examples

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("WordCount")
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      // Count each key's occurrences and print the first 100 results.
      sc.parallelize(1 to 100, 10)
        .map(word => (word, 1))
        .reduceByKey(_ + _, 10)
        .take(100)
        .foreach(println)
    } finally {
      sc.stop()
    }
  }
}

How to submit:
cd PATH_TO_SPARK/MaxCompute-Spark/spark-3.x
mvn clean package
cd $SPARK_HOME
bin/spark-submit \
  --class com.aliyun.odps.spark.examples.WordCount \
  PATH_TO_PROJECT/MaxCompute-Spark/spark-3.x/target/spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
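The sample above exercises the WordCount pattern on synthetic integer keys rather than words. As a minimal sketch, the same flatMap/map/reduceByKey pipeline applied to actual text looks like this (the input lines are hypothetical, hard-coded for illustration):

// Hypothetical variant of the sample above: count real words.
// Any RDD[String] would work as input; these lines are hard-coded samples.
val lines = sc.parallelize(Seq("hello spark", "hello maxcompute spark"))
lines.flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .collect()
  .foreach(println)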
Example 2: GraphX PageRank
package com.aliyun.odps.spark.examples.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PageRank {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("PageRank")
      .getOrCreate()
    val sc = spark.sparkContext

    // Build the vertices from "id,username,name" records
    // (user 8 deliberately has no name).
    val users: RDD[(VertexId, Array[String])] = sc.parallelize(List(
      "1,BarackObama,Barack Obama",
      "2,ladygaga,Goddess of Love",
      "3,jeresig,John Resig",
      "4,justinbieber,Justin Bieber",
      "6,matei_zaharia,Matei Zaharia",
      "7,odersky,Martin Odersky",
      "8,anonsys"
    ).map(line => line.split(",")).map(parts => (parts.head.toLong, parts.tail)))

    // Build the edges (follower relationships).
    val followers: RDD[Edge[Double]] = sc.parallelize(Array(
      Edge(2L, 1L, 1.0), Edge(4L, 1L, 1.0), Edge(1L, 2L, 1.0),
      Edge(6L, 3L, 1.0), Edge(7L, 3L, 1.0), Edge(7L, 6L, 1.0),
      Edge(6L, 7L, 1.0), Edge(3L, 7L, 1.0)
    ))

    // Build the graph.
    val followerGraph: Graph[Array[String], Double] = Graph(users, followers)

    // Restrict the graph to users that have both a username and a name.
    val subgraph = followerGraph.subgraph(vpred = (vid, attr) => attr.size == 2)

    // Compute PageRank until convergence (tolerance 0.001).
    val pageRankGraph = subgraph.pageRank(0.001)

    // Join the user attributes with the PageRank scores.
    val userInfoWithPageRank = subgraph.outerJoinVertices(pageRankGraph.vertices) {
      case (uid, attrList, Some(pr)) => (pr, attrList.toList)
      case (uid, attrList, None) => (0.0, attrList.toList)
    }

    // Print the top-5 users by PageRank.
    println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))

    spark.stop()
  }
}

How to submit:
cd PATH_TO_SPARK/MaxCompute-Spark/spark-3.x
mvn clean package
cd $SPARK_HOME
bin/spark-submit \
  --class com.aliyun.odps.spark.examples.graphx.PageRank \
  PATH_TO_PROJECT/MaxCompute-Spark/spark-3.x/target/spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
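The sample calls pageRank(0.001), which iterates until the rank changes fall below the given tolerance. GraphX also provides a fixed-iteration variant; as a sketch against the subgraph built above:

// Alternative to the tolerance-based call above:
// run exactly 20 PageRank iterations instead of iterating to convergence.
val staticRankGraph = subgraph.staticPageRank(20)
println(staticRankGraph.vertices.top(5)(Ordering.by(_._2)).mkString("\n"))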
Example 3: Reading and writing MaxCompute tables
package com.aliyun.odps.spark.examples.sparksql

import org.apache.spark.sql.SparkSession

object SparkSQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SparkSQL-on-MaxCompute")
      .config("spark.sql.broadcastTimeout", 20 * 60)
      .config("spark.sql.crossJoin.enabled", true)
      .config("spark.sql.defaultCatalog", "odps")
      .config("spark.sql.catalog.odps", "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .config("spark.sql.extensions", "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
      .config("spark.sql.catalogImplementation", "hive")
      .getOrCreate()

    import spark._
    import sqlContext.implicits._

    val tableName = "mc_test_table"
    val ptTableName = "mc_test_pt_table"

    // Drop and recreate the test tables.
    sql(s"DROP TABLE IF EXISTS ${tableName}")
    sql(s"DROP TABLE IF EXISTS ${ptTableName}")
    sql(s"CREATE TABLE ${tableName} (name STRING, num BIGINT)")
    sql(s"CREATE TABLE ${ptTableName} (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING)")

    val df = spark.sparkContext.parallelize(0 to 99, 2).map(f => {
      (s"name-$f", f)
    }).toDF("name", "num")

    val ptDf = spark.sparkContext.parallelize(0 to 99, 2).map(f => {
      (s"name-$f", f, "2018", "0601")
    }).toDF("name", "num", "pt1", "pt2")

    // Write to the non-partitioned table.
    df.write.insertInto(tableName) // insertInto semantics
    df.writeTo(tableName).overwritePartitions() // insertOverwrite semantics via DataSource V2

    // Write to the partitioned table.
    // DataFrameWriter cannot write into a specific partition directly;
    // register a temporary view and write to the target partition with SQL.
    df.createOrReplaceTempView(s"${ptTableName}_tmp_view")
    sql(s"insert into table ${ptTableName} partition (pt1='2018', pt2='0601') select * from ${ptTableName}_tmp_view")
    sql(s"insert overwrite table ${ptTableName} partition (pt1='2018', pt2='0601') select * from ${ptTableName}_tmp_view")

    ptDf.write.insertInto(ptTableName) // dynamic partitioning, insertInto semantics
    ptDf.write.mode("overwrite").insertInto(ptTableName) // dynamic partitioning, insertOverwrite semantics

    // Read the non-partitioned table.
    val rdf = sql(s"select name, num from $tableName")
    println(s"rdf show, ${rdf.count()}")
    rdf.show()
    rdf.printSchema()

    // Read the partitioned table.
    val rptdf = sql(s"select name, num, pt1, pt2 from $ptTableName where pt1 = '2018' and pt2 = '0601'")
    println(s"rptdf show, ${rptdf.count()}")
    rptdf.show()
    rptdf.printSchema()
  }
}

How to submit:
cd PATH_TO_SPARK/MaxCompute-Spark/spark-3.x
mvn clean package
cd $SPARK_HOME
bin/spark-submit \
  --class com.aliyun.odps.spark.examples.sparksql.SparkSQL \
  PATH_TO_PROJECT/MaxCompute-Spark/spark-3.x/target/spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar
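The reads in the sample go through SQL text. With the odps catalog configured as above, the same partitioned read can also be sketched through the DataFrame API; the table name and partition values below match the sample:

import org.apache.spark.sql.functions.col

// Read the same partition through the DataFrame API instead of SQL text.
val pt = spark.table("mc_test_pt_table")
  .filter(col("pt1") === "2018" && col("pt2") === "0601")
pt.show()
pt.printSchema()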
Example 4: Accessing Alibaba Cloud OSS
Add the OSS dependencies as described in Accessing Alibaba Cloud OSS.
package com.aliyun.odps.spark.examples.oss

import org.apache.spark.{SparkConf, SparkContext}

object JindoFsDemo {
  def main(args: Array[String]): Unit = {
    val bucket: String = args(0)
    val ossPath: String = args(1)

    // Authenticate with an AccessKey ID / AccessKey secret pair.
    val conf = new SparkConf()
      .setAppName("jindo-fs-demo")
      .set("spark.hadoop.fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS")
      .set("spark.hadoop.fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem")
      .set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
      .set("spark.hadoop.fs.oss.accessKeyId", "xxx")
      .set("spark.hadoop.fs.oss.accessKeySecret", "xxx")
    val sc = new SparkContext(conf)
    try {
      read_oss_dir(sc, "demo", s"oss://${bucket}/${ossPath}")
    } finally {
      sc.stop()
    }
  }

  /**
   * Read a directory on OSS through the Jindo SDK and report the elapsed time.
   */
  def read_oss_dir(sc: SparkContext, job_des: String, ossPath: String): Unit = {
    val startTime: Long = System.currentTimeMillis()
    val inputData = sc.textFile(ossPath, 20)
    val cnt = inputData.count
    val endTime: Long = System.currentTimeMillis()
    val cost: Long = endTime - startTime
    println(s"job:$job_des, count:$cnt, consume:$cost")
  }
}

How to submit:
cd {{PATH_TO_SPARK}}/MaxCompute-Spark/spark-3.x
mvn clean package
cd $SPARK_HOME
bin/spark-submit \
  --class com.aliyun.odps.spark.examples.oss.JindoFsDemo \
  {{PATH_TO_PROJECT}}/MaxCompute-Spark/spark-3.x/target/spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar \
  {{OSS_BUCKET}} \
  {{OSS_PATH}}
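The demo only reads from OSS. Writing back uses the same filesystem configuration; a minimal sketch, where the output directory name is illustrative only and must not already exist:

// Hypothetical counterpart to read_oss_dir: save an RDD back to OSS.
// "demo-output" is an illustrative path under the same bucket.
sc.parallelize(Seq("hello", "oss"), 2)
  .saveAsTextFile(s"oss://${bucket}/demo-output")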