Spark-尚硅谷5- 1数据结构:RDD 转换算子

Spark-尚硅谷5- 1数据结构:RDD 转换算子,第1张

Spark-尚硅谷5- 1数据结构:RDD 转换算子 5.1.4.3 RDD 转换算子

算子 : Operator( *** 作)
RDD的方法和Scala集合对象的方法不一样
Scala集合对象的方法都是在同一个节点的内存中完成的。
RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行, 为了和Scala集合对象区分不同的处理效果,所以将RDD的方法称之为算子。
RDD的方法外部的 *** 作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型

Value 类型 map 转换


map *** 作只能是来一个计算一个,出去一个

map: 转换映射功能,及把一个数据转换成一个新的数据, 把A变成B

匿名函数:只关系逻辑不关心方法名,能省则省,参数名可以推断出来及不需要声明参数类型,逻辑只有一句话则不需要{},入参只一个可以不写()

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - map

        val rdd = sc.makeRDD(List(1,2,3,4))
        // 1,2,3,4
        // 2,4,6,8

        // 转换函数:需要可以转入参数,再将改变后的参数输出
        def mapFunction(num:Int): Int = {
            num * 2
        }

        //val mapRDD: RDD[Int] = rdd.map(mapFunction)
        //val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2})
        //val mapRDD: RDD[Int] = rdd.map((num:Int)=>num*2)
        //val mapRDD: RDD[Int] = rdd.map((num)=>num*2)
        //val mapRDD: RDD[Int] = rdd.map(num=>num*2)
        val mapRDD: RDD[Int] = rdd.map(_*2)

        mapRDD.collect().foreach(println)

        sc.stop()

    }
}

小功能:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径

apache.log:

83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf

代码实现

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Test {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - map
        val rdd = sc.textFile("datas/apache.log")

        // 长的字符串
        // 短的字符串
        val mapRDD: RDD[String] = rdd.map(
            line => {
                val datas = line.split(" ")
                datas(6)
            }
        )
        mapRDD.collect().foreach(println)

        sc.stop()

    }
}


执行顺序

// 1. rdd的计算一个分区内的数据是一个一个执行逻辑
只使用一个分区,只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
分区内数据的执行是有序的。
// 2. 不同分区数据计算是无序的。

RDD所有算子转换 *** 作转换后的数据分区不变,除分组之外
不因为算子 *** 作后数据进去其他分区,提高计算效率。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Part {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - map
        val rdd = sc.makeRDD(List(1,2,3,4),2)
        // 分区1【1,2】,分区2【3,4】
        rdd.saveAsTextFile("output")
        val mapRDD = rdd.map(_*2)
        //转换之后的数据还是存在原来的分区
        // 分区1【2,4】,分区2【6,8】
        mapRDD.saveAsTextFile("output1")

        sc.stop()

    }
}

mapValues

mapValues(func)

功能:对键值对每个value都应用一个函数,但是,key不会发生变化。

示例

val list = List("hadoop","spark","hive","spark")
val rdd = sc.parallelize(list)
val pairRdd = rdd.map(x => (x,1))
pairRdd.mapValues(_+1).collect.foreach(println)//对每个value进行+1

结果

(hadoop,2)
(spark,2)
(hive,2)
(spark,2)
mapPartitions
  • map *** 作只能是来一个计算一个,出去一个没有缓存的概念及来一批数据处理数据性能不高。
  • mapPartitions是先拿到一个分区内的所有数据再做处理, 及mapPartitions : 可以以分区为单位进行数据转换 *** 作, 但是会将整个分区的数据加载到内存进行引用, 如果处理完的数据是不会被释放掉,存在对象的引用。 在内存较小,数据量较大的场合下,容易出现内存溢出。
  • MapPartitions 算子需要传递一个迭代器,返回一个迭代器
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
 datas => {
 datas.filter(_==2)
 } )
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - mapPartitions
        val rdd = sc.makeRDD(List(1,2,3,4), 2)

        // mapPartitions : 可以以分区为单位进行数据转换 *** 作
        //                 但是会将整个分区的数据加载到内存进行引用
        //                 如果处理完的数据是不会被释放掉,存在对象的引用。
        //                 在内存较小,数据量较大的场合下,容易出现内存溢出。
        val mpRDD: RDD[Int] = rdd.mapPartitions(
            iter => {
                println(">>>>>>>>>>")
                iter.map(_ * 2)
            }
        )
        mpRDD.collect().foreach(println)
        sc.stop()

    }
}

小功能:获取每个数据分区的最大值

map 和 mapPartitions 的区别

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Transform_Test {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - mapPartitions
        val rdd = sc.makeRDD(List(1,2,3,4), 2)

        // 【1,2】,【3,4】
        // 【2】,【4】
        val mpRDD = rdd.mapPartitions(
            iter => {
                List(iter.max).iterator
            }
        )
        mpRDD.collect().foreach(println)

        sc.stop()

    }
}

mapPartitionsWithIndex

根据分区号选择计算哪个分区数据

val dataRDD1 = dataRDD.mapPartitionsWithIndex(
 (index, datas) => {
 datas.map(index, _)
 } )

小功能:获取第二个数据分区的数据

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - mapPartitions
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        // 【1,2】,【3,4】
        val mpiRDD = rdd.mapPartitionsWithIndex(
            (index, iter) => {
                //如果分区号是1计算
                if ( index == 1 ) {
                    iter
                } else {//其他分布不计算
                    Nil.iterator
                }
            }
        )
        mpiRDD.collect().foreach(println)


        sc.stop()

    }
}

查看数据在哪个分区

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Transform1 {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - mapPartitions
        val rdd = sc.makeRDD(List(1,2,3,4))

        val mpiRDD = rdd.mapPartitionsWithIndex(
            (index, iter) => {
                // 1,   2,    3,   4
                //(0,1)(2,2),(4,3),(6,4)
                //查看数据在哪个分区
                iter.map(
                    num => {
                        (index, num)
                    }
                )
            }
        )

        mpiRDD.collect().foreach(println)


        sc.stop()

    }
}

flatMap 扁平化映射

flatMap: 把一条数据拆分成一个一个个体使用

val dataRDD = sparkContext.makeRDD(List(
 List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
 list => list
)
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform1 {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - flatMap
        val rdd: RDD[String] = sc.makeRDD(List(
            "Hello Scala", "Hello Spark"
        ))

        val flatRDD: RDD[String] = rdd.flatMap(
            s => {
                s.split(" ")
            }
        )
        flatRDD.collect().foreach(println)
        sc.stop()

    }
}

小功能:将 List(List(1,2),3,List(4,5))进行扁平化 *** 作

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform2 {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - flatMap
        val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))

        val flatRDD = rdd.flatMap(
            data => {
                data match {
                    case list:List[_] => list
                    case dat => List(dat)
                }
            }
        )

        flatRDD.collect().foreach(println)
        sc.stop()

    }
}

glom 聚合

glomg和flatMap的作用相反

flatMap把一个拆成多个 list->int
glomg把多个合成一个 int -> array

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4
),1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - glom
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

        // List => Int
        // Int => Array
        //返回的是一个数组
        val glomRDD: RDD[Array[Int]] = rdd.glom()

        glomRDD.collect().foreach(data=> println(data.mkString(",")))
        sc.stop()

    }
}

小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Transform_Test {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - glom
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

        // 【1,2】,【3,4】
        // 【2】,【4】
        // 【6】
        //把一个分区中数据放入一个数组中
        val glomRDD: RDD[Array[Int]] = rdd.glom()

        //在取出每个数组中的最大值
        val maxRDD: RDD[Int] = glomRDD.map(
            array => {
                array.max
            }
        )
        //求和
        println(maxRDD.collect().sum)
        sc.stop()

    }
}
groupBy 分组


// groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
// 相同的key值的数据会放置在一个组中
分组和分区没有必然的关系,分组后的数据可能被放在一个分区,也可能放在多个分区,及分组会把数据打乱再重新组合

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(
 _%2
)

奇偶分组

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - groupBy
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

        // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
        // 相同的key值的数据会放置在一个组中
        def groupFunction(num:Int) = {
            //取模以后的结果当做key,及根据0,1奇偶分组
            num % 2
        }
        val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
        groupRDD.collect().foreach(println)
        
        sc.stop()

    }
}

(0,CompactBuffer(2, 4))
(1,CompactBuffer(1, 3))

根据每个单词首字母分区
❖ 小功能:将 List(“Hello”, “hive”, “hbase”, “Hadoop”)根据单词首写字母进行分组。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform1 {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - groupBy
        val rdd  = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)

        // 分组和分区没有必然的关系
        //根据每个单词首字母分区
        val groupRDD = rdd.groupBy(_.charAt(0))

        groupRDD.collect().foreach(println)
        
        sc.stop()

    }
}
(H,CompactBuffer(Hello, Hadoop))
(S,CompactBuffer(Spark, Scala))

❖ 小功能:从服务器日志数据 apache.log 中获取每个小时问量。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform_Test {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - groupBy
        val rdd = sc.textFile("datas/apache.log")

        val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
            line => {
                val datas = line.split(" ")
                val time = datas(3)
                //time.substring(0, )
                val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
                val date: Date = sdf.parse(time)
                val sdf1 = new SimpleDateFormat("HH")
                val hour: String = sdf1.format(date)
                (hour, 1)
            }
        ).groupBy(_._1)
        timeRDD.map{
            case ( hour, iter ) => {
                (hour, iter.size)
            }
        }.collect.foreach(println)

        sc.stop()

    }
}

❖ 小功能:WordCount。
groupBay也可是实现wordConut ,先根据word分区再计算数组的长度。

filter 过滤

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4
),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)

过滤掉奇数,保留偶数

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - filter
        val rdd = sc.makeRDD(List(1,2,3,4))

        //为true数据保留
        val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0)

        filterRDD.collect().foreach(println)

        sc.stop()

    }
}

小功能:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Transform_Test {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - filter
        val rdd = sc.textFile("datas/apache.log")

        rdd.filter(
            line => {
                val datas = line.split(" ")
                val time = datas(3)
                //判断是否以"17/05/2015"开头
                time.startsWith("17/05/2015")
            }
        ).collect().foreach(println)

        sc.stop()

    }
}

sample 抽样

        // sample算子需要传递三个参数
        // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
        // 2. 第二个参数表示,
        //         如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
        //         如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
        // 3. 第三个参数表示,抽取数据时随机算法的种子
        //                    如果不传递第三个参数,那么使用的是当前系统时间
//        println(rdd.sample(
//            false,
//            0.4
//            //1
//        ).collect().mkString(","))
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4
),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark08_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - filter
        val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))

        // sample算子需要传递三个参数
        // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
        // 2. 第二个参数表示,
        //         如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
        //         如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
        // 3. 第三个参数表示,抽取数据时随机算法的种子
        //                    如果不传递第三个参数,那么使用的是当前系统时间
//        println(rdd.sample(
//            false,
//            0.4
//            //1
//        ).collect().mkString(","))

        println(rdd.sample(
            false,
            0.4
            //1
        ).collect().mkString(","))

        sc.stop()

    }
}

3,4,8
distinct 去重

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark09_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - filter
        val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))

        // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)

        // (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
        // (1, null)(1, null)(1, null)
        // (null, null) => null
        // (1, null) => 1
        val rdd1: RDD[Int] = rdd.distinct()

        rdd1.collect().foreach(println)

        sc.stop()

    }
}

coalesce 缩减分区

多次过滤后每个分区的数据都少了,需要合并数据缩减分区

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark10_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - filter
        val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3)

        // coalesce方法默认情况下不会将分区的数据打乱重新组合
        // 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
        // 如果想要让数据均衡,可以进行shuffle处理
        //val newRDD: RDD[Int] = rdd.coalesce(2)
        val newRDD: RDD[Int] = rdd.coalesce(2,true)
        newRDD.saveAsTextFile("output3")
        sc.stop()
    }
}


coalesce 默认情况是缩减分区,不打乱原有数据的顺序,有可能导致缩减分区后数据不均衡, 如果想要让数据均衡,可以进行shuffle处理,参数 shuffle 的默认值为 false

    //如果想要让数据均衡,可以进行shuffle处理,参数 shuffle 的默认值为 false
        //val newRDD: RDD[Int] = rdd.coalesce(2)
        val newRDD: RDD[Int] = rdd.coalesce(2,true)

repartition 增大分区

val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark11_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - filter
        val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2)

        // coalesce算子可以扩大分区的,但是如果不进行shuffle *** 作,不打乱数据顺序是没有意义,不起作用。
        // 所以如果想要实现扩大分区的效果,需要使用shuffle *** 作
        // spark提供了一个简化的 *** 作
        // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
        // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
        //val newRDD: RDD[Int] = rdd.coalesce(3, true)
        val newRDD: RDD[Int] = rdd.repartition(3)

        newRDD.saveAsTextFile("output")
        
        sc.stop()
    }
}

sortBy 根据指定规则排序


val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
//根据num排序
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark12_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - sortBy
        val rdd = sc.makeRDD(List(6,2,4,5,3,1), 2)

        val newRDD: RDD[Int] = rdd.sortBy(num=>num)
        
        newRDD.collect().foreach(println)
        sc.stop()

    }
}

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark12_RDD_Operator_Transform1 {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - sortBy
        val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)

        // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式,ascending默认是true及升序,false为降序
        // sortBy默认情况下,不会改变分区。但是中间存在shuffle *** 作,
        //Int格式排序
        val newRDD = rdd.sortBy(t=>t._1.toInt, false)

        newRDD.collect().foreach(println)
        sc.stop()

    }
}

 //以原始String 类型排序
        val newRDD = rdd.sortBy(t=>t._1, false)

双 Value 类型

双 Value 类型指的是两个数据源之间的关联 *** 作

intersection union subtract 要求两个数据流的类型一致,zip可以不一致

  • 交集,并集和差集要求两个数据源数据类型保持一致
  • 拉链 *** 作两个数据源的类型可以不一致,但要求两个数据源要求分区数量和每个分区中的数据要保持一致
intersection 两个数据源的交集 *** 作

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)
union 并集

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)

与数学中的并集不同的是 相同的数据不会只保留一个,spark中的并集只是把两个数据源直接放在一起。

1,2,3,4,3,4,5,6
subtract差集

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)
zip拉链
    // Can't zip RDDs with unequal numbers of partitions: List(2, 4)
  • 两个数据源要求分区数量要保持一致
    // Can only zip RDDs with same number of elements in each partition
  • 两个数据源要求分区中数据数量保持一致

(1,3),(2,4),(3,5),(4,6)

Spark中 拉链 *** 作与scala中不同的是两个数据源要求分区中数据数量保持一致

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform1 {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)


        //scala 中的拉链 *** 作,数据长度可以不一致
        val ids = List[Long](1,2,3,4,5,6,7)
        val ids1 = List[Long](2,3,4,5,6,7)
        //ids.tail=(2,3,4,5,6,7)
        val okflowIds: List[(Long, Long)] = ids.zip(ids1)

        // TODO 算子 - 双Value类型
        // Can't zip RDDs with unequal numbers of partitions: List(2, 4)
        // 两个数据源要求分区数量要保持一致
        // Can only zip RDDs with same number of elements in each partition
        // 两个数据源要求分区中数据数量保持一致
        val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),2)
        val rdd2 = sc.makeRDD(List(3,4,5,6),2)
        // 运行报错
        val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
        println(rdd6.collect().mkString(","))
        
        sc.stop()

    }
}

交集+并集+差集+拉链

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - 双Value类型

        // 交集,并集和差集要求两个数据源数据类型保持一致
        // 拉链 *** 作两个数据源的类型可以不一致

        val rdd1 = sc.makeRDD(List(1,2,3,4))

        val rdd2 = sc.makeRDD(List(3,4,5,6))

        val rdd7 = sc.makeRDD(List("3","4","5","6"))

        // 交集 : 【3,4】
        val rdd3: RDD[Int] = rdd1.intersection(rdd2)
        //类型不一致,编译不通过
        //val rdd8 = rdd1.intersection(rdd7)
        println(rdd3.collect().mkString(","))

        // 并集 : 【1,2,3,4,3,4,5,6】
        val rdd4: RDD[Int] = rdd1.union(rdd2)
        println(rdd4.collect().mkString(","))

        // 差集 : 【1,2】
        val rdd5: RDD[Int] = rdd1.subtract(rdd2)
        // 差集 : 【1,2】
        val rdd9: RDD[Int] = rdd2.subtract(rdd1)

        println(rdd5.collect().mkString(","))
        println(rdd9.collect().mkString(","))

        // 拉链 : 【1-3,2-4,3-5,4-6】
        //(1,3),(2,4),(3,5),(4,6)
        val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
        //zip可以类型不一致
        val rdd8 = rdd1.zip(rdd7)
        println(rdd6.collect().mkString(","))

        sc.stop()

    }
}

Key - Value 类型

要求数据流失(key, value)类型

partitionBy

实现改变数据所在分区的位置

val rdd: RDD[(Int, String)] =
 sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
import org.apache.spark.HashPartitioner
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark14_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)
        val rdd = sc.makeRDD(List(1,2,3,4),2)

        val mapRDD:RDD[(Int, Int)] = rdd.map((_,1))
        // RDD => PairRDDFunctions
        // 隐式转换(二次编译)

        // partitionBy根据指定的分区规则对数据进行重分区
        val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
        newRDD.partitionBy(new HashPartitioner(2))

        newRDD.saveAsTextFile("output")
        
        sc.stop()

    }
}

如果重分区的分区器和当前 RDD 的分区器一样(分区器的类型和分区的数量一样)怎么办?
直接返回RDD 不会创建新的RDD

Spark 还有其他分区器吗
hash分区器和range(范围)分区器

reduceByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Spark15_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("a", 3), ("b", 4)
        ))

        // reduceByKey : 相同的key的数据进行value数据的聚合 *** 作
        // scala语言中一般的聚合 *** 作都是两两聚合,spark基于scala开发的,所以它的聚合也是两两聚合
        // 【1,2,3】
        // 【3,3】
        // 【6】
        // reduceByKey中如果key的数据只有一个,是不会参与运算的。
        val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey( (x:Int, y:Int) => {
            println(s"x = ${x}, y = ${y}")
            x + y
        } )

        reduceRDD.collect().foreach(println)
        sc.stop()

    }
}

groupByKey

相同的key放的一个group

groupByky和reduceByKey的区别,reduceByKey有聚合(求相同keyd的数量)的作用,groupByky是把key放在一组
groupByKey:

(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))

educeByKey:

(a,6)
(b,4)

val dataRDD1 =
 sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark16_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("a", 3), ("b", 4)
        ))

        // groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
        //              元组中的第一个元素就是key,
        //              元组中的第二个元素就是相同key的value的集合
        val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()

        groupRDD.collect().foreach(println)

        val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)

        groupRDD1.collect().foreach(println)
        sc.stop()
    }
}


reduceByKey 和 groupByKey 的区别?

aggregateByKey


先对每个分区内做最大值判断,再分区间做聚合 *** 作

// TODO : 取出每个分区内相同 key 的最大值然后分区间相加
// aggregateByKey 算子是函数柯里化,存在两个参数列表
// 1. 第一个参数列表中的参数表示初始值
// 2. 第二个参数列表中含有两个参数
// 2.1 第一个参数表示分区内的计算规则
// 2.2 第二个参数表示分区间的计算规则
val rdd =
 sc.makeRDD(List(
 ("a",1),("a",2),("c",3),
 ("b",4),("c",5),("c",6)
 ),2)
// 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
// => (a,10)(b,10)(c,20)
// 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
val resultRDD =
 rdd.aggregateByKey(10)(
 (x, y) => math.max(x,y),
 (x, y) => x + y
 )
resultRDD.collect().foreach(println)


package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark17_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("a", 3), ("a", 4)
        ),2)
        // (a,【1,2】), (a, 【3,4】)
        // (a, 2), (a, 4)
        // (a, 6)

        // aggregateByKey存在函数柯里化,有两个参数列表
        // 第一个参数列表,需要传递一个参数,表示为初始值
        //       主要用于当碰见第一个key的时候,和value进行分区内计算
        // 第二个参数列表需要传递2个参数
        //      第一个参数表示分区内计算规则
        //      第二个参数表示分区间计算规则

        // math.min(x, y)
        // math.max(x, y)
        rdd.aggregateByKey(0)(
            (x, y) => math.max(x, y),
            (x, y) => x + y
        ).collect.foreach(println)
        
        sc.stop()

    }
}

(a,6)

reduceByKey和aggregateByKey的区别
reduceByKey:分区内和分区间的计算规则一样
aggregateByKey:分区内和分区间的计算规则不一样


分区内计算规则和分区间计算规则相同怎么办?
aggregateByKey 分区内和分区间的计算规则可以相同也可以不同。

计算规则相同

aggregateByKey最终的返回数据结果应该和初始值的类型保持一致:

获取相同key的数据的平均值

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark18_RDD_Operator_Transform3 {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6)
        ),2)

        // aggregateByKey最终的返回数据结果应该和初始值的类型保持一致
        //val aggRDD: RDD[(String, String)] = rdd.aggregateByKey("")(_ + _, _ + _)
        //aggRDD.collect.foreach(println)

        // 获取相同key的数据的平均值 => (a, 3),(b, 4)
        val newRDD : RDD[(String, (Int, Int))] = rdd.aggregateByKey( (0,0) )(
            ( t, v ) => {
                (t._1 + v, t._2 + 1)
            },
            (t1, t2) => {
                (t1._1 + t2._1, t1._2 + t2._2)
            }
        )

        val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
            case (num, cnt) => {
                num / cnt
            }
        }
        resultRDD.collect().foreach(println)
        
        sc.stop()

    }
}

(b,4)
(a,3)
foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
import org.apache.spark.{SparkConf, SparkContext}

object Spark17_RDD_Operator_Transform2 {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6)
        ),2)

        //rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println)

        // 如果聚合计算时,分区内和分区间计算规则相同,spark提供了简化的方法
        rdd.foldByKey(0)(_+_).collect.foreach(println)
        
        sc.stop()

    }
}

(b,12)
(a,9)
combineByKey

val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), 
("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
 (_, 1),
 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?

  • reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同

  • FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相

  • AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规
    则可以不相同

  • CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区
    内和分区间计算规则不相同。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark20_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("b", 3),
            ("b", 4), ("b", 5), ("a", 6)
        ),2)

        //底层都是调用的相同的方法 combineByKeyWithClassTag
        
        rdd.reduceByKey(_+_) // wordcount
        rdd.aggregateByKey(0)(_+_, _+_) // wordcount
        rdd.foldByKey(0)(_+_) // wordcount
        rdd.combineByKey(v=>v,(x:Int,y)=>x+y,(x:Int,y:Int)=>x+y) // wordcount

        sc.stop()
    }
}

join 谨慎使用OOM的风险

相当于sql表的内连接

key的类型必须相同 value类型可以不相同

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark21_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd1 = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("c", 3)
        ))

        val rdd2 = sc.makeRDD(List(
            ("a", 5), ("c", 6),("a", 4)
        ))

        // join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
        //        如果两个数据源中key没有匹配上,那么数据不会出现在结果中
        //        如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。
        val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)

        joinRDD.collect().foreach(println)
        
        sc.stop()

    }
}

两个不同数据源的数据,相同的key的value会连接在一起,形成元组

        // TODO 算子 - (Key - Value类型)

        val rdd1 = sc.makeRDD(List(
            ("a", 1), ("b", 2), ("c", 3)
        ))

        val rdd2 = sc.makeRDD(List(
            ("c", 5), ("b", 6),("a", 4)
        ))

        // join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
        val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
(a,(1,4))
(b,(2,6))
(c,(3,5))

如果两个数据源中key没有匹配上,那么数据不会出现在结果中

val rdd1 = sc.makeRDD(List(
            ("a", 1), ("b", 2), ("c", 3)
        ))

        val rdd2 = sc.makeRDD(List(
            ("a", 5), ("c", 6),("d", 4)
        ))

        // join : 
        //     如果两个数据源中key没有匹配上,那么数据不会出现在结果中   
  
        val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
(a,(1,5))
(c,(3,6))

如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。

        val rdd1 = sc.makeRDD(List(
            ("a", 1), ("a", 2), ("c", 3)
        ))

        val rdd2 = sc.makeRDD(List(
            ("a", 5), ("c", 6),("a", 4)
        ))

        //        如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。
        val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
(a,(1,5))
(a,(1,4))
(a,(2,5))
(a,(2,4))
(c,(3,6))
leftOuterJoin / rightOuterJoin

相当于sql表的外连接,及以一条数据流为主

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark22_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd1 = sc.makeRDD(List(
            ("a", 1), ("b", 2), ("c", 3)
        ))

        val rdd2 = sc.makeRDD(List(
            ("a", 4), ("b", 5)//,("c", 6)
        ))
        val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
       // val rightJoinRDD = rdd1.rightOuterJoin(rdd2)

        leftJoinRDD.collect().foreach(println)
        //rightJoinRDD.collect().foreach(println)
        sc.stop()

    }
}

(a,(1,Some(4)))
(b,(2,Some(5)))
(c,(3,None))
        val rdd1 = sc.makeRDD(List(
            ("a", 1), ("b", 2)//, ("c", 3)
        ))

        val rdd2 = sc.makeRDD(List(
            ("a", 4), ("b", 5),("c", 6)
        ))
       // val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
        val rightJoinRDD = rdd1.rightOuterJoin(rdd2)

        rightJoinRDD.collect().foreach(println)
(a,(Some(1),4))
(b,(Some(2),5))
(c,(None,6))
cogroup

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = 
dataRDD1.cogroup(dataRDD2)
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark23_RDD_Operator_Transform {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - (Key - Value类型)

        val rdd1 = sc.makeRDD(List(
            ("a", 1), ("b", 2)//, ("c", 3)
        ))

        val rdd2 = sc.makeRDD(List(
            ("a", 4), ("b", 5),("c", 6),("c", 7)
        ))

        // cogroup : connect + group (分组,连接)
        val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

        cgRDD.collect().foreach(println)
        
        sc.stop()
    }
}

即使数据没有也会对其统计,所cogroup不要求两个流中必须存在相同的key

(a,(CompactBuffer(1),CompactBuffer(4)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(),CompactBuffer(6, 7)))
案例实 ***


数据agent.log

1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
1516609143869 1 8 95 5
1516609143869 8 1 90 29
1516609143869 3 3 36 16
1516609143869 3 3 54 22
1516609143869 7 6 33 5
1516609143869 8 2 91 27
1516609143869 0 5 66 5
1516609143869 1 3 33 6
1516609143869 6 2 97 21
1516609143869 5 2 95 24
1516609143869 8 9 73 11
1516609143869 4 8 62 15
1516609143869 5 5 40 23

整体思路

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark24_RDD_Req {

    def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 案例实 *** 

        // 1. 获取原始数据:时间戳,省份,城市,用户,广告
        val dataRDD = sc.textFile("datas/agent.log")

        // 2. 将原始数据进行结构的转换。方便统计
        //    时间戳,省份,城市,用户,广告
        //    =>
        //    ( ( 省份,广告 ), 1 )
        val mapRDD = dataRDD.map(
            line => {
                val datas = line.split(" ")
                (( datas(1), datas(4) ), 1)
            }
        )

        // 3. 将转换结构后的数据,进行分组聚合
        //    ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )
        val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_+_)

        // 4. 将聚合的结果进行结构的转换
        //    ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) )
        val newMapRDD = reduceRDD.map{
            case ( (prv, ad), sum ) => {
                (prv, (ad, sum))
            }
        }

        // 5. 将转换结构后的数据根据省份进行分组
        //    ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 )
        val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()

        // 6. 将分组后的数据组内排序(降序),取前3名
        val resultRDD = groupRDD.mapValues(
            iter => {
                iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
            }
        )

        // 7. 采集数据打印在控制台
        resultRDD.collect().foreach(println)
        sc.stop()

    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5154808.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-18
下一篇 2022-11-18

发表评论

登录后才能评论

评论列表(0条)

保存