Spark常见算子

Spark常见算子,第1张

Spark常见算子 UpdateStateByKey(基于磁盘读写)

UpdateStateBykey会统计全局的key的状态,不管有没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateBykey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数 *** 作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会删除(state可以是任意类型的数据结构)。
适用场景:
UpdataStateBykey可以用来统计历史数据,每次输出所有的key值。列如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的返回量等指标。
适用实例条件:

  1. 首先会以DStream中的数据进行按key做reduce *** 作,然后再对各个批次的数据进行累加。
  2. updataStateByKey要求必须设置checkpoint点(设置中间结果文件夹)
  3. updataStateByKey方法中updataFunc就要传入的参数,Seq[V]表示当前key对应的所有值,Option[S]是当前key的历史状态,返回的是新的封装的数据。
 def main(args: Array[String]): Unit = {
    //    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setAppName("Test")
      .setMaster("local[*]")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("D:\mydata\checkpoint\test")

    //必须转换成可迭代类型
    val topics = Array(Test_topic)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    //    val pattern = new Regex("[0-9]+")
    val pattern = "\d+".r
    //    val str = "44: this is a test_topic 44 kafka data"
    //    println((pattern findAllIn str).mkString(","))

    val wordCountDS: DStream[(String, Int)] = kafkaDStream
      .map(record => (pattern findAllIn record.value).next())
      .map(w => (w, 1))
      .reduceByKey((a, b) => a + b)
      .updateStateByKey(update)
      .transform(rdd => {
        rdd.sortBy(t => t._2, ascending = false)
      })

    wordCountDS.print()

    ssc.start()
    ssc.awaitTermination()

  }

  //new_state=("hello",1),old_state=("hello",3)
  //newValue=(1),oldstate=(3)
  def update(newValue: Seq[Int], oldstate: Option[Int]): Option[Int] = {
    val oldCount = oldstate.getOrElse(0) //oldCount(3)
    println(s"updateStateByKey: ${System.currentTimeMillis()}")
    Some(oldCount + newValue.sum) //newstage(4)
  }
mapWithState(基于磁盘存储+缓存)

mapWithState也是用于对于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,类型于增量的感觉。
使用场景
mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里余额信息。
适用实例条件:

  1. 如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值
  2. 还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在fun中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。3. checkpoint不会必须的
def main(args: Array[String]): Unit = {
    //1、创建sparkConf
    val sparkConf = new SparkConf().setAppName("scala wordcount").setMaster("local[*]")
    //2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    //3、创建StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))

    //4、设置checkpoint地址
    ssc.checkpoint("D:\mydata\checkpoint\test")
    //5、必须转换成可迭代类型
    val topics = Array(Test_topic)
    //6、消费者相关参数配置
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_ConFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_ConFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_ConFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_ConFIG -> (false: java.lang.Boolean),
      ConsumerConfig.KEY_DESERIALIZER_CLASS_ConFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_ConFIG -> classOf[StringDeserializer]
    )
    //7、通过KafkaUtils.createDirectStream对接kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    //8、获取topic中的数据
    val topicData: DStream[String] = kafkaDStream.map(_.value())
    //9、切分每一行,每个单词计为1
    val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_, 1))
    //10、统计相同单词
    val wordCount: DStream[(String, Int)] = wordAndOne.reduceByKey((a, b) => a + b)
    //11、统计变更数据
    val mapWithStateDStream: MapWithStateDStream[String, Int, Int, (String, Int)] = wordCount.mapWithState(StateSpec.function(mappingFunc))
    //12、打印预览(MappedType)结果
    //mapWithStateDStream.print()
    //
    mapWithStateDStream.foreachRDD(rdd => {
      rdd.foreachPartition(p => {
        //
        p.foreach(line => {
          println(s"结果: ${line}")
        })
        //
      })
    })
    
    //    //当前状态的键值快照
    //    val stateSnapshots: DStream[(String, Int)] = wordCount.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots()
    //
    //    stateSnapshots.print()

    //13、开启计算
    ssc.start()
    ssc.awaitTermination()

  }

  //  word="hello",value=1,state=("hello",3)
  val mappingFunc = (word: String, value: Option[Int], state: State[Int]) => {
    val sum = value.getOrElse(0) + state.getOption.getOrElse(0) //sum = 4
    //    val output: String = s"${word} -> ${sum}" //("hello-> 4")
    val output: (String, Int) = (word, sum) //("hello-> 4")
    state.update(sum) //更新key="hello"的状态为4
    println(s"mapWithState: ${System.currentTimeMillis()}")
    output //返回("hello",4)
  }
区别:

updataeStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。

mapWithState只返回变化后的key的值,这样做的好处是,我们可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。

详细使用:https://www.jianshu.com/p/a54b142067e5

map和mapPartition的区别:

map是对RDD的每一个元素使用一个方法 *** 作,mapPartitions是对每个partition的迭代器使用一个方法 *** 作。

MapPartitions的优点:

使用MapPartitions *** 作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。通常体现在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。SparkSql或Dataframe默认会对程序进行mapPartition的优化。

MapPartitions的缺点:

如果是普通的map *** 作,一次function的执行就处理一条数据,可以将已经处理完的数据从内存里面释放掉。
所以说普通的map *** 作通常不会导致内存的OOM异常。 但是MapPartitions *** 作,对于大量数据来说,如果直接将迭代器中数据取出来放内存,可能就OOM(内存溢出)。

foreach和foreachPartition的区别: foreachPartition:

foreachPartition是spark-core的action算子,foreachPartition是对每个partition中的iterator分别处理,通过将iterator传入function对进行数据的处理,也就是说在foreachPartition中函数处理的是分区迭代器,而非具体的数据,源码中的注释是:Applies a function func to each parition of this RDD.(将函数func应用于此RDD的每个分区)

foreach:

foreach也是spark-core的action算子,与foreachPartition类似的是,foreach也是对每个partition中的iterator分别处理,通过对每个iterator迭代获取数据传给function进行数据的处理,也就是说在foreach中函数处理的是具体的数据,源码中的注释是:Applies a function fun to all elements of this RDD.(将函数func用于此RDD的所有元素).

foreachRDD与上面两个的区别:

foreachRDD是sparkStreaming的OutputOperation算子。但是foreachRDD并不会触发立即处理,必须在碰到sparkcore的foreach或者foreachPartition算子后,才会触发action动作。同时要注意,function的应用在的driver端进行,而不是Executor端进行。

def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .getOrCreate()

    //隐式转换(spark自适应)
    import spark.implicits._

    val sc = spark.sparkContext

    val sourceRDD: RDD[String] = sc.parallelize(List[String](
      "hello", "hello", "hello", "spark", "spark"),
      2)

    //groupBy指定键来标示一个数据,便于之后的聚合
    val groupRDD: RDD[(String, Iterable[String])] = sourceRDD.groupBy(x => x)

    val mapPartRDD = groupRDD.mapPartitions(iterable1 => {
      iterable1.map {
        case (word, words) => {
          (word, words.size)
        }
      }
      
    })

    mapPartRDD.foreach(println)

    val mapRDD = groupRDD.map {
      case (word, iterable) => {
        (word, iterable.size)
      }
    }

    mapRDD.foreachPartition(iterable => {
      iterable.foreach(println)
    })
    
    spark.stop()
  }
GroupByKey和ReduceByKey的区别: GroupByKey(numPartitions=None)

官方解释:将 RDD 中每个键的值分组为一个序列。 使用 numPartitions 分区对生成的 RDD 进行哈希分区。 注意:如果您分组是为了对每个键执行聚合(例如求和或平均值),则使用 reduceByKey 或 aggregateByKey 将提供更好的性能。

– GroupByKey 只是将相同键的给归纳到一个序列,没有其它函数 *** 作 ReduceByKey(func, numPartitions=None)

官方解释:使用关联归约函数合并每个键的值。 这也将在结果发送到reduce端之前在每个映射器上本地执行合并, 输出将使numPartitions 分区进行哈希分区,如果未指定 numPartitions,则使用默认并行级别。

– ReduceByKey shuffle前将键相同的值局部函数聚合,减轻shuffle时的io压力,shuffle后在各自分区继续函数聚合。 aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])

官方解释:zeroValue是给每个分区的初始值(相当于对应键的一个初始元素),map端每个分区数据都会使用第一个函数seqOp进行聚合 *** 作,
shuffle后分区使用第二个函数combOp进行 *** 作,并且最终返回值类型与初始值类型一样(和原有数据类型可以不一致)。

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[*]")
      .getOrCreate()

    //隐式转换(spark自适应)
    import spark.implicits._

    val sc = spark.sparkContext

    val sourceRDD: RDD[String] = sc.parallelize(List[String](
    "hello", "hello", "hello", "spark", "spark"),
    2)

    val wordPairRDD: RDD[(String, Int)] = sourceRDD.map((_, 1))

    println("=" * 50)

    //相同键放在同一分区,值合并为一个序列
    val groupByRDD: RDD[(String, Iterable[Int])] = wordPairRDD.groupByKey()

    groupByRDD.foreach(println)

    println("=" * 50)

    //对各键的序列进行聚合 *** 作
    groupByRDD.map(pair =>{
      val iterablevalue: Iterable[Int] = pair._2
      var sum = 0
      iterablevalue.foreach(num => sum += num)

      (pair._1,sum)
    }).foreach(println)

    println("=" * 50)

    //直接使用函数在shuffle前聚合一遍,减轻IO压力,shuffle后各分区在聚合一遍
    val reduceByKeyRDD = wordPairRDD.reduceByKey(_ + _)

    reduceByKeyRDD.foreach(println)


    println("=" * 50)

    //查看数据所在分区
    wordPairRDD.mapPartitionsWithIndex((index, ite) => {
      ite.map((index, _))
    }).foreach(println)

    println("=" * 50)

    //分区数量
    println("partitions: " + wordPairRDD.partitions.length)

    println("=" * 50)

    //shuffle前分区的聚合函数
    val sepOP = (a: String, b: Int) => {
      a + "|" + b
    }

    //shuffle后分区的聚合函数
    val combOP = (a: String, b: String) => {
      a + "-Shufix-" + b
    }

    //柯里化func(x)(y)
    //zeroValue给每个分区不同键的初始值
    //可以根据初始值类型来返回最终聚合类型
    val aggregateByKeyRDD: RDD[(String, String)] = wordPairRDD.aggregateByKey("Prefix")(sepOP, combOP)

    aggregateByKeyRDD.foreach(println)

    spark.stop()
  }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5676694.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存