Apache Spark 是一个快速通用的集群计算系统,它提供了提供了java,scala,python和R的高级API,以及一个支持一般图计算的优化引擎。它同样也一系列丰富的高级工具包括:Spark sql 用于sql和结构化数据处理,MLlib用于机器学习,Graphx用于图数据处理,以及Spark Streaming用于流数据处理。
2,快速入门本教程对使用spark进行简单介绍。首先我们会通过spark的交互式 shell工具介绍Python和scalade API,然后展示如何通过java,scala和Python编写一个spark应用程序。
为了方便参照该指南进行学习。请先到 Spark 网站 下载一个 Spark 发布包。由于我们暂时还不会用到 HDFS,所以你可以下载对应任意 Hadoop 版本的 Spark 发布包。
Spark 2.0 版本之前, Spark 的核心编程接口是d性分布式数据集(RDD)。Spark 2.0 版本之后, RDD 被 Dataset 所取代, Dataset 跟 RDD 一样也是强类型的, 但是底层做了更多的优化。Spark 目前仍然支持 RDD 接口, 你可以在 RDD 编程指南 页面获得更完整的参考,但是我们强烈建议你转而使用比 RDD 有着更好性能的 Dataset。想了解关于 Dataset 的更多信息请参考 Spark SQL, Dataframe 和 Dataset 编程指南。
Spark shell 提供了一个简单的方式去学习API,同时它也是一个强大的分布式分析工具。park Shell 既支持 Scala(Scala 运行在 Java 虚拟机上,所以可以很方便的引用现有的 Java 库)也支持 Python。
Scala:
到spark安装目录bin下找到spark-shell :
./bin/spark-shell
**注意:**Python方式略,在之后的介绍中,只展示scala方式,python方式,可见官网详情:
Spark 最主要的抽象概念就是一个叫做 Dataset 的分布式数据集。Dataset 可以从 Hadoop InputFormats(例如 HDFS 文件)创建或者由其他 Dataset 转换而来。下面我们利用 Spark 源码目录下 README 文件中的文本来新建一个 Dataset:
scala> val textFile = spark.read.textFile("README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string]
你可以直接调用Action算子从Dataset中获取数据,或者转换该 Dataset 以获取一个新的 Dataset。更多细节请参阅 API 文档 。
scala> textFile.count() // Number of items in this Dataset res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs scala> textFile.first() // First item in this Dataset res1: String = # Apache Spark
现在我们将该 Dataset 转换成一个新的 Dataset。我们调用 filter 这个 transformation 算子返回一个只包含原始文件数据项子集的新 Dataset。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
我们可以将 transformation 算子和 action 算子连在一起:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 152.2更多关于DataSet的 *** 作
Dataset action 和 transformation 算子可以用于更加复杂的计算。比方说我们想要找到文件中包含单词数最多的行。
Scala scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
首先,使用 map 算子将每一行映射为一个整数值,创建了一个新的 Dataset。然后在该 Dataset 上调用 reduce 算子找出最大的单词计数。map 和 reduce 算子的参数都是 cala 函数字面量(闭包),并且可以使用任意语言特性或 Scala/Java 库。例如,我们可以很容易地调用其他地方声明的函数。为了使代码更容易理解,下面我们使用Math.max():
scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15
因 Hadoop 而广为流行的 MapReduce 是一种通用的数据流模式。Spark 可以很容易地实现 MapReduce 流程:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
这里我们调用 flatMap 这个 transformation 算子将一个行的 Dataset 转换成了一个单词的 Dataset, 然后组合 groupByKey 和 count 算子来计算文件中每个单词出现的次数,生成一个包含(String, Long)键值对的 Dataset。为了在 shell 中收集到单词计数, 我们可以调用 collect 算子:
scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)2.3缓存
Spark 还支持把数据集拉到集群范围的内存缓存中。当数据需要反复访问时非常有用,比如查询一个小的热门数据集或者运行一个像 PageRank 这样的迭代算法。作为一个简单的示例,我们把 linesWithSpark 这个数据集缓存起来。
Scala scala> linesWithSpark.cache() res7: linesWithSpark.type = [value: string] scala> linesWithSpark.count() res8: Long = 15 scala> linesWithSpark.count() res9: Long = 15
用 Spark 浏览和缓存一个 100 行左右的文本文件看起来确实有点傻。但有趣的部分是这些相同的函数可以用于非常大的数据集,即使这些数据集分布在数十或数百个节点上。如 RDD 编程指南 中描述的那样, 你也可以通过 bin/spark-shell 连接到一个集群,交互式地执行上面那些 *** 作。
2.4自包含的(self-contained)应用程序假设我们想使用 Spark API 编写一个自包含(self-contained)的 Spark 应用程序。下面我们将快速过一下一个简单的应用程序,分别使用 Scala(sbt编译),Java(maven编译)和 Python(pip) 编写
Scala
首先创建一个非常简单的 Spark 应用程序 – 简单到连名字都叫 SimpleApp.scala:
import org.apache.spark.sql.SparkSession object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val spark = SparkSession.builder.appName("Simple Application").getOrCreate() val logData = spark.read.textFile(logFile).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") spark.stop() } }
**注意:**应用程序需要定义一个 main 方法,而不是继承 scala.App。scala.App 的子类可能不能正常工作。
这个程序只是统计 Spark README 文件中包含‘a’和包含’b’的行数。注意,你需要把 YOUR_SPARK_HOME 替换成 Spark 的安装目录。与之前使用 Spark Shell 的示例不同,Spark Shell 会初始化自己的 SparkSession 对象, 而我们需要初始化 SparkSession 对象作为程序的一部分。
我们调用 SparkSession.builder 来构造一个 [[SparkSession]] 对象, 然后设置应用程序名称, 最后调用 getOrCreate 方法获取 [[SparkSession]] 实例。
为了让 sbt 能够正常工作,我们需要根据一个标准规范的 Scala 项目目录结构来放置 SimpleApp.scala 和 build.sbt 文件。一切准备就绪后,我们就可以创建一个包含应用程序代码的 JAR 包,然后使用 spark-submit 脚本运行我们的程序。
# Your directory layout should look like this $ find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit --class "SimpleApp" --master local[4] target/scala-2.12/simple-project_2.12-1.0.jar ... Lines with a: 46, Lines with b: 233 RDD编程 3.1,概述:
总的来说,每个 Spark 应用程序都包含一个驱动器(driver)程序,驱动器程序运行用户的 main 函数,并在集群上执行各种并行 *** 作。Spark 最重要的一个抽象概念就是d性分布式数据集(resilient distributed dataset – RDD), RDD是一个可分区的元素集合,这些元素分布在集群的各个节点上,并且可以在这些元素上执行并行 *** 作。RDD通常是通过HDFS(或者Hadoop支持的其它文件系统)上的文件,或者驱动器中的Scala集合对象来创建或转换得到。其次,用户也可以请求Spark将RDD持久化到内存里,以便在不同的并行 *** 作里复用之;最后,RDD具备容错性,可以从节点失败中自动恢复。
Spark 第二个重要抽象概念是共享变量,共享变量是一种可以在并行 *** 作之间共享使用的变量。默认情况下,当Spark把一系列任务调度到不同节点上运行时,Spark会同时把每个变量的副本和任务代码一起发送给各个节点。但有时候,我们需要在任务之间,或者任务和驱动器之间共享一些变量。Spark 支持两种类型的共享变量:广播变量 和 累加器,广播变量可以用于在各个节点上缓存数据,而累加器则是用来执行跨节点的 “累加” *** 作,例如:计数和求和。
本文将会使用 Spark 所支持的所有语言来展示 Spark 的这些特性。如果你能启动 Spark 的交互式shell动手实验一下,效果会更好(对于 Scala shell请使用bin/spark-shell,而对于python,请使用bin/pyspark)。
Spark 2.4.3(目前官网最新版本)默认使用Scala2.11版本进行构建与并发的(备注:不同的spark版本使用不同的scala的版本进行构建与并发,不如:spark2.2.1 使用scala2.11;如果想用 Scala 写应用程序,你需要使用兼容的 Scala 版本(如:2.11.X)。
要编写 Spark 应用程序,你需要添加 Spark 的 Maven 依赖。Spark 依赖可以通过以下 Maven 坐标从 Maven 中央仓库中获得:
groupId = org.apache.spark artifactId = spark-core_2.12 version = 2.4.3
另外,如果你想要访问 HDFS 集群,那么需要添加对应 HDFS 版本的 hadoop-client 依赖:
groupId = org.apache.hadoop artifactId = hadoop-client version =
最后,你需要在程序中添加下面几行来引入一些 Spark 类:
import org.apache.spark.SparkContext import org.apache.spark.SparkConf
(在 Spark 1.3.0 版本之前,你需要显示地 import org.apache.spark.SparkContext._ 来启用必要的隐式转换)
(使用java,python方式初始化链接spark详情请见官网)
Spark 程序需要做的第一件事就是创建一个 SparkContext 对象,SparkContext 对象决定了 Spark 如何访问集群。而要新建一个 SparkContext 对象,你还得需要构造一个 SparkConf 对象,SparkConf对象包含了你的应用程序的配置信息。
每个JVM进程中,只能有一个活跃(active)的 SparkContext 对象。如果你非要再新建一个,那首先必须将之前那个活跃的 SparkContext 对象stop()掉。
Scala val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf)
appName 参数值是你的应用展示在集群UI上的应用名称。master参数值是Spark, Mesos or YARN cluster URL 或者特殊的“local”(本地模式)。实际上,一般不应该将master参数值硬编码到代码中,而是应该用spark-submit脚本的参数来设置。然而,如果是本地测试或单元测试中,你可以直接在代码里给master参数写死一个”local”值。
3.3.1:使用shellScala
在 Spark Shell 中,默认已经为你新建了一个 SparkContext 对象,变量名为sc。所以 spark-shell 里不能自建SparkContext对象。你可以通过–master参数设置要连接到哪个集群,而且可以给–jars参数传一个逗号分隔的jar包列表,以便将这些jar包加到classpath中。你还可以通过–packages设置逗号分隔的maven工件列表,以便增加额外的依赖项。同样,还可以通过–repositories参数增加maven repository地址。
下面是一个示例,在本地4个CPU core上运行的实例。
$ ./bin/spark-shell --master local[4] 或者,将 code.jar 添加到 classpath 下: $ ./bin/spark-shell --master local[4] --jars code.jar 通过 maven标识添加依赖: $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
spark-shell –help 可以查看完整的选项列表。实际上,spark-shell 是在后台调用 spark-submit 来实现其功能的(spark-submit script.)。
3.3.2 d性分布式数据集Spark的核心概念是d性分布式数据集(RDD),RDD是一个可容错、可并行 *** 作的分布式元素集合。总体上有两种方法可以创建 RDD 对象:由驱动程序中的集合对象通过并行化 *** 作创建,或者从外部存储系统中数据集加载(如:共享文件系统、HDFS、Hbase或者其他Hadoop支持的数据源)。
3.3.2.1 并行集合并行集合是以一个已有的集合对象(例如:Scala Seq)为参数,调用 SparkContext.parallelize() 方法创建得到的 RDD。集合对象中所有的元素都将被复制到一个可并行 *** 作的分布式数据集中。例如,以下代码将一个1到5组成的数组并行化成一个RDD:
Scala val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
一旦创建成功,该分布式数据集(上例中的distData)就可以执行一些并行 *** 作。如,distData.reduce((a, b) => a + b),这段代码会将集合中所有元素加和。后面我们还会继续讨论分布式数据集上的各种 *** 作。
并行集合的一个重要参数是分区(partition),即这个分布式数据集可以分割为多少分区(partition)。Spark将会为集群中的每个分区(partition)运行一个task。一般情况下,在集群中每个CPU对应2-4个分区。通常,Spark会基于集群的情况,自动设置这个分区数。当然,你也可以通过设置第二参数parallelize (比如: sc.parallelize(data, 10)来手动设置分区数量。注意:Spark代码里有些地方仍然使用分片(slice)这个术语,这只不过是分区的一个别名,主要为了保持向后兼容。
3.3.2.2 外部数据集Spark 可以通过Hadoop所支持的任何数据源来创建分布式数据集,包括:本地文件系统、HDFS、Cassandra、Hbase、Amazon S3 等。Spark 支持的文件格式包括:文本文件(text files)、SequenceFiles,以及其他 Hadoop 支持的输入格式(InputFormat)。
文本文件创建RDD可以用 SparkContext.textFile 方法。这个方法输入参数是一个文件的URI(本地路径,或者 hdfs://,s3n:// 等),其输出RDD是一个文本行集合。以下是一个简单示例:
scala> val distFile = sc.textFile("data.txt") distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at:26
一旦RDD被创建以后,distFile 就可以执行数据集的一些 *** 作。比如,我们可以把所有文本行的长度加和:distFile.map(s => s.length).reduce((a, b) => a + b)。
以下是一些读取文本文件的一些注意点:
1,如果是本地文件系统路径,那么这个文件必须在所有的 worker 节点上能够以相同的路径访问到。所以要么把文件复制到所有worker节点上同一路径下,要么挂载一个共享文件系统。
2,所有 Spark 基于文件输入的方法(包括textFile)都支持输入参数为:目录,压缩文件,以及通配符。例如:textFile(“/my/directory”), textFile(“/my/directory/.txt”), 以及 textFile(“/my/directory/.gz”)。
3,textFile 方法同时还支持第二个可选参数,用以控制数据的分区个数。默认地,Spark会为文件的每一个block创建一个分区(HDFS上默认block大小为128MB,老版本block的大小为64M),你可以通过传递一个更大的参数来上调分区数。注意:分区数不能少于block个数。
除了文本文件之外,Spark的Scala API还支持其他几种数据格式:
1,SparkContext.wholeTextFiles 可以读取一个包含很多小文本文件的目录,并且以 (filename, content) 键值对的形式返回结果。这与textFile 不同,textFile只返回文件的内容,每行作为一个元素。 wholeTextFiles,分区数由数据位置决定,在一些情况下,可能导致分区数特别少,对于这些情况,wholeTextFiles提供了第二个可选的参数,来控制极少分区数的情况。
2,对于 SequenceFiles,可以调用 SparkContext.sequenceFile[K, V],其中 K 和 V 分别是文件中 key 和 value 的类型。这些类型都应该是Hadoop Writable 接口的子类, 如:IntWritable and Text 等。另外,Spark 允许你为一些常用Writable指定原生类型,例如:sequenceFile[Int, String] 将自动读取 IntWritable 和 Text。
4,对于其他的 Hadoop InputFormat,你可以用 SparkContext.hadoopRDD 方法,并传入任意的 JobConf 对象和 InputFormat,以及 key class、value class。这和设置 Hadoop job 的输入源是同样的方法。你还可以使用 SparkContext.newAPIHadoopRDD,该方法接收一个基于新版Hadoop MapReduce API (org.apache.hadoop.mapreduce)的InputFormat作为参数。
5,RDD.saveAsObjectFile 和 SparkContext.objectFile 支持将 RDD 中元素以 Java 对象序列化的格式保存成文件。虽然这种序列化方式不如 Avro 效率高,却为保存 RDD 提供了一种简便方式。
RDDS支持俩种类型的算子:转换算子(transformations),从已有的RDD创建一个新的RDD,和actions算子,通过对数据集(RDD)的计算,将计算结果值返回给driver驱动器。例如:map 是一个 transformation 算子,它将数据集中每个元素传给一个指定的函数,并将该函数返回结果构建为一个新的RDD;而 reduce 是一个 action 算子,它可以将 RDD 中所有元素传给指定的聚合函数,并将最终的聚合结果返回给驱动器(还有一个 reduceByKey 算子,返回一个分布式数据集(RDD))。
在spark中所有的转换算子都是懒加载的,也就是说,transformation 算子并不立即计算结果,而是记录下对基础数据集(如:一个数据文件)的转换 *** 作,只有等到某个 action 算子需要计算一个结果返回给驱动器的时候,transformation 算子所记录的 *** 作才会被计算。这样的设计,可以使spark运行的更加高效,例如:map算子创建了一个数据集,同时该数据集下一步会调用reduce算子,那么Spark将只会返回reduce的最终聚合结果(单独的一个数据)给驱动器,而不是将map所产生的数据集整个返回给驱动器。
默认情况,每次调用 action 算子的时候,每个由 transformation 转换得到的RDD都会被重新计算。然而,你也可以通过调用 persist(或者cache) *** 作来持久化一个 RDD,这意味着 Spark 将会把 RDD 的元素都保存在集群中,因此下一次访问这些元素的速度将大大提高。同时,Spark 还支持将RDD元素持久化到内存或者磁盘上,甚至可以支持跨节点多副本。
以下简要说明以下RDD的基本 *** 作,示例如下:
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)
其中,第一行是从外部文件加载数据,并创建一个基础RDD。这时候,数据集并没有加载进内存除非有其他 *** 作施加于lines,这时候的lines RDD其实可以说只是一个指向 data.txt 文件的指针。第二行,用lines通过map转换得到一个lineLengths RDD,同样,lineLengths也是懒惰计算的。最后,我们使用 reduce算子计算长度之和,reduce是一个action算子。此时,Spark将会把计算分割为一些小的任务,分别在不同的机器上运行,每台机器上都运行相关的一部分map任务,并在本地进行reduce,并将这些reduce结果都返回给驱动器。
如果我们后续需要重复用到 lineLengths RDD,我们可以增加一行:
lineLengths.persist()
这一行加在调用 reduce 之前,则 lineLengths RDD 首次计算后,Spark会将其数据保存到内存中。
Spark 的 API 很多都依赖于在驱动程序中向集群传递 *** 作函数。以下是两种建议的实现方式:
1,函数(Anonymous function syntax),这种方式代码量比较少。
2,全局单件中的静态方法。例如,你可以按如下方式定义一个 object MyFunctions 并传递其静态成员函数 MyFunctions.func1:
object MyFunctions { def func1(s: String): String = { ... } } dd.map(MyFunctions.func1)
注意:技术上来说,你也可以传递一个类对象实例上的方法(不是单例对象),不过这会致传递函数的同时,需要把相应的对象也发送到集群中各节点上。例如:
class MyClass { def func1(s: String): String = { ... } def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } }
如果我们创建一个新的MyClass实例,并调用其 doStuff 方法,同时doStuff中的 map算子引用了该MyClass实例上的 func1 方法,那么,整个MyClass对象将被发送到集群中所有节点上。这就类似去写一个rdd.map(x => this.func1(x)).
类似的,如果应用外部对象的成员变量,也会导致对整个对象实例的引用:
class MyClass { val field = "Hello" def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } }
上面的代码对 field 的引用等价于 rdd.map(x => this.field + x),这将导致应用整个this对象。为了避免类似问题,最简单的方式就是,将field复制到一个本地临时变量中,而不是从外部直接访问之,如下:
def doStuff(rdd: RDD[String]): RDD[String] = { val field_ = this.field rdd.map(x => field_ + x) }3.4.3 理解闭包
Spark里一个比较难的事情就是,理解在整个集群上跨节点执行的变量和方法的作用域以及生命周期。Spark里一个频繁出现的问题就是RDD算子在变量作用域之外修改了其值。下面的例子,我们将会以foreach() 算子为例,来递增一个计数器counter,不过类似的问题在其他算子上也会出现。
3.4.3.1示例:
考虑如下例子,我们将会计算RDD中原生元素的总和,如果不是在同一个 JVM 中执行,其表现将有很大不同。例如,这段代码如果使用Spark本地模式(–master=local[n])运行,和在集群上运行(例如,用spark-submit提交到YARN上)结果完全不同。
Scala var counter = 0 var rdd = sc.parallelize(data) // Wrong: Don't do this!! rdd.foreach(x => counter += x) println("Counter value: " + counter)3.4.3.2,本地模式 VS 集群模式
上面这段代码其行为是没有被定义的,并且可能不能按照预期运行工作。为运行Job,spark将RDD *** 作的处理分解为多个任务,每个任务在一个exeutor中运行。在执行之前,spark计算任务是闭包的(closuer)。闭包(closuer)是执行器在RDD上执行计算时必须可见的变量和方法(比如:本例中的foreach())。闭包(closure)被序列化,并且发送到每个执行器(executor)中。
上例中,现在发送到每个执行器(executor)的闭包中的变量是副本,因此。当在foreach()函数中引用counter变量时,它已不再是diver节点的计数器(counter)。在driver节点的内存中,仍然存在一个counter变量,但是执行器已经不能再访问它!执行器只能看到序列化闭包中的副本。由于计数器(counter)的所有 *** 作都引用了序列化闭包中的值,计数器(countter)的值最终仍然为0。
在本地模式,某些情况下,foreach算子实际上将和驱动器(driver)在同一个JVM中执行,并且将引用相同的原始计数器(counter),并且可能实际会跟新它。
为了确保在这些场景中能定义良好的行为,应该使用累加器(Accumulator)。累加器(Accumulator)在Spark中专门用于提供一种机制,在集群中跨工作节点执行拆分任务时,安全的更新变量。在本指南的累加器部分(Accumulators)会更加详细的说明。
通常,闭包(closures)结构 类似于循环或局部定义的方法,不应该被用于转换一些全局的状态。Spark没有定义或者保证从闭包(closures)外部引用对象的突变。有一些执行此 *** 作的代码可能在本地模式下工作,但这只是偶然发生的,这样的代码在分布式模式下将不会像预期的那样工作。如果在一些全局计算时,使用累加器(Accumulator)是必须的。
另一种常用来打印RDD中所有元素的语法是使用rdd.foreach(println) 或者 rdd.map(println)方法。在单台机器上,这将产生预期的结果并打印RDD中所有的元素。然而,在集群模式下,被执行器调用的输出stdout 被正在写入执行器的stdout 取代,而不是驱动器(driver)上的stdout,因此驱动器(driver)上的stdout不会显示输出结果。想要在驱动器(driver)上打印所有的RDD元素,一种方式是使用collect()方法,首先将RDD提取到driver节点,因此可以使用rdd.collect().foreach(println)。这种方式可能导致驱动器(driver)节点的内存溢出,因为collect()算计将整个RDD中的数据收集到回迁到单台机器上。如果你只是需要打印RDD中的一小部分元素,一种更安全的策略是使用take()算子:rdd.take(100).foreach(println)。
3.4.4,使用key-value键值对虽然,大部分Spark算子都能在包含任何对象类型的RDD上工作,但是有一小部分特殊的算子只能在key-value键值对的RDD上运行。这些算子最常见的应用是在分布式混洗(shuffle)过程,比如对key元素的分组或者聚合。
在scala中,这些算子在包含Tuple2对象(内建于scala语言中,可以这样简单的被创建:(a,b))的RDD上自动可用。Key-value键值对算子在ParRDDFunction类上可用,这个类型也会自动包装到包含tuples的RDD上。
例如:下面的这个例子将reduceByKey算子应用到key-value键值对来计算文件中每行文本出现的次数:
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)
我们也可以使用counts.sortByKey(),例如:对键值对按字母顺序排列,最终使用counts.collect()将排序结果以对象数组拉取到驱动器(driver)的内存中。
注意:当我们在使用自定义的对象作为key-value键值对算子中的key时,必须确保实现一个自定义的equals()方法带有一个匹配的hashCode()方法。完整详细的描述,请参考Object.hashCode()文档。
下列列表描述了一些spark支持的经常使用的算子。详细的说明请参阅RDD API文档和键值对RDD方法文档:
Transformation算子 含义
map(func) 返回一个新的分布式数据集,其中每个元素都是由源RDD中一个元素经func转换得到的。
filter(func) 返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后(func返回true时才选中)的结果
flatMap(func) 类似于map,但每个输入元素可以映射到0到n个输出元素(所以要求func必须返回一个Seq而不是单个元素)
mapPartitions(func) 类似于map,但基于每个RDD分区(或者数据block)独立运行,所以如果RDD包含元素类型为T,则 func 必须是 Iterator => Iterator 的映射函数。
mapPartitionsWithIndex(func) 类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则 func 必须是 Iterator => Iterator 的映射函数。
sample(withReplacement, fraction, seed) 采样部分(比例取决于 fraction )数据,同时可以指定是否使用回置采样(withReplacement),以及随机数种子(seed)
union(otherDataset) 返回源数据集和参数数据集(otherDataset)的并集
intersection(otherDataset) 返回源数据集和参数数据集(otherDataset)的交集
distinct([numTasks])) 返回对源数据集做元素去重后的新数据集
groupByKey([numTasks]) 只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable) 对。注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。
reduceByKey(func, [numTasks]) 如果源RDD包含元素类型 (K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每个key对应的value是经过func聚合后的结果,而func本身是一个 (V, V) => V 的映射函数。另外,和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
sortByKey([ascending], [numTasks]) 如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数)
join(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins) *** 作请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。
cogroup(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable, Iterable))。该算子还有个别名:groupWith
cartesian(otherDataset) 如果源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前二者的笛卡尔积,其元素类型为 (T, U) 对。
pipe(command, [envVars]) 以shell命令行管道处理RDD的每个分区,如:Perl 或者 bash 脚本。RDD中每个元素都将依次写入进程的标准输入(stdin),然后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。
coalesce(numPartitions) 将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。
repartition(numPartitions) 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。
repartitionAndSortWithinPartitions(partitioner) 根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。
3.4.6 *** 作算子以下列表,列举了一些spark支持,经常使用的 *** 作算子。详细的信息请参阅RDD API文档和键值对RDD方法文档。
Action算子 作用
reduce(func) 将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,并且func需要满足 交换律 和 结合律 以便支持并行计算)
collect() 将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤 *** 作后,将一个足够小的数据子集返回到驱动器内存中。
count() 返回数据集中元素个数
first() 返回数据集中首个元素(类似于 take(1) )
take(n) 返回数据集中前 n 个元素
takeSample(withReplacement,num, [seed]) 返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。
takeOrdered(n, [ordering]) 按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素
saveAsTextFile(path) 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。
saveAsSequenceFile(path)(Java and Scala) 将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等)
saveAsObjectFile(path)(Java and Scala) 将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。
countByKey() 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。
foreach(func) 在RDD的每个元素上运行 func 函数。通常被用于累加 *** 作,如:更新一个累加器(Accumulator ) 或者 和外部存储系统互 *** 作。
Spark RDD API同时也公开了一些算子的异步版本,比如:foreachde foreachAsync,它会向调用者立即返回一个FutureAction,而不是在 *** 作完成时阻塞。这可用于管理或者等待动作的异步执行。
3.4.7 混洗算子(Shuffle Operations)Spark中有一些算子能够触发众所周知的shuffle *** 作。Spark中的shuffle *** 作是将数据重新分布,以此来实现数据可以跨partition进行分组。Shuffle过程通常需要跨executors和machines之间进行数据拷贝,所以导致shuffle过程是一个复杂并且代价昂贵的 *** 作。
3.4.7.1 背景:为了很好的理解shuffle过程中发生了什么,可以以reduceByKey *** 作为例来考虑。reduceByKey算子会生成一个新的RDD,将原RDD中的相同key对应的所有value值组合为一个二元组(tuple):(key,与key关联的所有value值执行reduce函数产生的结果)。这个算子的难点是,对于某个key,它关联的所有value值,并不一定分布在同一个partition中,甚至不在同一台机器上,但是这些value值必须在相同的位置才能计算结果。
在spark中,为了进行某些特性的 *** 作,数据通常分布到所需要的各个partition中。在分析过程中个,一个task任务将 *** 作一个partition的数据。因此,为了组织在单个reduceByKey中的reduce任务中执行所有的数据,Spark需要执行一个all-to-all的 *** 作。这必须从读取Rdd中的所有partition,并从中找到所有key对应的所有value值,并且将每个key对应得
value值放到一起,以便后续计算每个key对应的value的最终结果,这个过程就叫shuffle。
虽然,shuffle过程之后,每个partition中的数据的顺序都是确定的,并且分区自身的顺序也是确定的,但是这些数据的顺序并不确定。如果需要shuffle过程后,分区内的数据元素有序,则可以使用以下方式:
1,mapPartition 为每个partition排序,比如:.sorted;
2,repartitionAndSortWithinPartitions:重分区的同时,高效的对分区排序;
3,sortBy:对RDD元素进行全局排序;
能导致shuffle过程的 *** 作有:重分区算子类(repartition),比如:reparation算子,和coalesce。‘Bykey’类算子(除了counting)如:groupByKey和reduceByKey。以及join类算子:比如,cogroup和join。
Shuffle过程是一个分厂昂贵的 *** 作,因为它包括磁盘I/O,数据序列化,以及网络I/O。为了组织好shuffle过程的数据,spark需要生成task任务集,一系列map任务去组织数据,一系列reduce任务去计算数据。这些命名来源于MapReduce,和spark的map和reduce算子没有直接关系。
另外,单独的map task任务的输出结果尽量保存在内存中,一直到内存方不下。然后,这些数据会根据目标partition进行排序,写入一个文件。在reduce过程中,task任务读取与之紧密相关的已经排好序的blocks。
某些shuffle算子会导致非常明显的堆内存增长,因为这些算子在传输数据前后,会在内存中维护组织数据记录的各种数据结构。特别的,reduceByKey和aggregateByKey算子会在map端创建这些数据结构,而‘byKey’系列的算子会在reduce端创建数据结构。当数据在内存无法存储时,spark会将这些数据落盘存储到磁盘上,当然这会导致额外的磁盘I/O和垃圾回收的开销。
Shuffle *** 作还会在磁盘上产生大量的临时文件。比如在Spark1.3版本中,这些文件将一直保留到与其对应的RDD不在使用且被垃圾回收之后才会被删除。这么做的原因是,如果该RDD的血统信息(即:生成RDD的父RDD以及其爷爷RDD,统称为RDD的血统)被重新计算时,则不需要重新生成这些shuffle文件。如果应用程序保留这些RDD的引用或者GC启动频率较低,那么垃圾回收只能隔很长一段时间才能发生。这就意味着长时间运行的Spark job会占用大量的磁盘空间。Spark的临时存储目录是由spark.local.dir 配置参数指定,在初始化Spark contex时。
Shuffle *** 作受到大量参数控制,详细的参数信息请参考Spark Configuration Guide.
Spark中其中一个最重要的能力就是持久化(persisting或者 caching)数据集到内存中,从而可以在跨 *** 作之间可以复用这些数据。当一个RDD被持久化后,那么每个节点会将该节点计算的任何partition存储到内存中,并且在该数据集(或者由该数据集衍生出的其他数据集)上的其他 *** 作中复用partition中的数据。这使得后续的 *** 作更快(通常能提升10倍)。因此,缓存对于迭代算法和快速交互分析是一个非常有用的工具。
可以使用persisit()或者cache()方法来标记一个需要持久化的RDD。在该RDD被一个action算子第一个计算时,该RDD就会被缓存到该节点的内存中。Spark的缓存具有容错功能-如果RDD中的任何一个partition数据丢失,那么spark会根据其血统信息自动重新计算。
另外,每个被持久化的RDD可以使用不同的存储级别。例如,你可以将数据集存储到磁盘,或者以Java 序列化对象(为了节省空间)保存到内存中,或者跨节点多副本存储。这些存储等级的设置通过给persist方法传递参数StorageLevel。Cache()方法本身是一个使用默认存储级别的快捷方式,默认存储等级是StorageLevel.MEMORY_onLY(以未序列化的java对象存储在内存中),所有存储等级如下:
Storage Level Meaning
MEMORY_onLY RDD以未序列化的java对象存储在JVM中,如果一个RDD未能全部存储到内存中,那么一部分分区中的数据将不会被缓存,而是在需要的时候从新计算,这是默认的缓存方式
MEMORY_AND_DISK 以未序列化后的java对象存储在JVM中,如果RDD中的数据不能再内存中全部存储,那么部分partition数据存储到磁盘,在需要的时候,从磁盘中读取。
MEMORY_ONLY_SER 以序列化的java对象存储RDD(每个分区中以字节数组存储)。通常这种方式比未序列化对象更节省空间,特别是使用更快的序列化方式,但是在的去的时候,更加消耗CPU
MEMORY_AND_DISK_SER 与MEMORY_ONLY_SER方式类似,不同的是当分区的数据在内存中无法全部存储时,将该部分数据存储到磁盘上,而不是每次需要的时候从新计算。
DISK_onLY 将RDD中的数据全部存储到磁盘中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc 和上面存储默认类似,但是每个分区的只在集群中的俩个节点保留副本。
OFF_HEAP (experimental) 和MEMPRY_ONLY_SER类似。只是将数据存储到堆外内存。这里需要堆外空间来存储。
注意:在python中,通常使用Pickle library方式来序列化存储对象,所以是否选择序列化级别无关紧要。在python中可用的存储级别有:MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_onLY and DISK_ONLY_2。
Spark中,自动会将shuffle过程中产生的中间数据缓存(比如:reduceBykey),甚至需要用户调用persisit()方法。这样做是为了避免在shuffle过程中一个节点失败而重新计算数据。当然,还是建议用户在对需要重复使用的结果RDD调用persist()方法。
3.5.1 选择哪种缓存级别Spark的缓存等级主要在于在内存使用和CPU之间做一些权衡。我们建议使用以下步骤来选择一种合适的存储级别:
1,如果RDD能够使用默认的存储方式(MEMORY_ONLY),那么尽量使用默认的存储方式,这样是CPU效率最高的方式,可以尽可能快速的执行spark算子;
2,如果默认存储级别不能满足需求,那么尽量使用MEMOTY_ONLY_SER并且选择一个搞笑的序列化协议来尽量节省数据的存储空间,同时执行速度也不错。
3,尽量不要将数据存储到磁盘上,除非重新计算数据的代价非常大,或者需要过滤巨大的数据。否则的话,从新计算一个分区和从磁盘上读取数据速度差不多。
4,如果需要支持容错,那么可以使用带副本的缓存级别(比如:使用spark来服务web请求)。所有的存储等级都通过从新计算丢失的数据来提供了完全容错机制,但是副本机制可以是task任务继续运行而不需要等待重新计算丢失的分区数据。
Spark会自动监控每个节点上缓存数据的使用率,并且会使用最近最少使用(LRU)方式将旧数据删除,如果你想手动将RDD中的数据移除内存来取代等待数据自动从内存中移除,那么可以使用RDD.unpersist()方法。
3.6 共享变量通常,当我们给Spark算子(比如:map或者reduce)传递一个函数时,这些函数将会在远程的集群节点上运行,并且这些函数引用的变量都是各个节点上独自的副本。这些变量被复制到每台机器上,并且对远程机器上变量的任何更新都不会传播回driver程序。通常来说,跨task任务读写共享变量的效率非常低下。但是,spark提供了俩种比较通用的共享变量:广播变量(broadcast variables)和累加器(accumulators)
3.6.1 广播变量广播变量允许编程人员在每台机器上缓存一份只读变量,而不是将副本随着task任务一起分发。广播变量可以被用来:比如,在每个节点缓存一个大型输入数据集的副本,这是一种比较高效的方式。Spark还尝试使用高效的广播算法来分配广播变量以降低通信成本。
Spark的 *** 作会由一组stage 来 *** 作,stage之间被‘shuffle’过程分割开来。Spark会自动广播在每个stage中的task任务中需要使用的公共的数据。被广播的数据被序列化后缓存,然后在task任务使用之前反序列化。这意味着,只有在跨多个stage中的task任务之间需要使用共同的数据,或者以序列化和反序列化方式缓存数据的时候,显示的创建广播变量才是必须的。
广播变量可以通过调用SparkContext.broadcast(v)的方式来将变量v广播。广播变量是对变量v的包装,或者它的value值时,通过调用value方法。具体的方式如下:
Scala: scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)
在广播变量被创建之后,集群中的任何方法都不应该再使用原始的v数据,这样才能避免数据v被传输多次。另外,对象v被广播之后不应该再被更新,这样才能确保每个阶段拿到相同的广播值。(比如:如果变量被更新后,一个新的节点拿更新后的值,那么该节点与其他节点v的值将不一样)。
3.6.2 累加器(Accumulators)累加器是只通过关联和交换 *** 作累计“添加”的变量,因此可以高效的支持并行计算。它可以被用来实现累加器(比如:在MapReduce中)或者求总和。Spark原生支持数字类型的累加器,并且开发者可以添加新的类型。
作为开发者,即可以创建有个命名的累加器,也可以创建一个未命名的累加器。正如下图所示,一个命名的累加器(在被初始化为counter)将会Web UI上展示每个stage中修改累加器。Spark在“Tasks”任务表中显示每个task任务修改累加器的值。
在UI上追踪累加器的执行,对理解程序在每个stage中运行是非常有帮助的。(注意:这种方式并不支持Python)
一个数字类型的累加器可以通过调用SparkContext.longAccumulator() 或者 SparkContext.doubleAccumulator()方法分别去做Long类型和Double类型的累加值。Task任务运行在集群中,可以使用add方法做累加。但是,executor程序不能去读取累加器的值,只有driver程序可以读取累加器的value值,通过使用它的value方法。
下面的示例展示了累加器用来累加一个数组元素的值。
Scala: scala> val accum = sc.longAccumulator("My Accumulator")accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))...10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.valueres2: Long = 10
当然,以上代码支持了Long类型的累加器,开发者也可以创建自己的类型,通过继承AccumulatorV2。AccumulatorV2抽象类有一系列方法用来被子类重写,方法:reset 方法来将累加器的值重新设置为0;add方法用来给累加器增加另一个值;merge方法用来将其他相同类型的累加器合并到一个。其他需要必须被重写的方法在API 文档中有详细讲解。比如:假设我们有一个MyVector类提供了数学的verctors,我们可以这样写:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] { private val myVector: MyVector = MyVector.createZeroVector def reset(): Unit = { myVector.reset() } def add(v: MyVector): Unit = { myVector.add(v) } ...} // Then, create an Accumulator of this type:val myVectorAcc = new VectorAccumulatorV2// Then, register it into spark context:sc.register(myVectorAcc, "MyVectorAcc1")
**注意:**当开发着定义了自己的AccumulatorV2类型,那么结果类型可以与添加的元素类型不同。
对于只在action算子内部执行跟新的累加器,spark保证每个task任务对累加器的跟新将只执行一次,即:重新执行task任务将不会再更新累加器的值。在转换(transformations)算子中,用户应该能意识到,如果tasks或者job中的stage被重新执行时,每个task任务中对累加器的更新 *** 作将不仅执行一次。
累加器并不会改变Spark的懒运算模型。如果累加器的值在一个RDD的 *** 作中被更新,那么RDD作为action的一部分被计算时,累加器的value值将只被更新一次。因此,当在一个懒加载执行的转换算子比如map()中,累加器的更新不能保证被执行。
下面的示例证明该属性:
Scala val accum = sc.longAccumulatordata.map { x => accum.add(x); x } // Here, accum is still 0 because no actions have caused the map operation to be computed.
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)