1.算子:RDD的方法就叫算子
RDD:spark中分区的集合
textFile(“文件路径”)
parallilize(数组/元组/map等一系列集合)
2.spark中算子分类:
(1)Transformations类算子:不能自己执行,需要Action类算子。
flatMap,map,sortBy,sortByKey,mapToPair,reduceByKey
(2)Action类算子:触发Transformation类算子执行
3.Transformation 类算子:
(1)filter 过滤符合条件的算子, true保留,false过滤掉
(2)map 将每个数据项通过map函数映射变为一个新的元素
(3)flatMap将每个输入项映射为0到多个输出项
val value: RDD[String] = fileRDD.flatMap(_.split(" "))
(4)reduceByKey 将相同的key进行处理
val value2: RDD[(String, Int)] = value1.reduceByKey((v1: Int, v2: Int) => {v1 + v2})
(5)sample:随机抽样
参数解释:boolean 是否放回
fraction:double 抽样比例
seed:long 任意定义,随机抽样后下次运行抽样不会改变
(6)sortByKey/sortBy 排序
object filterWC { def main(args: Array[String]): Unit = { val conf=new SparkConf() conf.setMaster("local") conf.setAppName("filter") val sc=new SparkContext(conf) val lines: RDD[String] = sc.textFile("D:\BigData\spark\filterWC\src\main\data\word") val value: RDD[String] = lines.filter(line => line.equals("hello spark")) //输出所有hello spark val value1: RDD[String] = lines.sample(false, 0.5) } }
4.Action类算子
(1)count() 统计RDD个数
(2)take(Int) 取出前Int条数据
(3)first() 取出第一条数据
(4)foreach
(5)collect() 将所有结果回收到Driver端
object countWC { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("count") conf.setMaster("local") val context: SparkContext = new SparkContext(conf) val value: RDD[String] = context.textFile("D:\BigData\spark\filterWC\src\main\data\word") val value1: RDD[Int] = context.parallelize(1 to 10) value.count() val strings: Array[String] = value.take(3) val str: String = value.first() val strings1: Array[String] = value.collect() value1.reduce((x, y) => x + y) } }
5.持久化算子cache
作用:将RDD的数据持久化到内存中。cacahe是懒执行
object cacheWC { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("count") conf.setMaster("local") val context: SparkContext = new SparkContext(conf) val value: RDD[String] = context.textFile("D:\BigData\spark\filterWC\src\main\data\word") val value1: value.type = value.cache() val l1: Long = value1.count() //从磁盘中取 val l2:Long=value1.count() //从内存中取 } }
由于是懒执行,所以第一次cache算子没有执行,第二次由于count是触发算子,所以是从内存中取
6.持久化算子persisit
可以指定持久化的级别StorageLevel。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。
MEMORY_ONLY:仅仅使用内存。
MEMORY_AND_DISK:使用内存和磁盘
DISK_ONLY:仅仅使用磁盘
value= value.persist(StorageLevel.MEMORY_AND_DISK)
cache()相当于MEMORY_ONLY
7.持久化算子checkpoint
只放到磁盘
8.例题
要求: 1.切分单词,找出出现次数最多的单词
2.过滤出现次数最多的单词,对剩余的单词进行统计wordcount
3.按照出现次数由大到小排序
package zjc.bigdata import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object homework01 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("homework01") conf.setMaster("local") val context: SparkContext = new SparkContext(conf) val value: RDD[String] = context.textFile("D:\BigData\spark\filterWC\src\main\data\word") val tuple: (String, Int) = value.flatMap(line => line.split(" ")) .map(word => new Tuple2(word, 1)) .reduceByKey((v1: Int, v2: Int) => { v1 + v2 }) .sortBy(_._2, false) .first() //(hello,11) value.flatMap(line => line.split(" ")) .map(word => new Tuple2(word, 1)) .reduceByKey((v1: Int, v2: Int) => {v1 + v2}) .sortBy(_._2, false) .filter(word=>{!tuple.equals(word)}).foreach(println) } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)