Spark常见转换(transformation)运算简介

本文介绍的转换(transformation)运算适用于所有类型的RDD

Element-wise转换:

Spark中,最常用的的转换运算是map()filter()map()运算会对RDD中的每个元素进行函数运算,而filter()运算只会作用于RDD中符合过滤条件的元素,运算结果生成一个新的RDD。看下面这个例子:

spark_map_filter

需要注意的是map()运算的结果类型不一定要同输入类型一致。

有时需要从一个输入元素生成多个输出元素,这时可以使用flatMap()运算。同map()一样,flatMap()
也会对输入RDD中的每个元素进行运算,但运算结果是一个包含返回值的迭代器(iterator)。最终产生的RDD会包含所有迭代器中的元素。但要注意的是,这个RDD的元素数据类型并不是迭代器,而是迭代器所包含的元素的数据类型。flatMap()map()的区别可以用下图来理解

spark_flatmap_map

可以认为flat的含义就是“铲平”迭代器,把里面的元素“拿”出来,生成一个新的RDD

 

伪集合运算(Pseudo set operations):

RDD支持很多类似数学里的集合运算:取“交集”,“并集”等等。需要注意的是,进行运算的这些RDD必须具有相同的数据类型。下图展示了一些运算:

rdd_set_operation

两个RDD之间还可以进行笛卡尔积运算:

Catersian

Spark RDD详解

RDDResilient Distributed Dataset)是Spark中的核心概念,它表示一个不可变的分布式数据集合。每个RDD被分成不同的部分(partition),这样就可以在集群(cluster)的不同节点(node)上进行并行计算。在Spark中,所有的工作都围绕RDD展开:创建RDD,转换现有的RDD,对RDD进行运算得到计算结果,等等。

创建RDD有两种方式:

a) 在驱动程序中加载对象集合。例如:

val lines = sc.parallelize(List("a", "b", "c"))

这种方式在实际中用的较少,因为它要求把整个数据集加载到一台机器的内存里。

b) 加载外部数据集。例如:

val lines = sc.textFile("/path/to/README.md")

这种方式使用较多。

RDD支持两种形式的运算(operation):“转换”(transformation)和“处理”(action)。

a)转换(transformation

“转换”是这样一种运算:作用在一个RDD上,返回一个新的RDD(例如:mapfilter操作)。“转换” 运算是“Lazy Evaluation”的:Spark不会马上执行运算,直到第一次使用新的RDD才开始。许多“转换” 运算是“element-wise”的,也即一次只操作一个元素(element),但这并不是对所有的“转换”运算都成立。

举个例子:

val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))

上面例子找到log.txt文件中包含error字段的行。filter()操作并不会改变已经存在的inputRDD, 而是返回一个指向新的RDD的指针:errorsRDD

通过“转换”运算可以不断地从现有RDD生成新的RDDSpark会记录不同RDD之间的依赖性,称之为“谱系图”(lineage graph)。 Spark可以利用这个信息根据需求计算RDD或者恢复丢失的数据。

b) 处理(action

“处理”运算会返回一个最终的值给驱动程序,或是写一些数据到外部的存储系统。“处理”运算会强制进行在RDD 上的“转换”运算,因为需要得到真正的计算结果。

举个例子:

println("Input had " + errorsRDD.count() + " concerning lines")

上面例子调用count()计算行数。

如果对一个运算是“转换”还是“处理”分不清的话,可以查看它的返回值。“转换”运算返回RDD类型,而“处理”运算返回其它类型。

持久化(PersistenceCaching

默认情况下,每次对RDD进行“处理”(action)运算时,Spark都会对RDD进行重新计算。为了避免每次都重新计算,可以对RDD进行持久化操作(调用RDD.persist())。第一次运算后,Spark会把RDD内容存在内存里(分布在集群的不同机器),以便将来复用。

总结一下,每个Spark程序或Spark shell会话工作流程如下:
a) 从外部数据创建输入RDD
b) 使用“转换”(transformation)运算从输入RDD生成新的RDD
c) 对需要重复使用的RDD进行持久化操作;
d) 对RDD进行并行运算。

参考资料:
《Learning Spark》

Spark应用浅析

每个Spark应用(application)都包含一个用来在集群(cluster)上启动各种并行操作的驱动程序(driver program)。驱动程序不仅包含了Spark应用的main函数,还定义了集群上的分布式数据集(distributed dataset),而且会对这些数据集进行各种操作。在你使用Spark Shell进行交互操作时:

[root@Fedora bin]# ./spark-shell
scala> sc.parallelize(1 to 1000).count()
res0: Long = 1000

Spark Shell自身就是驱动程序。

驱动程序通过一个SparkContext对象来访问Spark,这个对象代表一个针对计算集群(computing cluster)的连接。Spark Shell会自动产生一个叫scSparkContext对象,所以在上面的例子中可以直接使用sc。在Spark Shell中输入sc,可以看到它的类型信息:

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@45707f76

有了SparkContext对象之后,就可以用它来产生数据集,然后在数据集上进行各种操作。而为了进行这些操作,驱动程序要管理一群被称作“executor”的节点(node)。下图描述了Spark如何在集群上执行任务:spark_cluster

 

参考资料:
《Learning Spark》

Spark结构初探

Apache Spark(以下简称Spark)是一个快速的,通用的集群计算平台(Apache Spark is a cluster computing platform designed to be fast and general-purpose),它由多个紧密结合的构件组成:

spark_stack

 

Spark Core包含Spark的最基本的功能:任务调度,内存管理,故障恢复,存储系统的交互,等等。

Spark SQL是用来处理结构化数据的程序包。它不仅允许使用SQLHQLHive Query Language)来查询数据,并且支持多种数据源:Hive tablesParquetJSON

Spark Streaming用来处理实时的数据流。

MLib是一个提供了很多机器学习算法的库。

GraphX是一个提供操作图表以及对图表进行并行计算的库。

Spark除了自带了一个简单的Cluster ManagerStandalone Scheduler以外,也支持Hadoop YARNApache Mesos

Spark可以把存储在Hadoop Distributed File SystemHDFS)或其它支持Hadoop API的存储系统(包含你本地文件系统,Amazon S3CassandraHiveHBase等)上的文件转化成分布式数据集(distributed datasets)。要注意,Hadoop对于Spark来说不是必不可少的,只要存储系统实现Hadoop API即可。

参考资料:
《Learning Spark》

利用Spark API写一个单独的程序

本文参考Spark网站的Self-Contained Applications一节,使用Scala语言开发一个单独的小程序。

(1)首先安装sbt,参考官方文档。我使用的是RPM包格式:

curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
sudo yum install sbt

(2)接下来在/home文件夹下建立一个SparkApp的文件夹,文件夹布局如下:

bash-4.1# find /home/SparkApp/
/home/SparkApp/
/home/SparkApp/simple.sbt
/home/SparkApp/src
/home/SparkApp/src/main
/home/SparkApp/src/main/scala
/home/SparkApp/src/main/scala/SimpleApp.scala

其中simple.sbt文件内容如下所示:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"

SimpleApp.scala程序如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "file:///usr/local/spark/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

(3)执行sbt package命令打包jar文件:

bash-4.1# sbt package
......
[success] Total time: 89 s, completed May 25, 2015 10:16:51 PM

(4)调用spark-submit脚本执行程序:

bash-4.1# /usr/local/spark/bin/spark-submit --class "SimpleApp" --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar
......
Lines with a: 60, Lines with b: 29

可以看到,输出正确结果。

搭建Spark开发环境

本文使用docker搭建Spark环境,使用的image文件是sequenceiq提供的1.3.0版本

首先pull Spark image文件:

docker pull sequenceiq/spark:1.3.0

pull成功后,运行Spark

docker run -i -t -h sandbox sequenceiq/spark:1.3.0 bash

测试Spark是否工作正常:

bash-4.1# spark-shell --master yarn-client --driver-memory 1g --executor-memory 1g --executor-cores 1
......
scala> sc.parallelize(1 to 1000).count()
......
res0: Long = 1000

输出1000,OK!

(1)启动spark-shell,输出log很多,解决方法如下:
a)把/usr/local/spark/conf文件夹下的log4j.properties.template文件复制生成一份log4j.properties文件:

bash-4.1# cd /usr/local/spark/conf
bash-4.1# cp log4j.properties.template log4j.properties

b)把log4j.properties文件里的“log4j.rootCategory=INFO, console”改成“log4j.rootCategory=WARN, console”即可。

(2)启动spark-shell会有以下warning

15/05/25 04:49:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

提示找不到hadoop的库文件,解决办法如下:

export LD_LIBRARY_PATH=/usr/local/hadoop/lib/native/:$LD_LIBRARY_PATH

请参考stackoverflow的相关讨论:
a)Hadoop “Unable to load native-hadoop library for your platform” error on CentOS
b)Hadoop “Unable to load native-hadoop library for your platform” error on docker-spark?

(3)在Quick Start中提到如下例子:

scala> val textFile = sc.textFile("README.md")
......
scala> textFile.count() // Number of items in this RDD

执行会有错误:

scala> textFile.count()
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

可以看到程序尝试从hdfs中寻找文件,所以报错。

解决方法有两种:
a) 指定本地文件系统:

scala> val textFile = sc.textFile("file:///usr/local/spark/README.md")
textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[3] at textFile at <console>:21

scala> textFile.count()
res1: Long = 98

b)上传文件到hdfs上:

bash-4.1# hadoop fs -put /usr/local/spark/README.md README.md

接着运行spark-shell:

bash-4.1# spark-shell --master yarn-client --driver-memory 1g --executor-memory 1g --executor-cores 1
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
15/05/25 05:22:15 WARN Client: SPARK_JAR detected in the system environment. This variable has been deprecated in favor of the spark.yarn.jar configuration variable.
15/05/25 05:22:15 WARN Client: SPARK_JAR detected in the system environment. This variable has been deprecated in favor of the spark.yarn.jar configuration variable.
Spark context available as sc.
SQL context available as sqlContext.

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:21

scala> textFile.count()
res0: Long = 98

参考邮件:
Spark Quick Start – call to open README.md needs explicit fs prefix

P.S.在主机(非docker环境)下载sparkhttps://spark.apache.org/downloads.html)运行时,会有以下warning

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

解决办法是把/path/to/spark/conf文件夹下的log4j.properties.template文件复制生成一份log4j.properties文件即可。

参考stackoverflow的讨论:
log4j:WARN No appenders could be found for logger (running jar file, not web app)