本文为您介绍如何在本地搭建Spark的调试环境(以IDEA为例)。
背景信息
本文示例使用的工具如下所示:
Maven 3.8.6
Java 8
IntelliJ IDEA
环境准备
在IDEA中通过 ,新建一个Maven项目。
在pom.xml中添加Spark的相关依赖,下面以Spark 3.3.0为例。
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <spark.version>3.3.0</spark.version> </properties> <dependencies> <!-- spark --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>${spark.version}</version> </dependency> </dependencies>
案例
下面为您介绍一些具体案例。
案例一:SparkPi
该案例介绍如何使用Spark计算Pi的粗略值。
创建测试用例
SparkPi.scala
。import org.apache.spark.sql.SparkSession import scala.math.random object SparkPi { def main(args: Array[String]): Unit = { val spark = SparkSession.builder .appName("Spark Pi") .master("local[4]") .getOrCreate() val slices = if (args.length > 0) args(0).toInt else 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.sparkContext.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x * x + y * y <= 1) 1 else 0 }.reduce(_ + _) println(s"Pi is roughly ${4.0 * count / (n - 1)}") spark.stop() } }
运行main,运行成功后,返回以下信息。
Pi is roughly 3.1476957384786926
案例二:Spark连接OSS
该案例使用Spark访问OSS,推荐使用阿里云的JindoSDK,详情请参见Spark使用JindoSDK在IDE开发调试。
因为JindoSDK暂时不支持Windows系统和Mac M1系列,所以该类系统如果想在本地测试,可以使用hadoop-aliyun
,具体操作如下。
在pom.xml中添加
hadoop-aliyun
相关的依赖。<!-- oss --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aliyun</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>com.aliyun.oss</groupId> <artifactId>aliyun-sdk-oss</artifactId> <version>3.15.2</version> </dependency>
创建测试用例
SparkOSS.scala
。import org.apache.spark.sql.SparkSession object SparkOSS { def main(args: Array[String]): Unit = { val sparkMaster = "local[4]" val ossAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") val ossAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") val ossEndpoint = "xxx" val inputPath = "oss://xxx" val outputPath = "oss://xxx" val spark = SparkSession.builder .appName("Spark OSS Word Count") .master(sparkMaster) .config("spark.hadoop.fs.oss.accessKeyId", ossAccessKeyId) .config("spark.hadoop.fs.oss.accessKeySecret", ossAccessKeySecret) .config("spark.hadoop.fs.oss.endpoint", ossEndpoint) .config("spark.hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem") .getOrCreate() val input = spark.sparkContext.textFile(inputPath, 2) input.flatMap(_.split(" ")) .map(x => (x, 1)) .reduceByKey(_ + _) .saveAsTextFile(outputPath) spark.stop() } }
说明运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量。
需要配置以下参数:
sparkMaster
:本地运行可配置local[4]
,集群运行时需配置成yarn-client
或yarn-cluster
。ossAccessKeyId
:访问OSS所需的AccessKey ID。ossAccessKeySecret
:访问OSS所需的AccessKey Secret。ossEndpoint
:访问OSS所需的Endpoint。inputPath
:wordCount输入文件。outputPath
:wordCount输出文件。
运行main,运行成功后,在OSS控制台查看outputPath下是否有输出文件。
案例三:Spark连接DLF
该案例介绍如何使用Spark连接阿里云DLF,读写数据库表。
在pom.xml中添加
metastore-client
相关的依赖。<!-- dlf --> <dependency> <groupId>com.aliyun.datalake</groupId> <artifactId>metastore-client-hive2</artifactId> <version>0.2.14</version> </dependency>
引入Hive相关依赖。
在集群如下位置下载该部分依赖的JAR包。
$SPARK_HOME/jars/hive-common-x.x.x.jar $SPARK_HOME/jars/hive-exec-x.x.x-core.jar
在IDEA的 页面,导入刚刚下载的JAR包。
创建测试用例
SparkDLF.scala
。import org.apache.spark.sql.SparkSession object SparkDLF { def main(args: Array[String]): Unit = { val sparkMaster = "local[4]" val dlfCatalogAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") val dlfCatalogAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") val dlfCatalogEndpoint = "xxx" val dlfCatalogId = "xxx" val warehouseDir = "/tmp/warehouse" val spark = SparkSession.builder() .appName("Spark DLF Example") .master(sparkMaster) .config("spark.hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory") .config("spark.hadoop.dlf.catalog.accessKeyId", dlfCatalogAccessKeyId) .config("spark.hadoop.dlf.catalog.accessKeySecret", dlfCatalogAccessKeySecret) .config("spark.hadoop.dlf.catalog.endpoint", dlfCatalogEndpoint) .config("spark.hadoop.dlf.catalog.id", dlfCatalogId) .config("spark.hadoop.hive.metastore.warehouse.dir", warehouseDir) .enableHiveSupport() .getOrCreate() import spark.sql // create database sql("create database if not exists test_db") // create table sql("create table test_db.test_tbl (key int, value string)") // insert sql("insert into test_db.test_tbl values (0, 'a')") // select sql("select * from test_db.test_tbl").show() // drop table sql("drop table test_db.test_tbl") // drop database sql("drop database test_db") spark.stop() } }
需要配置以下参数:
sparkMaster
:本地运行可配置local[4]
,集群运行时需配置成yarn-client
或yarn-cluster
。dlfCatalogAccessKeyId
:访问DLF所需的AccessKey ID。dlfCatalogAccessKeySecret
:访问DLF所需的AccessKey Secret。dlfCatalogEndpoint
:访问DLF所需的Endpoint。dlfCatalogId
:指定的DLF Catalog Id。warehouseDir
:测试数据库的地址。支持以下地址:本地:例如
/tmp/warehouse
。EMR HDFS:例如
hdfs://${clusterIP}:9000/xxx
,由于该方式要连接EMR集群内的HDFS,需通过SSL-VPN的方式连接VPC,详情请参见客户端远程连接VPC。OSS:例如
oss://
,配置参考案例二:Spark连接OSS。
运行main,运行成功后返回以下信息。
+---+-----+ |key|value| +---+-----+ | 0| a| +---+-----+