Spark是一个通用的大数据计算引擎。本文为您介绍如何在Zeppelin中使用Spark。

背景信息

Zeppelin支持Spark的4种主流语言,包括Scala、PySpark、R和SQL。Zeppelin中所有语言在同一个Spark Application里,即共享一个SparkContext和SparkSession。例如,您在Scala里注册的table和UDF是可以被其他语言使用的。

Spark解释器的基本架构如下图所示:Spark

Zeppelin支持Spark,您可以在Zeppelin里使用Spark的所有功能。

Scala(%spark)

%spark开头的就是Scala代码的段落(Paragraph)。因为Zeppelin已经为您内置了Scala的SparkContext (sc)和SparkSession(spark)变量,所以您无需再创建SparkContext或者SparkSession。

代码示例如下:
%spark

val sum = sc.range(1,10).sum()
println("Sum = " + sum)
  • Code Completion
    Zeppelin里的Scala Shell是支持Code Completion的,按Tab键即可显示当前环境下的候选方法名,如下图所示:spark_code
  • ZeppelinContext
    ZeppelinContext(变量名为z)是在Spark Scala Shell环境下创建的一个提供一些高级用法的Class。比较实用的方法是z.show。使用z.show展示DataFrame示例如下所示:Spark-ZeppelinContext

PySpark(%spark.pyspark)

%spark.pyspark开头的就是PySpark代码的段落(Paragraph)。因为Zeppelin已经为您内置了PySpark的SparkContext (sc)和SparkSession(spark)变量,所以您无需再创建SparkContext或者SparkSession。

代码示例如下:
%spark.pyspark

sum = sc.range(1,10).sum()
print("Sum = " + str(sum))

SparkR(%spark.r)

如果您需要使用SparkR,那么请确保您的EMR集群里安装了R语言以及knitr包(需要在每个NodeManager节点上安装,因为数据开发中默认配置的是Yarn-Cluster模式,Driver有可能运行在任意一个NodeManager节点上)。

安装命令如下:
  1. 执行如下命令安装R语言。
    sudo yum install epel-release
    sudo yum install R
  2. 在R里面安装knitr包。
    install.packages("knitr")
示例如下:
  1. 使用SparkR创建一个DataFrame,并注册为一张table。SparkR
  2. 使用Spark SQL查询注册的table。SparkSQL

SQL(%spark.sql)

%spark.sql开头的就是Spark SQL的段落(Paragraph)。您可以运行所有Spark支持的SQL语句,通过Zeppelin可视化展示,如下图所示:SQL
Zeppelin的Spark SQL解释器和其他Spark解释器(PySpark、SparkR和Spark解释器)共享SparkContext和SparkSession,即用其他Spark解释器注册的表也可以使用Spark SQL解释器进行访问。例如:
%spark

case class People(name: String, age: Int)
var df = spark.createDataFrame(List(People("jeff", 23), People("andy", 20)))
df.createOrReplaceTempView("people")
%spark.sql

select * from people

Spark SQL解释器还支持并行运行,即支持同时运行多个SQL。另外,由于Spark SQL本身的特性,Spark SQL Statement支持大多数Hive SQL语法。Spark集成Hive后,通常场景下,您可以使用Spark SQL解释器访问Hive表来进行更高效的分析计算,数据开发里的Spark解释器默认已经开启了Hive。

配置Spark

在阿里云EMR的数据开发里,Spark解释器配置的是Isolated Per Note模式,也就是说每个Note都有一个独立的Spark Application(每个Note对应一个Yarn App,Zeppelin里使用的是Yarn-Cluster模式)。对于每一个Spark Application,您可以在%spark.conf里配置所有Spark相关的配置选项,具体配置选项,请参见Spark Configuration

代码示例如下:Spark_demo
注意 因为Spark只支持在启动Spark Application之前配置属性,所以%spark.conf必须在运行Spark代码之前运行。
如果您在启动Spark Application之后想修改配置(例如,修改Driver Memory),则您必须重启当前Note的Spark解释器。重启步骤如下:
  1. 单击set图标。Interpreter
  2. 单击restart图标。restart
  3. 重新运行%spark.conf段落。
EMR数据开发中的Spark解释器默认配置如下:
  • Yarn-Cluster模式。
  • 启用Hive。
  • Driver内存1 GB,Executor内存1 GB。
  • 启用Dynamic Executor Allocation。

    初始化Executor 1个,最大Executor 10个,更多配置信息,请参见Spark Configuration

第三方依赖

Spark作业经常会有第三方依赖。标准的Spark通常使用以下3种配置:
  • spark.jars

    spark.jars可以用来指定JAR文件,多个JAR包可以用逗号(,)隔开。您可以把JAR包放在OSS上,也可以放在目标EMR集群的HDFS上。建议您放在OSS上,以便于您的JAR包可以被多个EMR集群共享,即使切换集群也不用更改配置。

    如下图示例,段落①使用%spark.conf指定OSS上的JAR,这个JAR里写了一个Java UDF,段落②使用PySpark注册Java UDF,然后在SQL中使用Java UDF。spark-jar1
  • spark.jars.packages

    spark.jars.packages可以用Package的形式来指定依赖包,Spark会动态下载这些Package到ClassPath里,多个Package以逗号(,)分隔。

    如下图示例,段落①指定delta包,段落②使用这个delta包。spark_jar
  • spark.files

    spark.files用来指定File文件,多个File文件以逗号(,)分隔。

内置教程

EMR数据开发集群自带了很多Spark教程,详细信息请在如下图页面查看。Spark-Demo

问题反馈

如果您在使用阿里云E-MapReduce过程中有任何疑问,欢迎您扫描下面的二维码加入钉钉群进行反馈。

emr_dingding