大家好,我是小林!
今天开始给大家写 《大数据面试突击》之 Spark 系列文章,虽然现在 Flink 在流计算领域很火,但是 Spark 凭借其自身的诸多优势,仍然还是大多数互联网公司的标配。目前,大数据领域计算框架层出不穷,诸如 Flink 等,Spark为什么还能在大数据领域占有一席之地?其原因有二:
Spark 开发效率和执行效率快。首先它支持多种开发语言,其中包括 Phthon、Java、Scala、R语言以及 SQL,使得我们开发效率极高;其次执行速度也很快。Spark 支持的计算场景很全。其中包括大数据常见的计算场景有:批处理、流计算、数据分析、机器学习以及图计算。
本次 Spark 系列文章,我会带你吃透 Spark Core 核心原理,熟悉 Spark 开发中常见的算子。主要分为三个部分:Spark Core、Spark SQL以及Structured Streaming。 来,看下小林给大家准备的框架图吧:
小林,你是不是搞错了,不是四个部分嘛?是的,Spark MLlib 机器学习这部分小林目前也还没涉猎,先看前面三个部分各位看官的反馈吧,如果反响好的话,为了你们我会再卷一波,搞定 Spark MLlib。话不多说,开始我们今天的文章吧!
Spark 背景在 Spark 出现以前,Hadoop 框架在大数据领域可谓是德高望重,拥有分布式存储之王 HDFS 和 通用的调度系统 YARN 以及分布式计算框架 MapReduce。MR 分布式计算的通用逻辑图如下所示:
由上图可知,MapReduce 提供了两类计算抽象,分别是 Map 和 Reduce。Map 通常封装的是业务数据转换的逻辑,Map 计算结束后,把数据存到 HDFS ,通过数据分发,也就是 Shuffle ,Reduce 端进行数据聚合 *** 作,最后把计算结果存放到 HDFS。在这个过程中,Hadoop 采用 HDFS 为计算抽象之间的数据接口来规避廉价磁盘引入的系统稳定性问题,采用 YARN 来完成分布式资源调度从而充分利用廉价的硬件资源。
上面的流程看起来很丝滑, Hadoop 的三个组件搭配,几乎可以实现大部分批处理任务。但是,往往一个任务中的数据量会很大,需要大量的 Map 和 Reduce,也就是说,上述的逻辑计算流程会被重复执行多次,就会使得数据不断的落盘,不断的 shuffle ,导致性能变差。
为了解决这个问题,2009 年 Spark 应运而生,加州伯克利分校的 AMP 实验室等人提出了基于内存的分布式计算引擎 – Spark Core。
Spark 诞生于加州大学伯克利分校的 AMP 实验室(the Algorithms, Machines and People lab),并于 2010 年开源。2013 年,Spark 捐献给阿帕奇软件基金会(Apache Software Foundation),并于 2014 年成为 Apache 顶级项目。
当然,Spark 远不止 Spark Core 这么简单,Spark Core 只是它的基石,它的模块大致可以分为以下几个部分:
什么是 RDD?RDD(Resilient Distributed Datasets),全称是“d性分布式数据集”。从名字来看,完全不知道这是啥,官方给的解释是:RDD 是一种抽象,是 Spark 对于分布式数据集的抽象,它包含了所有内存中以及磁盘中的分布式数据。小林给你打个不恰当的比方,在理解 RDD 时可以将其类比成数组。数组是某种同类数据的集合,只不过数组的数据分布在单进程内,而 RDD 可以跨进程、跨节点分布,即 Spark 将分布式存储的数据集作了一层数据模型的抽象,称为 RDD。
为什么要提出 RDD 这样一个概念?
我们反观 MapReduce 的计算过程,其 Map 逻辑和 Reduce 计算逻辑之间,需要依靠 HDFS 作为数据接口。Map 将计算的中间结果以文件的形式存到 HDFS,Reduce 从 HDFS 读取数据后进行聚合,把最终结果再次存到 HDFS 上。
可以看出,Map 和 Reduce 因为其中间存在 HDFS 这样的数据接口,使得 Map 与 Reduce 之间计算不能流畅的衔接,HDFS 数据又是使用副本机制实现高可用,多副本便会带来更多的磁盘 IO 和网络 IO。所以,Spark 提出了 RDD 的数据模型,将所有中间环节所产生的数据文件以某种统一的方式归纳、抽象出来,使得 Map 与 Redue 不需要 HDFS,从而上下游计算便可以更好的衔接在一起,减少了数据的落盘、发包与收包的动作。
RDD 有哪五大特性?要想深度理解 RDD ,那必须绕不开 RDD 的五大特性。
dependencies:表明了上下游 RDD 之间的依赖。
任何一个 RDD 都不是凭空产生的,它是由上一个 RDD,通过执行某种 compute 的计算逻辑得到的。我们习惯把上一个 RDD 简称为当前 RDD 的父 RDD,术语叫做依赖。
compute:计算函数。
为 RDD 之间转换提供了某种计算逻辑。
partitions:数据分片。
在分布式计算中,一份非常大的数据集通常会按照某种规则分成很多份,散落在集群中的不同节点上,我们把某个节点上的数据称为数据分片。
partitioner:划分数据分片的规则。
preferredLocations:该属性表明的是数据分片的物理位置偏好。
在 Spark 任务计算时,会根据数据分片的存放位置进行调度,优先调度本地数据。一般位置偏好有这么几种:
本地内存:数据分片存在当前计算节点的内存,可直接访问本地磁盘:数据分片在当前计算节点的磁盘中有副本,可就地访问本机架磁盘:跨节点其他机架磁盘:跨机架无所谓:跨机房
给大家做个总结,各种类型的 RDD 有五大特性:
我们继续细品 RDD 的全称,d性分布式数据集。那么,RDD d性到底体现在哪里?哈哈,不要被d性这个词给搞懵了,d性无非说的就是 RDD 的容错,考虑到checkpoint 机制还么有讲解,目前,小林从 RDD 的特性的角度来看。
每个 RDD 都有 dependencies 属性,该 dependencies 记录了父 RDD 的相关元数据信息。一个计算任务中,每个 RDD 的依赖关系便构成了一个血统,父 RDD 通过算子得到子 RDD。当 RDD 存在异常时,Spark 任务便可以根据血统来重新生成异常 RDD。另外从 RDD 的数据分片属性来看,数据分片的数量可以调整,体现了 RDD 的d性。 大数据永远的 WordCount
哈哈,恭喜你!终于学完本节枯燥的理论了,小林给大家用一个简单的案例,来固化一下上面所提到概念。这个最基本的 wordcount 案例,也是很多互联网公司要求能够手写出来的。其代码如下:
object WordCount { def main(args:Array[String]): Unit = { val conf = new SparkConf() conf.setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(conf) //读取文件内容 val lineRDD:RDD[String] = sc.textFile("./data/words.txt",2) //以行为单位进行分词 val wordsRDD: RDD[String] = lineRDD.flatMap(line=>{line.split(" ")}) //把 RDD 元素转换为(key,value) 的形式 val kvRDD: RDD[(String, Int)] = wordsRDD.map(word=>{new Tuple2(word,1)}) //把相同的单词作为一组,并计算 val resultRDD: RDD[(String, Int)] = kvRDD.reduceByKey((v1, v2)=>{v1+v2}) //获取位置排在前面 3 位的词汇,注意这里没有排序不是词频前三 resultRDD.count() } }
看到上述这段程序,有个需要注意的点:local[*],local 模式就是 Spark 单机的运行模式,其它部署模式后面我们都会讲到。local 一共有以下三种方式设置:
local:所有计算都运行在一个线程当中,一般 IDEA 本地测试使用这种方式local[k]:指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用 cpu 的计算能力local[*]:这种模式直接帮你按照cpu最多cores来设置线程数了。 WordCount Job 逻辑执行图
阅读完这段程序后,我们头脑中 Job 的逻辑执行流程应该是这样的:
所谓的 Job 逻辑执行图,就是 RDD 上下游的数据依赖图。
首先,通过调用 textFile 生成 lineRDD,然后调用 flatMap 算子把 lineRDD 转换为 wordsRDD;
然后,为了后续的聚合运算,调用 map 算子转换成 (key,value) 型 kvRDD;
之后调用 reduceByKey 做分组聚合,将(key,value)中的 value 进行单词计数。
最后代码中还有一行 resultRDD.count() 。当程序执行到 count() 时,才会触发 Job 开始。触发后,会先在每个 parktition 上执行计数,然后将每个分区的执行结果发送到 Driver ,最后在 Driver 端进行 sum 求和。
总的来说,Job 的逻辑执行图描述的是 Job 的数据流,整个 Job 会经过哪些转换(transformation ),中间生成哪些 RDD,以及 RDD 之间的依赖关系。RDD 代表的是分布式数据形态,RDD 到另一个 RDD 之间的转换,本质上是数据形态上的转换。(上述红色的框框,你可以理解为一个 RDD,是不是一下清晰很多啦,跟着小林继续吧)
WordCount Job 物理执行图Job 的逻辑执行图还只是刻画了数据上的依赖关系,实际的 task 执行图会更加复杂。学过 Hadoop 的都知道,Hadoop 的整个数据流是固定的,一个进行 Map 处理,一个进行 Reduce 聚合处理,我们只需要把我们自己的计算逻辑分别写在 map( ) 和 reduce( ) 函数即可。
而 Spark 的数据依赖更加灵活,很难将数据依赖流和物理 task 执行统一在一起。因此,Spark 将 Job 的数据流和具体的 task 执行流分开,并设计了一种算法,将逻辑执行图转换成具体的物理执行图,具体算法下一篇再作讨论。
针对 WordCount Job,其物理执行的 DAG 图如下:
从上述的物理执行图可以看出,这个 WordCount 只产生了一个 Job,由 action(resultRDD.count())触发产生。
整个 Job 分成了两个 Stage,分别为紫色和蓝色两个框框(这里不明白 Stage 概念没关系,下一篇会介绍这个概念)。其中 Stage1 (也就是紫色) 包含了 2 个 task ,每个 task 负责从 words.txt 读取部分数据,然后并将各个 RDD 通过 Transformation 算子进行转换,最后将各个分区的转换结果写入本地磁盘。Stage0(也就是蓝色)同样包含了 2 个 task,我们可将其称为 ResultTask,每个 task 首先 shuffle 自己要处理的数据,边 fetch 数据边进行聚合 *** 作,最后进行 count() 计算得到 result,这里的 result 是每个分区内包含多少条 records。task 执行完后,Driver 收集每个 task 的执行结果,进行 sum()。WordCount Job 结束。
可以得知,Job 的物理执行图比较复杂。这里值得提醒的一点是,每个 application 中并不是只包含一个 Job ,可能会包含多个 Job,每个 Job 包含多个 Stage,每个 Stage 包含多个 Task。具体 Job 的个数可以通过 Action 算子界定,即有多少个 Action 算子,就会产生多少个 Job。
注意:
在 RDD 编程模型中分为两类算子: Transformation 和 Action 算子,我们通过调用 Transformation 算子去定义并描述数据形态的转换,然后调用 Action 类算子触发执行,将计算结果收集或者存放到磁盘。Spark 这种编程模型,势必会把 Job 切分成两个环节:
基于不同数据形态之间的转换,构建 DAG (有向无环图)通过 Action 类算子,以回溯的方式去触发执行这个 DAG 。
也就是说,在调用 Action 类算子之前,整个任务并不会立即执行,只有当调用到 Action 类型算子,之前的转换算子才会执行。这种计算模式我们称之为 “延迟计算”,又名惰性计算(lazy evaluation)。
总结本文先介绍了 Spark 产生的背景,详细描述了 Spark 中的 RDD 的概念以及为什么要抽象 RDD 这个概念;叙述了 RDD 的五大特性及其作用;最后使用一个 WordCount 案例,讲解了wordcount 的 Job 的逻辑执行图以及物理执行图,引出了什么是延迟计算。
最后,大家可以思考一下,Job 的逻辑执行图是如何生成的?
物理执行图又是如何生成的?
如何划分 Job,如何划分 Stage,怎么划分 task?
有哪些 Transformation 算子以及 Action 算子?欢迎大家留言与我讨论呀!
好了,今天的文章,就到这里,**看完记得给小林点赞、在看加转发至朋友圈呀。**我们下篇文章见!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)