Spark的特点

Spark的特点,第1张

电商从业者必看。不知从哪天开始,只要连了网,一开启电脑就看到很多来自各大网站广告,有直接卖东西的,有热门推荐,让人关窗口都来不及的广告,这些广告哪儿来,有些东西虽然是自己关注已旧的内容容有些却是自己身边人在使用的。

我们卖家最想知道的就是:

逛淘宝的人想买什么?

买家想买我家的宝贝吗?

能承受的价格是多少?

买家买东西时最看重的是什么?

作为一个卖家想给大家提供的工具有:

Ambari

Hadoop

HBase

Spark

ZooKeeper

Scala

文章来自:IT十八掌

checkpoint是什么

(1)、Spark 在生产环境下经常会面临transformation的RDD非常多(例如一个Job中包含1万个RDD)或者具体transformation的RDD本身计算特别复杂或者耗时(例如计算时长超过1个小时),这个时候就要考虑对计算结果数据持久化保存;

(2)、Spark是擅长多步骤迭代的,同时擅长基于Job的复用,这个时候如果能够对曾经计算的过程产生的数据进行复用,就可以极大的提升效率;

(3)、如果采用persist把数据放在内存中,虽然是快速的,但是也是最不可靠的;如果把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏,系统管理员可能清空磁盘。

(4)、Checkpoint的产生就是为了相对而言更加可靠的持久化数据,在Checkpoint的时候可以指定把数据放在本地,并且是多副本的方式,但是在生产环境下是放在HDFS上,这就天然的借助了HDFS高容错、高可靠的特征来完成了最大化的可靠的持久化数据的方式;

假如进行一个1万个算子 *** 作,在9000个算子的时候persist,数据还是有可能丢失的,但是如果checkpoint,数据丢失的概率几乎为0。

checkpoint原理机制

1当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制。checkpoint就是对于一个RDD chain(链)如果后面需要反复使用某些中间结果RDD,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制,使用checkpoint首先需要调用sparkContext的setCheckpoint方法,设置一个容错文件系统目录,比如hdfs,然后对RDD调用checkpoint方法。之后在RDD所处的job运行结束后,会启动一个单独的job来将checkpoint过的数据写入之前设置的文件系统持久化,进行高可用。所以后面的计算在使用该RDD时,如果数据丢失了,但是还是可以从它的checkpoint中读取数据,不需要重新计算。

2persist或者cache与checkpoint的区别在于,前者持久化只是将数据保存在BlockManager中但是其lineage是不变的,但是后者checkpoint执行完后,rdd已经没有依赖RDD,只有一个checkpointRDD,checkpoint之后,RDD的lineage就改变了。persist或者cache持久化的数据丢失的可能性更大,因为可能磁盘或内存被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。

问题:哪些 RDD 需要 cache?

会被重复使用的(但不能太大)。

问题:用户怎么设定哪些 RDD 要 cache?

因为用户只与 driver program 打交道,因此只能用 rddcache() 去 cache 用户能看到的 RDD。所谓能看到指的是调用 transformation() 后生成的 RDD,而某些在 transformation() 中 Spark 自己生成的 RDD 是不能被用户直接 cache 的,比如 reduceByKey() 中会生成的 ShuffledRDD、MapPartitionsRDD 是不能被用户直接 cache 的。

运算时间很长或运算量太大才能得到的 RDD,computing chain 过长或依赖其他 RDD 很多的 RDD。 实际上,将 ShuffleMapTask 的输出结果存放到本地磁盘也算是 checkpoint,只不过这个 checkpoint 的主要目的是去 partition 输出数据。

问题:什么时候 checkpoint?

cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了。但 checkpoint 没有使用这种第一次计算得到就存储的方法,而是等到 job 结束后另外启动专门的 job 去完成 checkpoint 。 也就是说需要 checkpoint 的 RDD 会被计算两次。因此,在使用 rddcheckpoint() 的时候,建议加上 rddcache(), 这样第二次运行的 job 就不用再去计算该 rdd 了,直接读取 cache 写磁盘。其实 Spark 提供了 rddpersist(StorageLevelDISK_ONLY) 这样的方法,相当于 cache 到磁盘上,这样可以做到 rdd 第一次被计算得到时就存储到磁盘上,但这个 persist 和 checkpoint 有很多不同,之后会讨论。

RDD 需要经过 [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] 这几个阶段才能被 checkpoint。

Initialized: 首先 driver program 需要使用 rddcheckpoint() 去设定哪些 rdd 需要 checkpoint,设定后,该 rdd 就接受 RDDCheckpointData 管理。用户还要设定 checkpoint 的存储路径,一般在 HDFS 上。

marked for checkpointing: 初始化后,RDDCheckpointData 会将 rdd 标记为 MarkedForCheckpoint。

checkpointing in progress: 每个 job 运行结束后会调用 finalRdddoCheckpoint(),finalRdd 会顺着 computing chain 回溯扫描,碰到要 checkpoint 的 RDD 就将其标记为 CheckpointingInProgress,然后将写磁盘(比如写 HDFS)需要的配置文件(如 core-sitexml 等)broadcast 到其他 worker 节点上的 blockManager。完成以后,启动一个 job 来完成 checkpoint(使用 rddcontextrunJob(rdd, CheckpointRDDwriteToFile(pathtoString, broadcastedConf)) )。

checkpointed: job 完成 checkpoint 后,将该 rdd 的 dependency 全部清掉,并设定该 rdd 状态为 checkpointed。然后, 为该 rdd 强加一个依赖,设置该 rdd 的 parent rdd 为 CheckpointRDD, 该 CheckpointRDD 负责以后读取在文件系统上的 checkpoint 文件,生成该 rdd 的 partition。

有意思的是我在 driver program 里 checkpoint 了两个 rdd,结果只有一个(下面的 result)被 checkpoint 成功,pairs2 没有被 checkpoint,也不知道是 bug 还是故意只 checkpoint 下游的 RDD:

checkPoint是一种容错机制,当我们的程序需要很多transformation *** 作的时候,如果我们担心中间某些关键的后面会反复几次使用的RDD,可能会因为节点的故障,导致持久化数据的丢失,那么就可以针对该RDD额外启动checkpoint机制, 实现容错和高可用。

首先要调用 SparkContext的setCheckPointDir()方法,设置一个容错的文件系统的目录,比如HDFS; 然后对RDD调用checkpoint()方法。之后,在RDD所处的job运行结束之后,会启动一个单独的job,来将checkPoint的RDD的数据写入之前设置的文件系统,进行高可用、容错的类持久化 *** 作。

此时就算在后面使用RDD时,它的持久化的数据,不小心丢失了,但是还是可以从它的checkpoint文件中直接读取其数据,从而不需要重新计算。 (CacheManager)

答:首先使用SparkContextsetCheckpointDir() ,设置checkpoint的目录,然后使用RDDcheckpoin进行checkpoint。

剖析,当我们使用了checkpoint之后,发生的一系列 *** 作:

1、 对RDD调用了checkpoint()方法之后,它就接受RDDCheckpointData对象的管理。

2、 RDDCheckpointData对象,会负责将调用了checkpoint()方法的RDD的状态,设置为MarkedForCheckpoint。

3、 在RDD所在的那个job运行结束之后,会调用job中,最后一个RDD的doCheckPoint()方法,该方法沿着finalRDD的lineage向上查找,标记为MarkedForCheckpoint的RDD,并将其标记为CheckpointingInProgress。

4、 然后启动一个单独的job,来将lineage中,标记为CheckpointingInProgress的RDD,进行checkpoint的 *** 作,也就是将这个RDD写入到SparkcontextsetCheckpointDir()方法设置的文件系统中。

答:最主要的区别:

在于持久化,只是将数据保存在BlockManager中,但是rdd的lineage没有发生改变。

但是checkpoint执行完以后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,也就是说,checkpoint之后,rdd的lineage就改变了。

其次,持久化的数据丢失的可能性更大,无论是磁盘、或者是内存,都有可能丢失;但是checkpoint的数据,通常是保存在容错、高可用的文件系统中的,比如HDFS,依赖于这种高容错的文件西永,所以checkpoint的数据丢失可能性非常低。

答:如果一个RDD没有缓存,而且还设置了checkpoint,这样的 *** 作是很悲剧的,细想,本来当前RDD的这个job都执行结束了,但是由于中间的rdd没有持久化,那么checkpoint job想要将rdd 的数据写入到外部文件系统中的话,还得从rdd之前所有的rdd,全部重新计算一次,然后才能计算出rdd的数据,再将其checkpoint到外部的文件系统中,

所以我们通常建议,对要checkpoint的rdd使用persisit(StorageLevel_DISK_OMLY),该rdd计算之后,就直接将其持久化到磁盘上去,然后后面进行checkpoint *** 作是,直接从磁盘上读取rdd的数据,并checkpoint到外部文件系统即可,不需要重新计算一次rdd。

·速度快:Spark基于内存进行计算(当然也有部分计算基于磁盘,比如shuffle)。

·容易上手开发:Spark的基于RDD的计算模型,比Hadoop的基于Map-Reduce的计算模型要更加易于理解,更加易于上手开发,实现各种复杂功能,比如二次排序、topn等复杂 *** 作时,更加便捷。

·超强的通用性:Spark提供了Spark RDD、Spark SQL、Spark Streaming、Spark MLlib、Spark GraphX等技术组件,可以一站式地完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。

·集成Hadoop:Spark并不是要成为一个大数据领域的“独裁者”,一个人霸占大数据领域所有的“地盘”,而是与Hadoop进行了高度的集成,两者可以完美的配合使用。Hadoop的HDFS、Hive、HBase负责存储,YARN负责资源调度;Spark负责大数据计算。实际上,Hadoop+Spark的组合,是一种“double win”的组合。

·极高的活跃度:Spark目前是Apache基金会的顶级项目,全世界有大量的优秀工程师是Spark的committer。并且世界上很多顶级的IT公司都在大规模地使用Spark。

1、丰富的数据开发经验,对数据处理、数据建模、数据分析等有深刻认识和实战经验。

2、熟悉SQL,有一定的SQL性能优化经验。

3、熟练掌握Java语言,MapReduce编程,脚本语言Shell/Python/Perl之一。

4、业务理解力强,对数据、新技术敏感,对云计算、大数据技术充满热情。

5、深入理解Map-Reduce模型,对Hadoop、Spark、Storm等大规模数据存储与运算平台有实践经验。

这五点因素并代表全部,只是为大家罗列出一些基础的技能,但这也能够给一些转行者提供一些方向。

0买高性能机器,增加节点

1设置磁盘文件预读值大小为16384,使用linux命令:

echo 16384 > /sys/block/{磁盘名}/queue/read_ahead_kb

2 Spark 任务序列化只支持JavaSerializer,数据序列化支持JavaSerializer和 KryoSerializer 。KryoSerializer能达到JavaSerializer的十倍。

3在sparkdriverextraJavaOptions和sparkexecutorextraJavaOptions配置项中添加参数:" -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ",如果频繁出现Full GC,需要优化GC。把RDD做Cache *** 作,通过日志查看RDD在内存中的大小,如果数据太大,需要改变RDD的存储级别来优化。

4一般并行度设置为集群CPU总和的2-3倍

5大表和小表做join *** 作时可以把小表Broadcast到各个节点,从而就可以把join *** 作转变成普通的 *** 作,减少了shuffle *** 作。

6 合理设计DAG,减少shuffle  //TODO

7使用 mapPartitions 可以更灵活地 *** 作数据,例如对一个很大的数据求TopN,当N不是很大时,可以先使用mapPartitions对每个partition求TopN,collect结果到本地之后再做排序取TopN。这样相比直接对全量数据做排序取TopN效率要高很多。

8当之前的 *** 作有很多filter时,使用 coalesce 减少空运行的任务数量

9当任务数过大时候Shuffle压力太大导致程序挂住不动,或者出现linux资源受限的问题。此时需要对数据重新进行分区,使用 repartition 。

10配置多个磁盘给 localDir ,shuffle时写入数据速度增快

11 别collect大数据量,数据会回到driver端,容易OOM。非要collect,请配置 sparksqlbigdatathriftServeruseHdfsCollect 为true,会存在hdfs再读

12尽量用reduceByKey,会在Map端做本地聚合

13 broadcase set/map而不是Iterator, set/map 查询效率O(1) ,iteratorO(n)

14 数据发生倾斜,repartition大法 ,查出key,salt it

15使用Hash Shuffle时,通过设置 sparkshuffleconsolidateFiles 为true,来合并shuffle中间文件,减少shuffle文件的数量,减少文件IO *** 作以提升性能

16Spark SQL 小表join,把小表broadcast出去。配置 sparksqlautoBroadcastJoinThreshold 和 sparksqlbigdatauseExecutorBroadcast 。小表在join 右端。

17SparkSQL数据倾斜,配置 sparksqlplannerskewJoin 和 sparksqlplannerskewJointhreshold

18 SparkSQL 小文件,配置 sparksqlsmallfilecombine 和  sparksqlsmallfilesplitsize

以上就是关于云计算它对当今的电子商务平台erp等企业信息管理系统有什么影响全部的内容,包括:云计算它对当今的电子商务平台erp等企业信息管理系统有什么影响、揭秘Spark_checkpoint、Spark的特点等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/8772810.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-21
下一篇 2023-04-21

发表评论

登录后才能评论

评论列表(0条)

保存