Spark是一个通用的大数据计算引擎。本文为您介绍如何在Zeppelin中使用Spark。
背景信息
Zeppelin支持Spark的4种主流语言,包括Scala、PySpark、R和SQL。Zeppelin中所有语言在同一个Spark Application里,即共享一个SparkContext和SparkSession。例如,您在Scala里注册的table和UDF是可以被其他语言使用的。
Spark解释器的基本架构如下图所示:
Zeppelin支持Spark,您可以在Zeppelin里使用Spark的所有功能。
Scala(%spark)
以%spark开头的就是Scala代码的段落(Paragraph)。因为Zeppelin已经为您内置了Scala的SparkContext (sc)和SparkSession(spark)变量,所以您无需再创建SparkContext或者SparkSession。
代码示例如下:
%spark
val sum = sc.range(1,10).sum()
println("Sum = " + sum)
Code Completion
Zeppelin里的Scala Shell是支持Code Completion的,按Tab键即可显示当前环境下的候选方法名,如下图所示:
ZeppelinContext
ZeppelinContext(变量名为z)是在Spark Scala Shell环境下创建的一个提供一些高级用法的Class。比较实用的方法是z.show。使用z.show展示DataFrame示例如下所示:
PySpark(%spark.pyspark)
以%spark.pyspark开头的就是PySpark代码的段落(Paragraph)。因为Zeppelin已经为您内置了PySpark的SparkContext (sc)和SparkSession(spark)变量,所以您无需再创建SparkContext或者SparkSession。
代码示例如下:
%spark.pyspark
sum = sc.range(1,10).sum()
print("Sum = " + str(sum))
SparkR(%spark.r)
如果您需要使用SparkR,那么请确保您的EMR集群里安装了R语言以及knitr包(需要在每个NodeManager节点上安装,因为数据开发中默认配置的是Yarn-Cluster模式,Driver有可能运行在任意一个NodeManager节点上)。
安装命令如下:
执行如下命令安装R语言。
sudo yum install epel-release sudo yum install R
在R里面安装knitr包。
install.packages("knitr")
示例如下:
使用SparkR创建一个DataFrame,并注册为一张table。
使用Spark SQL查询注册的table。
SQL(%spark.sql)
以%spark.sql开头的就是Spark SQL的段落(Paragraph)。您可以运行所有Spark支持的SQL语句,通过Zeppelin可视化展示,如下图所示:
Zeppelin的Spark SQL解释器和其他Spark解释器(PySpark、SparkR和Spark解释器)共享SparkContext和SparkSession,即用其他Spark解释器注册的表也可以使用Spark SQL解释器进行访问。例如:
%spark
case class People(name: String, age: Int)
var df = spark.createDataFrame(List(People("jeff", 23), People("andy", 20)))
df.createOrReplaceTempView("people")
%spark.sql
select * from people
Spark SQL解释器还支持并行运行,即支持同时运行多个SQL。另外,由于Spark SQL本身的特性,Spark SQL Statement支持大多数Hive SQL语法。Spark集成Hive后,通常场景下,您可以使用Spark SQL解释器访问Hive表来进行更高效的分析计算,数据开发里的Spark解释器默认已经开启了Hive。
配置Spark
在阿里云EMR的数据开发里,Spark解释器配置的是Isolated Per Note模式,也就是说每个Note都有一个独立的Spark Application(每个Note对应一个Yarn App,Zeppelin里使用的是Yarn-Cluster模式)。对于每一个Spark Application,您可以在%spark.conf里配置所有Spark相关的配置选项,具体配置选项,请参见Spark Configuration。
代码示例如下:
因为Spark只支持在启动Spark Application之前配置属性,所以%spark.conf必须在运行Spark代码之前运行。
如果您在启动Spark Application之后想修改配置(例如,修改Driver Memory),则您必须重启当前Note的Spark解释器。重启步骤如下:
单击图标。
单击图标。
重新运行%spark.conf段落。
EMR数据开发中的Spark解释器默认配置如下:
Yarn-Cluster模式。
启用Hive。
Driver内存1 GB,Executor内存1 GB。
启用Dynamic Executor Allocation。
初始化Executor 1个,最大Executor 10个,更多配置信息,请参见Spark Configuration。
第三方依赖
Spark作业经常会有第三方依赖。标准的Spark通常使用以下3种配置:
spark.jars
spark.jars可以用来指定JAR文件,多个JAR包可以用逗号(,)隔开。您可以把JAR包放在OSS上,也可以放在目标EMR集群的HDFS上。建议您放在OSS上,以便于您的JAR包可以被多个EMR集群共享,即使切换集群也不用更改配置。
如下图示例,段落①使用%spark.conf指定OSS上的JAR,这个JAR里写了一个Java UDF,段落②使用PySpark注册Java UDF,然后在SQL中使用Java UDF。
spark.jars.packages
spark.jars.packages可以用Package的形式来指定依赖包,Spark会动态下载这些Package到ClassPath里,多个Package以逗号(,)分隔。
如下图示例,段落①指定delta包,段落②使用这个delta包。
spark.files
spark.files用来指定File文件,多个File文件以逗号(,)分隔。
内置教程
EMR数据开发集群自带了很多Spark教程,详细信息请在如下图页面查看。