spark学习之SparkStreaming

spark学习之SparkStreaming,第1张

spark学习之SparkStreaming SparkStreaming

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。

SparkStreaming概述
数据处理延迟方式
	实时:数据处理在毫秒级别,秒
	离线:数据处理延迟以小时,天为单位
数据处理的方式
	流式:一个一个数据进行处理
	批处理:一批一批数据进行处理
	
SparkCore:		 离线数据分析框架|批处理
SparkStreaming:	基于SparkCore来完成实时数据处理分析(执行场景在离线批处理和实时流式之间,可以称为准实				    时,微批次数据处理框架)
	

SparkStreaming架构

背压机制

调整数据采集能力与数据消费能力

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
Dstream入门 WordCount案例实 ***

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

添加依赖关系


    org.apache.spark
    spark-streaming_2.12
    3.0.0

idea代码

package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount02 {
  def main(args: Array[String]): Unit = {
    //TODO SparkStreaming流式数据处理

    //TODO 建立环境
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    //SparkStreaming环境对象的第二个参数表示数据采集周期
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行 *** 作
    //创建采集器(一行一行)
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) //监听本机的9999端口
    //拿到一个一个的单词
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))

    val wordToCountDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_+_)
    wordToCountDS.print()

    //启动采集器
    ssc.start()
    //等待结束
    ssc.awaitTermination()
  }

}

在hadoop102启动nc服务,然后启动idea

# nc -lk 9999
# nc -lp 9999
[atguigu@hadoop102 ~]$ nc -lk 9999
pihao
hello 
world

Dstream创建 RDD队列

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例实 ***

循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount

object RDDStream {

  def main(args: Array[String]) {

    //1.初始化Spark配置信息
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(conf, Seconds(4))

    //3.创建RDD队列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //4.创建QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)

    //5.处理队列中的RDD数据
    val mappedStream = inputStream.map((_,1))
    val reducedStream = mappedStream.reduceByKey(_ + _)

    //6.打印结果
    reducedStream.print()

    //7.启动任务
    ssc.start()

//8.循环创建并向RDD队列中放入RDD
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }

    ssc.awaitTermination()

  }
}
自定义数据源

自定义一个数据源,获取其中的数据

package com.pihao.streaming

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver

import scala.util.Random

object SparkStreaming_DIY {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //执行 *** 作
    val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver()) //使用自己定义的接收器
    ds.print()

    ssc.start()
    ssc.awaitTermination()
  }

  
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
    private var flag = true

    //开启接受
    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          while(flag){
            val i: Int = new Random().nextInt(100)
            store(i+"")
            TimeUnit.NANOSECONDS.sleep(500)
          }
        }
      }).start()
    }
    //关闭接收
    override def onStop(): Unit = {
      flag = false
    }
  }

}

Kafka数据源 版本选型
1.ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用

2.DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。(采集和计算都在一个节点,容易控制)
Kafka 0-10 Direct 模式

需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做计算,最终打印到控制台。

提前创建kafka主题

启动zookeeper集群
启动kafka集群

# 创建sparkstreaming的topic
[atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic sparkstreaming --partitions 3 --replication-factor 2

[atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
__consumer_offsets
sparkstreaming
[atguigu@hadoop102 ~]$ 

添加依赖


     org.apache.spark
     spark-streaming-kafka-0-10_2.12
     3.0.0



    com.fasterxml.jackson.core
    jackson-core
    2.10.1

编写代码

package com.pihao.streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaStream {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行 *** 作
    //定义kafka的连接配置
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_ConFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_ConFIG -> "spark",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    //Kafka专门用于实时数据生成,提供了很多封装类
    //ConsumerRecord是一个KV
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
    KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个broker
      ConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara))

    //获取ConsumerRecord中的数据
    val kafkaData: DStream[String] = kafkaDStream.map(_.value())
    kafkaData.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

接收成功

Dstream转换

DStream上的 *** 作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换 *** 作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

无状态化 *** 作

无状态转化 *** 作就是把简单的RDD转化 *** 作应用到每个批次上,也就是转化DStream中的每一个RDD。

Transform

Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次(周期性执行)。其实也就是对DStream中的RDD应用转换。

object Transform {

  def main(args: Array[String]): Unit = {

    //创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

    //创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //创建DStream
    val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("host", 9999)

    //在某些情况,DS对象不能执行所有的 *** 作
    //如果想要进行特殊 *** 作,那么可以直接通过底层的RDD进行
    val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {

      val words: RDD[String] = rdd.flatMap(_.split(" "))

      val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

      val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

      value
    })

    //打印
    wordAndCountDStream.print

    //启动
    ssc.start()
    ssc.awaitTermination()

  }
}
join

两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object JoinTest {

  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinTest")

    //2.创建StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    //3.从端口获取数据创建流
    val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
    val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)

    //4.将两个流转换为KV类型
    val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))
    val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))

    //5.流的JOIN
    val joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)

    //6.打印
    joinDStream.print()

    //7.启动任务
    ssc.start()
    ssc.awaitTermination()
  }
}
有状态转化 *** 作

有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。需要保存之前每次数据的聚合 *** 作

UpdateStateByKey
package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming_State {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //检查点应该设置在hdfs中比较稳妥
    ssc.checkpoint("cp")
    //TODO 执行 *** 作
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
    val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))

    //reduceByKey方法属于无状态 *** 作的方法
    //updateStateByKey方法属于有状态 *** 作的方法
    //方法参数中的第一个参数,表示相同key的value序列
    //方法参数中的第二个参数,表示相同key缓冲区的数据值
    val state: DStream[(String, Int)] = wordToOneDS.updateStateByKey(
      (seq: Seq[Int], buffer: Option[Int]) => {
        val newValue: Int = seq.sum + buffer.getOrElse(0)
        Option(newValue)
      }
    )
    state.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

WindowOperations

所谓的窗口 *** 作,其实就是将多个采集周期的数据一次性进行处理,而不是一个采集周期处理一次。

SparkStreaming总的窗口范围不能切断采集周期,可以是采集周期的整数倍

SparkStreaming总的窗口滑动幅度也应该是采集周期的整数倍

package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStreaming_window{
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行 *** 作
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
    val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))

    //window方法可以传递一个参数,表示窗口的范围(时间周期,应该为采集周期的整数倍)
    //window方法可以传递两个参数,
      //第一个表示窗口的范围(时间周期,应该为采集周期的整数倍)
      //第二个表示窗口滑动的步长(时间周期,也是采集周期的整数倍,不传默认是一个一个采集周期)
      //窗口计算的时间以窗口步长做基础的
    val windowDS: DStream[(String, Int)] = wordToOneDS.window(Seconds(6),Seconds(3))
    val resultDS: DStream[(String, Int)] = windowDS.reduceByKey(_+_)
    resultDS.print()


    ssc.start()
    ssc.awaitTermination()
  }
}

增量计算

增量计算 = 当前窗口的值+ 新进入到窗口的数据-排除掉窗口的数据

使用场合:适合在窗口范围大,滑动浮动小的场合,这样就会有大量的重复数据

package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Sparkstream_window2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    ssc.checkpoint("cp")
    //TODO 执行 *** 作
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)
    val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))
    val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))

    //增量计算
    //需要设定检查点路径
    val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow(
      (x: Int, y: Int) => {
        x + y
      },
      (x: Int, y: Int) => {
        x - y
      },
      Seconds(6),
      Seconds(3)
    )
    windowDS.print()


    ssc.start()
    ssc.awaitTermination()
  }
}

Dstream输出

输出 *** 作指定了对流数据经转化 *** 作得到的数据所要执行的 *** 作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出 *** 作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出 *** 作,整个context就都不会启动。

#1 print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的 *** 作叫print()。

#2 saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。

#3 saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
#4 saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。

#5 foreachRDD(func):这是最通用的输出 *** 作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。

通用的输出 *** 作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动 *** 作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。 
注意:
1)连接不能写在driver层面(序列化)
2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
增加foreachPartition,在分区创建(获取)。
优雅的关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。

使用外部文件系统来控制内部程序关闭。

package com.pihao.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
import org.apache.spark.streaming.dstream.{ ReceiverInputDStream}

object SparkStreaming_Close {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行 *** 作
    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)
    socketDS.print()
    //stop()方法主要用于停止数据采集和Driver的调度
    //不可能启动采集后马上停止,所以一般在业务更新或者逻辑更新的场合停止
    //在main线程中停止是不现实的。应该启动一个新的线程来执行停止 *** 作

    new Thread(new Runnable {
      override def run(): Unit = {
        while(true){
          Thread.sleep(10000) //时间
          //这个时间应该是判断数据处理是否可以继续的标记
          //这个标记应该多线程都可以访问
          //这个标记一般情况放在第三方的系统中(mysql,redis)
          val state: StreamingContextState = ssc.getState() //装填
          if (state == StreamingContextState.ACTIVE){
            ssc.stop(true,true) //优雅的关闭
            System.exit(0) //关闭线程
          }
        }
      }
    }).start()


    ssc.start()
    ssc.awaitTermination()
  }
}

案例

模拟实时生成数据,然后统计分析

package com.pihao.main

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.collection.mutable.ListBuffer
import scala.util.Random

object SparkStreaming_MockData {
  def main(args: Array[String]): Unit = {
    // 创建配置对象
    val prop = new Properties()

    // 添加配置
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](prop)
    val topic = "sparkstreaming"
    val areas = ListBuffer("华北","华东","华南")
    val citys = ListBuffer("北京","上海","深圳")


    while(true){
      Thread.sleep(2000)
      //生成数据
      for (i <- 1 to new Random().nextInt(20)){
        val area: String = areas(new Random().nextInt(3))
        val city: String = citys(new Random().nextInt(3))
        val userId = new Random().nextInt(10)
        val adId = new Random().nextInt(10)

        val message = s"${System.currentTimeMillis()} ${area} ${city} ${userId} ${adId}"
        val record = new ProducerRecord[String,String](topic,message)
        producer.send(record)

      }

    }
  }
}

package com.pihao.main


import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaStream {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")
    val ssc = new StreamingContext(sparkConf,Seconds(3))
    //TODO 执行 *** 作
    //定义kafka的连接配置
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_ConFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_ConFIG -> "spark",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    //Kafka专门用于实时数据生成,提供了很多封装类
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](ssc,
        LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个broker
        ConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara))

    //获取广告点击的数据
    val kafkaData: DStream[AdvClickData] = kafkaDStream.map(
      data => {
        val kafkaVal: String = data.value()
        val datas: Array[String] = kafkaVal.split(" ")
        AdvClickData(datas(0),datas(1),datas(2),datas(3),datas(4))
      }
    )

    // todo 1.周期性获取数据库中最新的黑名单数据
    val reduceDS: DStream[((String, String, String), Int)] = kafkaData.transform(
      rdd => {
        // todo 2.判断当前周期内的数据是否已经在黑名单中
        // 查询数据库 *** 作select
        val blackList = List[String]()
        val filterRDD: RDD[AdvClickData] = rdd.filter(
          data => !blackList.contains(data.userId)
        )

        //todo 3.将一个周期内的数据进行统计
        filterRDD.map(
          data => {
            //将数据组装返回 timestamp这里要换成日期格式
            ((data.timestamp, data.userId, data.adId), 1)
          }
        ).reduceByKey(_ + _)
      }
    )

    //todo 4.判断统计的结果是否超过阈值
    reduceDS.foreachRDD(
      rdd => {
        rdd.foreach {
          case ((timestamp, userId, adId), sum) => {
              //如果当前采集周期内的sum都大于100,那么将userId就直接拉入黑名单
              if (sum>=100){
                //insert into
              }else{
                val oldStateval = 0 //select 根据userId查询mysql中旧的值
                val newStateval = 0 + sum
                if (newStateval >= 100){
                  // insert into
                }else{
                  // update 根据数据库newStateval
                }
              }
          }
        }
      }
    )


    ssc.start()
    ssc.awaitTermination()
  }


  case class AdvClickData(timestamp: String, area: String, city: String, userId: String, adId: String)
}


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5435810.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存