本文为您介绍如何使用Spark Shell,以及RDD的基础操作。
启动Spark Shell
Spark的Shell作为一个强大的交互式数据分析工具,提供了一个简单的方式学习API。 Spark既可以使用Scala,也可以使用Python。
您可以按照以下操作步骤来启动Spark Shell。
使用SSH方式登录集群的Master节点,详情请参见登录集群。
执行以下命令,启动Spark Shell。
spark-shell在Spark Shell中,已经在名为sc的变量中为您创建了一个特殊的SparkContext,如果您自己创建SparkContext会不生效。您可以使用
--master参数设置SparkContext连接到哪个主节点,并且可以通过--jars参数来设置添加到CLASSPATH的JAR包,多个JAR包时使用逗号(,)分隔。更多参数信息,您可以通过命令spark-shell --help获取。
RDD基础操作
Spark围绕着弹性分布式数据集(RDD)的概念展开,RDD是可以并行操作的元素的容错集合。Spark支持通过集合来创建RDD和通过外部数据集构建RDD两种方式来创建RDD。例如,共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据集。
创建RDD示例:
通过集合来创建RDD
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)通过外部数据集构建RDD
val distFile = sc.textFile("data.txt")
通常,Spark RDD的常用操作有两种,分别为Transformation操作和Action操作。Transformation操作并不会立即执行,而是到了Action操作才会被执行。
Transformation操作
操作
描述
map()参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。
flatMap()参数是函数,函数应用于RDD每一个元素,拆分元素数据,变成迭代器,返回值是新的RDD。
filter()参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。
distinct()没有参数,将RDD里的元素进行去重操作。
union()参数是RDD,生成包含两个RDD所有元素的新RDD。
intersection()参数是RDD,求出两个RDD的共同元素。
subtract()参数是RDD,去掉原RDD里和参数RDD里相同的元素。
cartesian()参数是RDD,求两个RDD的笛卡尔积。
Action操作
操作
描述
collect()返回RDD所有元素。
count()返回RDD中的元素个数。
countByValue()返回各元素在RDD中出现的次数。
reduce()并行整合所有RDD数据,例如求和操作。
fold(0)(func)和
reduce()功能一样,但是fold带有初始值。aggregate(0)(seqOp,combop)和
reduce()功能一样,但是返回的RDD数据类型和原RDD不一样。foreach(func)对RDD每个元素都是使用特定函数。
Spark Shell入门示例
先从外部存储系统读一个文本文件构造了一个RDD,然后通过RDD的Map算子计算得到文本文件中每一行的长度,最后通过Reduce算子计算得到了文本文件中各行长度之和。
使用SSH方式登录集群的Master节点,详情请参见登录集群。
创建data.txt文件,并将其上传到HDFS。
本文示例中data.txt文件的内容所示。
Hello Spark This is a test file 1234567890您可以通过以下命令将data.txt上传到HDFS。
hadoop fs -put data.txt /user/root/启动Spark Shell。
spark-shell执行以下命令,统计data.txt文件中所有行的字符长度总和。
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)