spark RDDd性分布式数据集

spark RDDd性分布式数据集,第1张

创建
val test=sc.textFile("file:///home/zl/word.txt")

用hdfs创建

val test2=sc.textFile("/spark/word.txt")
转换算子
filter(func) 过滤
map(func) 将每个元素传递到函数中,返回一个新的数据集
flatMap(func) 
groupByKey() 应用于(k,v)
reduceByKey(func) 汇聚

过滤

val lines =sc.textFile("/spark/word.txt")
val linesWithSpark=lines.filter(line=>line.contains("spark"))

map(func),将一行转换为一个数组

val lines =sc.textFile("/spark/word.txt")
val words=lines.map(line=>lines.split(" "))

flatMap(func),这个可以更加细化一点,将每行的每一个词都提取出来

val lines =sc.textFile("/spark/word.txt")
val words=lines.flatMap(line=>lines.split(" "))

groupByKey(),这个无参数,可以实现对每个单词的统计

val words =sc.textFile("/spark/word.txt")
val groupWords=words.groupByKey()

reduceByKey(func),对上一步进行汇总

val lines =sc.textFile("/spark/word.txt")
val words=lines.flatMap(line=>lines.split(" ")).map(word=>(word,1))
val reduceWords=words.reduceByKey((a,b)=>a+b)

行动算子

count()  返回数据集元素个数
first() 取第一个元素
take(n) 取n个元素
reduce(func) 聚合元素
collect() 以数组形式返回数据集中的元素
foreach(func) 将数据集中每个元素传递进入func

count()

val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd.count()

first()

addRdd.first()

take(n)

addRdd.take(3)

reduce(func) 统计

addRdd.reduce((a,b)=>a+b)

collect(),以数组形式返回所有元素

addRdd.colletc()

foreach(func) 用来遍历的

arrRdd.foreach(x=>println(x))
做一个简单的词频统计

这是一个txt文件

val lines =sc.textFile("/spark/word.txt")
val words = lines.flatMap(line=>line.split(" "))
val wordCount=words.map(word=>(word,1))
val word=wordCount.reduceByKey((a,b)=>a+b)

打印出来结果

RDD的分区

分区是为了减少通信开销,spark计算能力强,所以能用计算解决的事就不要走IO

RDD依赖关系

窄依赖:OneToOne,独生子女
父RDD的每一个分区只能被一个子RDD分区使用
当子RDD分区进行算子 *** 作,因为某个分区 *** 作失败导致数据丢失时,只需要重新对父RDD中对应的分区进行算子 *** 作即可恢复数据

宽依赖:OneToMany

RDD机制

1.持久化机制
spark中,RDD采用惰性求值,即每次调用行动算子 *** 作,都会从头开始计算,这就会带来很高的代价,所以为了避免重复计算,可以让spark对数据集进行持久化。这样速度更快,通常超10倍提速

两种方式:

cache(),persist(存储级别)

persist

import org.apache.spark.storage.StorageLevel
val list= List("hadoop","spark","hive")
val listRDD=sc.parallelize(list)
listRDD.persist(StorageLevel.DISK_ONLY)
println(listRDD.count())
println(listRDD.collect().mkString(","))

cache


val list= List("hadoop","spark","hive","zlei")
val listRDD=sc.parallelize(list)
listRDD.cache()
println(listRDD.count())
println(listRDD.collect().mkString(","))
容错机制

RDD提供两种故障恢复的方式,
1,血统lineage,通过RDD之间的依赖关系进行数据恢复,宽窄依赖恢复难度和过程不同,这是通过计算进行恢复,容错
2,设置检查点checkpoint,本质上是通过存储进行容错,将RDD写入磁盘

根据依赖关系进行调度阶段的划分

对于窄依赖,RDD分区的转换处理是在一个线程里完成的,所以窄依赖会被spark划分到同一个stage中,
对于宽依赖,由于有shuffle的存在,所以只能在父RDD处理完成后,下一个stage才能开始接下来的 *** 作。
因此宽依赖是划分stage的依据

流程
RDD Objects
DAGScheduler
TaskScheduler
worker

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/719347.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存