Spark RDD详解

RDDResilient Distributed Dataset)是Spark中的核心概念,它表示一个不可变的分布式数据集合。每个RDD被分成不同的部分(partition),这样就可以在集群(cluster)的不同节点(node)上进行并行计算。在Spark中,所有的工作都围绕RDD展开:创建RDD,转换现有的RDD,对RDD进行运算得到计算结果,等等。

创建RDD有两种方式:

a) 在驱动程序中加载对象集合。例如:

val lines = sc.parallelize(List("a", "b", "c"))

这种方式在实际中用的较少,因为它要求把整个数据集加载到一台机器的内存里。

b) 加载外部数据集。例如:

val lines = sc.textFile("/path/to/README.md")

这种方式使用较多。

RDD支持两种形式的运算(operation):“转换”(transformation)和“处理”(action)。

a)转换(transformation

“转换”是这样一种运算:作用在一个RDD上,返回一个新的RDD(例如:mapfilter操作)。“转换” 运算是“Lazy Evaluation”的:Spark不会马上执行运算,直到第一次使用新的RDD才开始。许多“转换” 运算是“element-wise”的,也即一次只操作一个元素(element),但这并不是对所有的“转换”运算都成立。

举个例子:

val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))

上面例子找到log.txt文件中包含error字段的行。filter()操作并不会改变已经存在的inputRDD, 而是返回一个指向新的RDD的指针:errorsRDD

通过“转换”运算可以不断地从现有RDD生成新的RDDSpark会记录不同RDD之间的依赖性,称之为“谱系图”(lineage graph)。 Spark可以利用这个信息根据需求计算RDD或者恢复丢失的数据。

b) 处理(action

“处理”运算会返回一个最终的值给驱动程序,或是写一些数据到外部的存储系统。“处理”运算会强制进行在RDD 上的“转换”运算,因为需要得到真正的计算结果。

举个例子:

println("Input had " + errorsRDD.count() + " concerning lines")

上面例子调用count()计算行数。

如果对一个运算是“转换”还是“处理”分不清的话,可以查看它的返回值。“转换”运算返回RDD类型,而“处理”运算返回其它类型。

持久化(PersistenceCaching

默认情况下,每次对RDD进行“处理”(action)运算时,Spark都会对RDD进行重新计算。为了避免每次都重新计算,可以对RDD进行持久化操作(调用RDD.persist())。第一次运算后,Spark会把RDD内容存在内存里(分布在集群的不同机器),以便将来复用。

总结一下,每个Spark程序或Spark shell会话工作流程如下:
a) 从外部数据创建输入RDD
b) 使用“转换”(transformation)运算从输入RDD生成新的RDD
c) 对需要重复使用的RDD进行持久化操作;
d) 对RDD进行并行运算。

参考资料:
《Learning Spark》