本文为您介绍如何使用Spark Shell,以及RDD的基础操作。
启动Spark Shell
Spark的Shell作为一个强大的交互式数据分析工具,提供了一个简单的方式学习API。 Spark既可以使用Scala,也可以使用Python。
您可以按照以下操作步骤来启动Spark Shell。
RDD基础操作
Spark围绕着弹性分布式数据集(RDD)的概念展开,RDD是可以并行操作的元素的容错集合。Spark支持通过集合来创建RDD和通过外部数据集构建RDD两种方式来创建RDD。例如,共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据集。
创建RDD示例:
- 通过集合来创建RDD
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
- 通过外部数据集构建RDD
val distFile = sc.textFile("data.txt")
RDD构建成功后,您可以对其进行一系列操作,例如Map和Reduce等操作。
例如,运行以下代码,首先从外部存储系统读一个文本文件构造了一个RDD,然后通过RDD的Map算子计算得到文本文件中每一行的长度,最后通过Reduce算子计算得到了文本文件中各行长度之和。
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
通常,Spark RDD的常用操作有两种,分别为Transform操作和Action操作。Transform操作并不会立即执行,而是到了Action操作才会被执行。
- Transform操作
操作 描述 map()
参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。 flatMap()
参数是函数,函数应用于RDD每一个元素,拆分元素数据,变成迭代器,返回值是新的RDD。 filter()
参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。 distinct()
没有参数,将RDD里的元素进行去重操作。 union()
参数是RDD,生成包含两个RDD所有元素的新RDD。 intersection()
参数是RDD,求出两个RDD的共同元素。 subtract()
参数是RDD,去掉原RDD里和参数RDD里相同的元素。 cartesian()
参数是RDD,求两个RDD的笛卡尔积。 - Action操作
操作 描述 collect()
返回RDD所有元素。 count()
返回RDD中的元素个数。 countByValue()
返回各元素在RDD中出现的次数。 reduce()
并行整合所有RDD数据,例如求和操作。 fold(0)(func)
和 reduce()
功能一样,但是fold带有初始值。aggregate(0)(seqOp,combop)
和 reduce()
功能一样,但是返回的RDD数据类型和原RDD不一样。foreach(func)
对RDD每个元素都是使用特定函数。