Spark本地调试环境搭建

本文为您介绍如何在本地搭建Spark的调试环境(以IDEA为例)。

背景信息

本文示例使用的工具如下所示:

  • Maven 3.8.6

  • Java 8

  • IntelliJ IDEA

环境准备

  1. 在IDEA中通过File > New > Project,新建一个Maven项目。

  2. pom.xml中添加Spark的相关依赖,下面以Spark 3.3.0为例。

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spark.version>3.3.0</spark.version>
    </properties>
    
    <dependencies>
        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

案例

下面为您介绍一些具体案例。

案例一:SparkPi

该案例介绍如何使用Spark计算Pi的粗略值。

  1. 创建测试用例SparkPi.scala

    import org.apache.spark.sql.SparkSession
    import scala.math.random
    
    object SparkPi {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("Spark Pi")
          .master("local[4]")
          .getOrCreate()
    
        val slices = if (args.length > 0) args(0).toInt else 2
        val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    
        val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
          val x = random * 2 - 1
          val y = random * 2 - 1
          if (x * x + y * y <= 1) 1 else 0
        }.reduce(_ + _)
    
        println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    
        spark.stop()
      }
    }
  2. 运行main,运行成功后,返回以下信息。

    Pi is roughly 3.1476957384786926

案例二:Spark连接OSS

该案例使用Spark访问OSS,推荐使用阿里云的JindoSDK,详情请参见Spark使用JindoSDK在IDE开发调试

因为JindoSDK暂时不支持Windows系统和Mac M1系列,所以该类系统如果想在本地测试,可以使用hadoop-aliyun,具体操作如下。

  1. pom.xml中添加hadoop-aliyun相关的依赖。

    <!-- oss -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.2</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aliyun</artifactId>
        <version>3.3.2</version>
    </dependency>
    
    <dependency>
        <groupId>com.aliyun.oss</groupId>
        <artifactId>aliyun-sdk-oss</artifactId>
        <version>3.15.2</version>
    </dependency>
  2. 创建测试用例SparkOSS.scala

    import org.apache.spark.sql.SparkSession
    
    object SparkOSS {
      def main(args: Array[String]): Unit = {
    
        val sparkMaster = "local[4]"
    
        val ossAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
        val ossAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        val ossEndpoint = "xxx"
    
        val inputPath = "oss://xxx"
        val outputPath = "oss://xxx"
    
        val spark = SparkSession.builder
          .appName("Spark OSS Word Count")
          .master(sparkMaster)
          .config("spark.hadoop.fs.oss.accessKeyId", ossAccessKeyId)
          .config("spark.hadoop.fs.oss.accessKeySecret", ossAccessKeySecret)
          .config("spark.hadoop.fs.oss.endpoint", ossEndpoint)
          .config("spark.hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
          .getOrCreate()
    
        val input = spark.sparkContext.textFile(inputPath, 2)
    
        input.flatMap(_.split(" "))
          .map(x => (x, 1))
          .reduceByKey(_ + _)
          .saveAsTextFile(outputPath)
    
        spark.stop()
      }
    }          
    说明

    运行代码示例前必须先配置环境变量。关于如何配置环境变量,请参见配置环境变量

    需要配置以下参数:

    • sparkMaster:本地运行可配置local[4],集群运行时需配置成yarn-clientyarn-cluster

    • ossAccessKeyId:访问OSS所需的AccessKey ID。

    • ossAccessKeySecret:访问OSS所需的AccessKey Secret。

    • ossEndpoint:访问OSS所需的Endpoint。

    • inputPath:wordCount输入文件。

    • outputPath:wordCount输出文件。

  3. 运行main,运行成功后,在OSS控制台查看outputPath下是否有输出文件。

    outputPath

案例三:Spark连接DLF

该案例介绍如何使用Spark连接阿里云DLF,读写数据库表。

  1. pom.xml中添加metastore-client相关的依赖。

    <!-- dlf -->
    <dependency>
      <groupId>com.aliyun.datalake</groupId>
      <artifactId>metastore-client-hive2</artifactId>
      <version>0.2.14</version>
    </dependency>
  2. 引入Hive相关依赖。

    1. 在集群如下位置下载该部分依赖的JAR包。

      $SPARK_HOME/jars/hive-common-x.x.x.jar
      $SPARK_HOME/jars/hive-exec-x.x.x-core.jar
    2. 在IDEA的File > Project Structure > Modules页面,导入刚刚下载的JAR包。

      JAR
  3. 创建测试用例SparkDLF.scala

    import org.apache.spark.sql.SparkSession
    
    object SparkDLF {
      def main(args: Array[String]): Unit = {
    
        val sparkMaster = "local[4]"
    
        val dlfCatalogAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
        val dlfCatalogAccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
        val dlfCatalogEndpoint = "xxx"
        val dlfCatalogId = "xxx"
    
        val warehouseDir = "/tmp/warehouse"
    
        val spark = SparkSession.builder()
          .appName("Spark DLF Example")
          .master(sparkMaster)
          .config("spark.hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory")
          .config("spark.hadoop.dlf.catalog.accessKeyId", dlfCatalogAccessKeyId)
          .config("spark.hadoop.dlf.catalog.accessKeySecret", dlfCatalogAccessKeySecret)
          .config("spark.hadoop.dlf.catalog.endpoint", dlfCatalogEndpoint)
          .config("spark.hadoop.dlf.catalog.id", dlfCatalogId)
          .config("spark.hadoop.hive.metastore.warehouse.dir", warehouseDir)
          .enableHiveSupport()
          .getOrCreate()
    
        import spark.sql
    
        // create database
        sql("create database if not exists test_db")
    
        // create table
        sql("create table test_db.test_tbl (key int, value string)")
    
        // insert
        sql("insert into test_db.test_tbl values (0, 'a')")
    
        // select
        sql("select * from test_db.test_tbl").show()
    
        // drop table
        sql("drop table test_db.test_tbl")
    
        // drop database
        sql("drop database test_db")
    
        spark.stop()
      }
    }

    需要配置以下参数:

    • sparkMaster:本地运行可配置local[4],集群运行时需配置成yarn-clientyarn-cluster

    • dlfCatalogAccessKeyId:访问DLF所需的AccessKey ID。

    • dlfCatalogAccessKeySecret:访问DLF所需的AccessKey Secret。

    • dlfCatalogEndpoint:访问DLF所需的Endpoint。

    • dlfCatalogId:指定的DLF Catalog Id。

    • warehouseDir:测试数据库的地址。支持以下地址:

      • 本地:例如/tmp/warehouse

      • EMR HDFS:例如hdfs://${clusterIP}:9000/xxx,由于该方式要连接EMR集群内的HDFS,需通过SSL-VPN的方式连接VPC,详情请参见客户端远程连接VPC

      • OSS:例如oss://,配置参考案例二:Spark连接OSS

  4. 运行main,运行成功后返回以下信息。

    +---+-----+
    |key|value|
    +---+-----+
    |  0|    a|
    +---+-----+