RDD
(Resilient Distributed Dataset
)是Spark
中的核心概念,它表示一个不可变的分布式数据集合。每个RDD
被分成不同的部分(partition
),这样就可以在集群(cluster
)的不同节点(node
)上进行并行计算。在Spark
中,所有的工作都围绕RDD
展开:创建RDD
,转换现有的RDD
,对RDD
进行运算得到计算结果,等等。
创建RDD
有两种方式:
a) 在驱动程序中加载对象集合。例如:
val lines = sc.parallelize(List("a", "b", "c"))
这种方式在实际中用的较少,因为它要求把整个数据集加载到一台机器的内存里。
b) 加载外部数据集。例如:
val lines = sc.textFile("/path/to/README.md")
这种方式使用较多。
RDD
支持两种形式的运算(operation
):“转换”(transformation
)和“处理”(action
)。
a)转换(transformation
)
“转换”是这样一种运算:作用在一个RDD
上,返回一个新的RDD
(例如:map
和filter
操作)。“转换” 运算是“Lazy Evaluation
”的:Spark
不会马上执行运算,直到第一次使用新的RDD
才开始。许多“转换” 运算是“element-wise
”的,也即一次只操作一个元素(element
),但这并不是对所有的“转换”运算都成立。
举个例子:
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
上面例子找到log.txt
文件中包含error
字段的行。filter()
操作并不会改变已经存在的inputRDD
, 而是返回一个指向新的RDD
的指针:errorsRDD
。
通过“转换”运算可以不断地从现有RDD
生成新的RDD
,Spark
会记录不同RDD
之间的依赖性,称之为“谱系图”(lineage graph
)。 Spark
可以利用这个信息根据需求计算RDD
或者恢复丢失的数据。
b) 处理(action
)
“处理”运算会返回一个最终的值给驱动程序,或是写一些数据到外部的存储系统。“处理”运算会强制进行在RDD
上的“转换”运算,因为需要得到真正的计算结果。
举个例子:
println("Input had " + errorsRDD.count() + " concerning lines")
上面例子调用count()
计算行数。
如果对一个运算是“转换”还是“处理”分不清的话,可以查看它的返回值。“转换”运算返回RDD
类型,而“处理”运算返回其它类型。
持久化(Persistence
,Caching
)
默认情况下,每次对RDD
进行“处理”(action
)运算时,Spark
都会对RDD
进行重新计算。为了避免每次都重新计算,可以对RDD
进行持久化操作(调用RDD.persist()
)。第一次运算后,Spark
会把RDD
内容存在内存里(分布在集群的不同机器),以便将来复用。
总结一下,每个Spark
程序或Spark shell
会话工作流程如下:
a) 从外部数据创建输入RDD
;
b) 使用“转换”(transformation
)运算从输入RDD
生成新的RDD
;
c) 对需要重复使用的RDD
进行持久化操作;
d) 对RDD
进行并行运算。
参考资料:
《Learning Spark》。