A self-contained Spark demo of common RDD transformations: union, distinct, filter, subtract, intersection, cartesian, and the pair-RDD cogroup/join family.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stone").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val v1 = sc.parallelize(List(1, 2, 3, 4))
    val v2 = sc.parallelize(List(6, 5, 3, 4))
    v1.foreach(println)
    v2.foreach(println)

    println("--------------union-----------")
    v1.union(v2).foreach(println)

    println("--------------distinct-----------")
    v1.union(v2).distinct().foreach(println)

    println("--------------filter-----------")
    v1.union(v2).distinct().filter(_ >= 3).foreach(print)

    println("--------------subtract-----------")
    v1.subtract(v2).foreach(print)

    println("--------------intersection-----------")
    v1.intersection(v2).foreach(print)

    println("--------------cartesian-----------")
    v1.cartesian(v2).foreach(print)
    println(v1.partitions.size)

    println("--------------=====-----------")
    val kv1: RDD[(String, Int)] = sc.parallelize(List(
      ("zhang", 11), ("zhangsan", 12), ("lisi", 13), ("wangwu", 14)
    ))
    val kv2: RDD[(String, Int)] = sc.parallelize(List(
      ("zhan", 21), ("zhang", 22), ("lisi", 23), ("zhaoliu", 28)
    ))

    // cogroup collects the values for each key from both RDDs into a pair of Iterables.
    val cogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = kv1.cogroup(kv2)
    cogroup.foreach(println)

    // join is an inner join: only keys present in both RDDs appear in the result.
    val join: RDD[(String, (Int, Int))] = kv1.join(kv2)
    join.foreach(println)

    println("--------------=====-----------")
    // leftOuterJoin keeps every key from kv1; unmatched right-side values are None.
    val left: RDD[(String, (Int, Option[Int]))] = kv1.leftOuterJoin(kv2)
    left.foreach(println)

    println("--------------=====-----------")
    // rightOuterJoin keeps every key from kv2; unmatched left-side values are None.
    val right: RDD[(String, (Option[Int], Int))] = kv1.rightOuterJoin(kv2)
    right.foreach(println)

    println("--------------=====-----------")
    // fullOuterJoin keeps keys from both sides; both values are wrapped in Option.
    val full: RDD[(String, (Option[Int], Option[Int]))] = kv1.fullOuterJoin(kv2)
    full.foreach(println)
  }
}
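Because foreach runs on the executors, the printed order above is not deterministic. A minimal sketch of how to verify the join semantics instead, using collect and lookup on the driver (the JoinCheck object is hypothetical; it reuses the same sample data as the demo):

import org.apache.spark.{SparkConf, SparkContext}

object JoinCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-check").setMaster("local"))
    sc.setLogLevel("ERROR")

    val kv1 = sc.parallelize(List(("zhang", 11), ("zhangsan", 12), ("lisi", 13), ("wangwu", 14)))
    val kv2 = sc.parallelize(List(("zhan", 21), ("zhang", 22), ("lisi", 23), ("zhaoliu", 28)))

    // Inner join: only the shared keys "zhang" and "lisi" survive.
    assert(kv1.join(kv2).collect().toSet == Set(("zhang", (11, 22)), ("lisi", (13, 23))))

    // Left outer join: "wangwu" exists only in kv1, so the right side is None.
    assert(kv1.leftOuterJoin(kv2).lookup("wangwu") == Seq((14, None)))

    sc.stop()
  }
}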