val test=sc.textFile("file:///home/zl/word.txt")
用hdfs创建
val test2=sc.textFile("/spark/word.txt")
转换算子
filter(func) 过滤
map(func) 将每个元素传递到函数中,返回一个新的数据集
flatMap(func)
groupByKey() 应用于(k,v)
reduceByKey(func) 汇聚
过滤
val lines =sc.textFile("/spark/word.txt")
val linesWithSpark=lines.filter(line=>line.contains("spark"))
map(func),将一行转换为一个数组
val lines =sc.textFile("/spark/word.txt")
val words=lines.map(line=>lines.split(" "))
flatMap(func),这个可以更加细化一点,将每行的每一个词都提取出来
val lines =sc.textFile("/spark/word.txt")
val words=lines.flatMap(line=>lines.split(" "))
groupByKey(),这个无参数,可以实现对每个单词的统计
val words =sc.textFile("/spark/word.txt")
val groupWords=words.groupByKey()
reduceByKey(func),对上一步进行汇总
val lines =sc.textFile("/spark/word.txt")
val words=lines.flatMap(line=>lines.split(" ")).map(word=>(word,1))
val reduceWords=words.reduceByKey((a,b)=>a+b)
行动算子
count() 返回数据集元素个数
first() 取第一个元素
take(n) 取n个元素
reduce(func) 聚合元素
collect() 以数组形式返回数据集中的元素
foreach(func) 将数据集中每个元素传递进入func
count()
val arrRdd=sc.parallelize(Array(1,2,3,4,5))
arrRdd.count()
first()
addRdd.first()
take(n)
addRdd.take(3)
reduce(func) 统计
addRdd.reduce((a,b)=>a+b)
collect(),以数组形式返回所有元素
addRdd.colletc()
foreach(func) 用来遍历的
arrRdd.foreach(x=>println(x))
做一个简单的词频统计
这是一个txt文件
val lines =sc.textFile("/spark/word.txt")
val words = lines.flatMap(line=>line.split(" "))
val wordCount=words.map(word=>(word,1))
val word=wordCount.reduceByKey((a,b)=>a+b)
打印出来结果
分区是为了减少通信开销,spark计算能力强,所以能用计算解决的事就不要走IO
RDD依赖关系窄依赖:OneToOne,独生子女
父RDD的每一个分区只能被一个子RDD分区使用
当子RDD分区进行算子 *** 作,因为某个分区 *** 作失败导致数据丢失时,只需要重新对父RDD中对应的分区进行算子 *** 作即可恢复数据
宽依赖:OneToMany
RDD机制1.持久化机制
spark中,RDD采用惰性求值,即每次调用行动算子 *** 作,都会从头开始计算,这就会带来很高的代价,所以为了避免重复计算,可以让spark对数据集进行持久化。这样速度更快,通常超10倍提速
两种方式:
cache(),persist(存储级别)
persist
import org.apache.spark.storage.StorageLevel
val list= List("hadoop","spark","hive")
val listRDD=sc.parallelize(list)
listRDD.persist(StorageLevel.DISK_ONLY)
println(listRDD.count())
println(listRDD.collect().mkString(","))
cache
val list= List("hadoop","spark","hive","zlei")
val listRDD=sc.parallelize(list)
listRDD.cache()
println(listRDD.count())
println(listRDD.collect().mkString(","))
容错机制
RDD提供两种故障恢复的方式,
1,血统lineage,通过RDD之间的依赖关系进行数据恢复,宽窄依赖恢复难度和过程不同,这是通过计算进行恢复,容错
2,设置检查点checkpoint,本质上是通过存储进行容错,将RDD写入磁盘
对于窄依赖,RDD分区的转换处理是在一个线程里完成的,所以窄依赖会被spark划分到同一个stage中,
对于宽依赖,由于有shuffle的存在,所以只能在父RDD处理完成后,下一个stage才能开始接下来的 *** 作。
因此宽依赖是划分stage的依据
RDD Objects
DAGScheduler
TaskScheduler
worker
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)