Spark(五)

Spark(五),第1张

Spark(五)

5.2 双Value类型

5.2.1 union

  1. 作用:对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

#(1)创建第一个 RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at :24
​
#(2)创建第二个 RDD
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at :24
​
#(3)计算两个 RDD 的并集
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at :28
​
#(4)打印并集结果
scala> rdd3.collect()
res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

5.2.2 subtract

  1. 作用:计算差的一种函数,去除两个 RDD 中相同的元素,不同的 RDD 将保留下来

  2. 需求:创建两个 RDD,求第一个 RDD 与第二个 RDD 的差集

#(1)创建第一个 RDD
scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at :24
​
#(2)创建第二个 RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at :24
​
#(3)计算第一个 RDD 与第二个 RDD 的差集并打印
scala> rdd.subtract(rdd1).collect()
res27: Array[Int] = Array(8, 6, 7)

5.2.3 intersection

  1. 作用:对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

  2. 需求:创建两个 RDD,求两个 RDD 的交集

#(1)创建第一个 RDD
scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at :24
​
#(2)创建第二个 RDD
scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at :24
​
#(3)计算两个 RDD 的交集
scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at :28
​
#(4)打印计算结果
scala> rdd3.collect()
res19: Array[Int] = Array(5, 6, 7)

5.2.4 sample

sample 根据要求从全量数据中随机抽样

  1. 三个参数:withReplacement: true为可放回的随机抽样 false为不可放回的随机抽样 ​ fraction: 设置比例 ​ seed: 种子数

  2. 需求:随机抽取网站日志时间统计

package com.zch.spark.core.exercise
​
import org.apache.spark.{SparkConf, SparkContext}
​

object Exercise_SparkCoreDemo07 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("demo7")
​
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("C:\Users\Administrator\Desktop\test")
​
    rdd.sample(false,0.5,1)
      .map(line => {
        val str = line.split(" ")(3).substring(1, 12)
        (str,1)
      })
      .reduceByKey(_+_)
      .foreach(println)
    //(09/Nov/2021,216)
    //(08/Nov/2021,23458)
    //(31/Oct/2021,4775)
    //(02/Nov/2021,6114)
    //(01/Nov/2021,7408)
    //(03/Nov/2021,6659)
    //(04/Nov/2021,6723)
    //(05/Nov/2021,7999)
    //(06/Nov/2021,4284)
    //(07/Nov/2021,4155)
  }
}

5.2.5 combineByKey

combineByKey :

可以改变元素聚合的value的类型

方法的输入元素类型如果和输出元素类型一致的话,相当于reduceByKey

val rdd2 = rdd1 combineByKey[Long](
  (v: Int) => v.toLong,
  (c: Long, v: Int) => c + v,
  (c1: Long, c2: Long) => c1 + c2
)
rdd2.foreach(println)
​
//  相当于reduceByKey
rdd1.combineByKey[Int](
  (v: Int) => v,
  (c: Int, v: Int) => c + v,
  (c1: Int, c2: Int) => c1 + c2
)
  .foreach(println)

5.2.6 sortBy 和 sortByKey

sortBy底层还是调用sortByKey方法,所以两者的运行效率差不多

不同的是,sortBy可以根据原集合中不存在的key值进行排序

,sortByKey只能根据集合中存在的key值进行排序

package com.zch.spark.core
​
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
​

object SparkCoreDemo09_transformations {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[1]")
      .setAppName("demo9")
​
    val sc = new SparkContext(conf)
​
​
    val rdd: RDD[(String, Int, Double, Int)] = sc.makeRDD(List(
      ("amos", 18, 10.25, 178),
      ("tom", 28, 2.5, 50),
      ("jerry", 16, 5000.0, 20)
    ))
    // 排序
    //    rdd.sortBy()
    //  def sortBy[K](
    //      f: (T) => K,
    //      ascending: Boolean = true,
    //      numPartitions: Int = this.partitions.length)
    rdd.sortBy(t => t._2 * t._3)
      .foreach(println)
​
​
    //  rdd.sortByKey()
    rdd.map(t => {
      (t._3, t)
    })
      .sortByKey(numPartitions = 1)
      .map(_._2)
    //      .foreach(println)
​
​
  }
}
​

5.2.7 join

RDD1.join(RDD2):保存两集合都有的部分

RDD1.leftOuterJoin(RDD2):以左集合为准,RDD1集合中有的值,RDD2中没有,没有的会生成none,有的生成some

RDD1.rightOuterJoin(RDD2):与左连接同理

RDD1.fullOuterJoin(RDD2):为满链接,两集合中的元素都会展示,有的some,没有的none

// join
//  RDD[(K, V)].join
//def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] =
//def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] =
//def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] =
//  def fullOuterJoin[W](other: RDD[(K, W)]) : RDD[(K, (Option[V], Option[W]))] =
val rdd1: RDD[(String, Int)] = sc.makeRDD(List("k1" -> 1, "k2" -> 2))
val rdd2: RDD[(String, Double)] = sc.makeRDD(List("k1" -> 1.1, "k3" -> 2.2))
​
rdd1.join(rdd2)
    .foreach(println)//(k1,(1,1.1))
​
rdd1.leftOuterJoin(rdd2)
    .foreach(println)//(k2,(2,None))
                      //(k1,(1,Some(1.1)))
​
rdd1.fullOuterJoin(rdd2)
      .foreach(println)//(k2,(Some(2),None))
                        //(k3,(None,Some(2.2)))
                        //(k1,(Some(1),Some(1.1)))

5.2.8 groupByKey 和 cogroup

groupByKey:根据key值,将相同的分为一组,如果key没有相同的,则不显示

cogroupBy:没有相同的key值,显示为空,完美解决没有相同key值就不显示的问题

val rdd3 = sc.makeRDD(List(
  "huawei" -> 8754,
  "huawei" -> 53465,
  "xiaomi" -> 678,
  "xiaomi" -> 235,
  "xiaomi" -> 2388,
  "vivo" -> 678,
  "vivo" -> 456,
  "vivo" -> 7489,
  "oppo" -> 234
))
​
val rdd4 = sc.makeRDD(List(
  "huawei" -> 875,
  "huawei" -> 53465,
  "huawei" -> 111,
  "xiaomi" -> 678,
  "xiaomi" -> 235,
  "oppo" -> 234
))
​
val rdd5 = sc.makeRDD(List(
  "htc" -> 875,
  "nokia" -> 53465
))
​
//def groupByKey(): RDD[(K, Iterable[V])] =
val rdd31 = rdd3.groupByKey()
  .map(t => (t._1, t._2.toList))
  .foreach(println)//(oppo,List(234))
                    //(vivo,List(678, 456, 7489))
                    //(huawei,List(8754, 53465))
                    //(xiaomi,List(678, 235, 2388))

5.2.9 cartesian 笛卡尔积

//   def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] =
val rdd6 = sc.makeRDD(List("油泼面", "饺子", "火锅", "回锅肉"))
val rdd7 = sc.makeRDD(List("冰峰", "酸梅汤", "可乐"))
​
rdd6.cartesian(rdd7)
     .foreach(println)
//(油泼面,冰峰)
//(油泼面,酸梅汤)
//(饺子,冰峰)
//(油泼面,可乐)
//(饺子,酸梅汤)
//(饺子,可乐)
//(火锅,冰峰)
//(回锅肉,冰峰)
//(火锅,酸梅汤)
//(火锅,可乐)
//(回锅肉,酸梅汤)
//(回锅肉,可乐)
​

5.2.10 pipe

pipe可以接收一个系统命令(shell命令 shell脚本 windows dos命令) 将原RDD中每一个元素 传入命令或者脚本作为参数 并将脚本执行的输出结果收集到新的RDD并返回

#!/bin/bash
​
echo "开始执行任务"
​
while read X
    do
        echo "接收到$X"
    done
echo "执行结束"
val rdd = sc.makeRDD(1 to 10)
rdd.pipe("/root/test.sh").collect

结果为:

  

5.2.11 mapValues

针对于(K,V)形式的类型只对V进行 *** 作

scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at :24
​
scala> rdd3.mapValues(_+"|||").collect()
res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))

5.2.12 subtract

计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来

scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at :24
​
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at :24
​
scala> rdd.subtract(rdd1).collect()
res27: Array[Int] = Array(8, 6, 7)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5677491.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存