本文为您介绍如何使用Spark Shell,以及RDD的基础操作。

启动Spark Shell

Spark的Shell作为一个强大的交互式数据分析工具,提供了一个简单的方式学习API。 Spark既可以使用Scala,也可以使用Python。

您可以按照以下操作步骤来启动Spark Shell。

  1. 通过SSH方式连接集群,详情请参见登录集群
  2. 执行以下命令,启动Spark Shell。
    spark-shell

    在Spark Shell中,已经在名为sc的变量中为您创建了一个特殊的SparkContext,如果您自己创建SparkContext会不生效。您可以使用--master参数设置SparkContext连接到哪个主节点,并且可以通过--jars参数来设置添加到CLASSPATH的JAR包,多个JAR包时使用逗号(,)分隔。更多参数信息,您可以通过命令./bin/spark-shell --help获取。

RDD基础操作

Spark围绕着弹性分布式数据集(RDD)的概念展开,RDD是可以并行操作的元素的容错集合。Spark支持通过集合来创建RDD和通过外部数据集构建RDD两种方式来创建RDD。例如,共享文件系统、HDFS、HBase或任何提供Hadoop InputFormat的数据集。

创建RDD示例:
  • 通过集合来创建RDD
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
  • 通过外部数据集构建RDD
    val distFile = sc.textFile("data.txt")

RDD构建成功后,您可以对其进行一系列操作,例如Map和Reduce等操作。

例如,运行以下代码,首先从外部存储系统读一个文本文件构造了一个RDD,然后通过RDD的Map算子计算得到文本文件中每一行的长度,最后通过Reduce算子计算得到了文本文件中各行长度之和。
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
通常,Spark RDD的常用操作有两种,分别为Transform操作和Action操作。Transform操作并不会立即执行,而是到了Action操作才会被执行。
  • Transform操作
    操作 描述
    map() 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。
    flatMap() 参数是函数,函数应用于RDD每一个元素,拆分元素数据,变成迭代器,返回值是新的RDD。
    filter() 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。
    distinct() 没有参数,将RDD里的元素进行去重操作。
    union() 参数是RDD,生成包含两个RDD所有元素的新RDD。
    intersection() 参数是RDD,求出两个RDD的共同元素。
    subtract() 参数是RDD,去掉原RDD里和参数RDD里相同的元素。
    cartesian() 参数是RDD,求两个RDD的笛卡尔积。
  • Action操作
    操作 描述
    collect() 返回RDD所有元素。
    count() 返回RDD中的元素个数。
    countByValue() 返回各元素在RDD中出现的次数。
    reduce() 并行整合所有RDD数据,例如求和操作。
    fold(0)(func) reduce()功能一样,但是fold带有初始值。
    aggregate(0)(seqOp,combop) reduce()功能一样,但是返回的RDD数据类型和原RDD不一样。
    foreach(func) 对RDD每个元素都是使用特定函数。