Value type:
One-to-one input/output: map flatMap mapPartitions
Many-to-one input/output: union cartesian
Many-to-many input/output: groupBy
Output is a subset of the input: filter distinct subtract sample takeSample
Cache type: cache persist
Key-value type:
One-to-one: mapValues
Aggregation within a single RDD: combineByKey reduceByKey partitionBy
Two RDDs: cogroup
Joins: join leftOuterJoin rightOuterJoin
Action type:
No output (neither to HDFS nor locally): foreach
To HDFS: saveAsTextFile saveAsObjectFile
Return Scala collection or data types: collect collectAsMap reduceByKeyLocally lookup count top reduce fold aggregate
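The value-type and key-value-type operators above are transformations: they are lazy and only record the lineage. The action operators actually trigger a job and return or write a result. A minimal sketch of the difference, assuming a spark-shell session where sc is the SparkContext:

val nums = sc.parallelize(1 to 4, 2)   // builds the RDD; nothing is computed yet
val doubled = nums.map(_ * 2)          // map is a transformation, still lazy
doubled.collect                        // collect is an action: the job runs now and returns Array(2, 4, 6, 8)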
There are roughly three ways to create an RDD in Spark:
(1) from a collection; (2) from external storage; (3) from another RDD.
To create an RDD from a collection, Spark mainly provides two functions: parallelize and makeRDD.
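A small sketch of the three creation paths, assuming a spark-shell session (the HDFS path below is only an illustrative placeholder):

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), 2)   // (1) from a collection
val rdd2 = sc.makeRDD(List(1, 2, 3, 4, 5), 2)       // (1) makeRDD is a thin wrapper around parallelize
val rdd3 = sc.textFile("hdfs:///tmp/input.txt")     // (2) from external storage; path is an example only
val rdd4 = rdd1.map(_ * 10)                         // (3) from another RDD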
- map maps each input element to a new output element, one to one
val first = sc.parallelize(1 to 5, 2)
first.map(1 to _).collect
val first = sc.parallelize(List("Hello", "World", "学习", "大数据"), 2)
first.map(_.length).collect
- flatMap maps each input element and then flattens the results into a single one-dimensional collection
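A short flatMap sketch, assuming sc from the spark-shell, since the bullet above has no example:

val lines = sc.parallelize(List("Hello World", "Learn Spark"), 2)
lines.flatMap(_.split(" ")).collect   // Array(Hello, World, Learn, Spark): the nested results are flattened into one collection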
- mapPartitions processes the data one partition at a time; when a plain map would need to create an expensive extra object for every element (for example a file output stream or a JDBC connection), use mapPartitions instead
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 3)
var rdd2 = rdd.mapPartitions(partition => {
  // put code that is initialized once and reused many times here, e.g. a JDBC connection
  partition.map(num => num * num)
})
rdd2.max
- glom turns the elements of each partition into an array
With 7 elements and 3 partitions the split is 1 2 | 3 4 | 5 6 7: the elements are divided evenly first, and the remainder is then distributed to the partitions from the back forward.
val a = sc.parallelize(Seq("one", "two", "three", "four", "five", "six", "seven"), 3)
a.glom.collect
Result: Array[Array[String]] = Array(Array(one, two), Array(three, four), Array(five, six, seven))
- union merges two RDDs into one RDD
val a = sc.parallelize(1 to 4, 2)
val b = sc.parallelize(3 to 6, 2)
a.union(b).collect
(a ++ b).collect
(a union b).collect
- groupBy groups the elements by the result of a given function
val a = sc.parallelize(Seq(1, 3, 4, 5, 9, 100, 200), 3)
a.groupBy(x => { if (x > 10) ">10" else "<=10" }).collect
- filter keeps only the elements that satisfy the given predicate; the output is a subset of the input (every output element comes directly from the input)
val a = sc.parallelize(1 to 21, 3)
val b = a.filter(_ % 4 == 0)
b.collect
- distinct removes duplicate elements
val c = sc.parallelize(List("张三", "李四", "李四", "王五"), 2)
c.distinct.collect
- cache caches the RDD in memory, for RDDs that will be reused repeatedly
val c = a.union(b).cache
c.count
c.distinct().collect

2.2 Key-value transformation operators
- mapValues processes only the values, leaving the keys unchanged
val first = sc.parallelize(List(("张一",1),("张二",2),("张三",3),("张四",4)),2) val second= first.mapValues(x=>x+1) second.collect
- combineByKey collects the values belonging to each key into a collection
val first = sc.parallelize(List(("张一",1),("李一",1),("张一",2),("张一",3),("李一",3),("李三",3),("张四",4)),2) val second= first.combineByKey(List(_), (x:List[Int], y:Int) => y :: x, (x:List[Int], y:List[Int]) => x ::: y) second.collect 结果: Array[(String, List[Int])] = Array((李一,List(1, 3)), (张一,List(2, 1, 3)), (张四,List(4)), (李三,List(3)))
- reduceByKey aggregates the values by key
// word-count style: pair each element with 1, then sum the counts per key (first can be any RDD, e.g. the one defined above)
val second = first.map(x => (x, 1))
second.reduceByKey(_ + _).collect
- join performs a join by key; for each key the matched values are flattened (a flatMap-style operation) into (key, (value1, value2)) pairs
// assuming first and second are pair RDDs keyed by the same type, e.g. those from the combineByKey example above
first.join(second).collect

2.3 Action operators
- foreach applies the given function to each element on the executors; it returns no output to the driver
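A minimal sketch; note that the println side effect runs on the executors, so on a cluster the output appears in the executor logs rather than on the driver:

val a = sc.parallelize(1 to 5, 2)
a.foreach(x => println(x))   // returns Unit; nothing is sent back to the driver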
- saveAsTextFile saves the RDD as text files
first.saveAsTextFile("file:///home/spark/text")
// when only an HDFS directory name is given, the default base path is /user/<current user>/ on HDFS
first.saveAsTextFile("spark_shell_output_1")
- collect returns the distributed RDD to the Driver node, where the result can then be output locally
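For example, assuming sc from the spark-shell:

val a = sc.parallelize(1 to 5, 2)
a.collect   // Array(1, 2, 3, 4, 5); every element is pulled back to the driver, so avoid this on very large RDDs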
- collectAsMap the Map version of collect: returns a key-value RDD to the driver as a Map
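A short sketch; note that if a key appears more than once, only one of its values survives in the resulting Map:

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)), 2)
pairs.collectAsMap   // e.g. Map(a -> 3, b -> 2), returned to the driver as a Scala Map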
- lookup returns the Seq of values associated with the specified key
val first = sc.parallelize(List("小米", "华为", "华米", "大米", "苹果", "米老鼠"), 2)
val second = first.map(x => (if (x.contains("米")) "有米" else "无米", x))
second.lookup("有米")
- reduce aggregates the elements pairwise and iteratively with the given function
// sum of a value-type list
val a = sc.parallelize(1 to 10, 2)
a.reduce(_ + _)
// sum of the values of a key-value list
val a = sc.parallelize(List(("one", 1), ("two", 2), ("three", 3), ("four", 4)), 2)
a.reduce((x, y) => ("sum", x._2 + y._2))._2
- fold first applies the op function within every partition of the RDD (seeded with the zero value), then applies op once more to combine the per-partition results (again seeded with the zero value)
// the sum is 61: (1+2+10) + (3+4+10) + (5+6+10) + 10 = 61
sc.parallelize(List(1, 2, 3, 4, 5, 6), 3).fold(10)(_ + _)