2021.12.29 SparkStream

SparkStreamDemo
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("sparkdemo1")

    //Create the streaming context with a 3-second batch interval
    val streamingContext = new StreamingContext(conf,Seconds(3))

    //Data source: socket text stream on 192.168.111.131:7777
    val socketLineStream: ReceiverInputDStream[String] =
      streamingContext.socketTextStream("192.168.111.131",7777)

    //Process the data collected in each 3-second batch
    val wordStream: DStream[String] = socketLineStream.flatMap(line=>line.split("\\s+"))
    val wordCountStream: DStream[(String, Int)] = wordStream.map(x=>(x,1)).reduceByKey(_+_)

    //Print the word counts of each batch
    wordCountStream.print()

    //Start the receiver and wait for the job to terminate
    streamingContext.start()
    streamingContext.awaitTermination()
  }

}
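
To feed this demo, something has to be listening on 192.168.111.131:7777 and writing lines to the socket. Assuming netcat is installed on that host, running nc -lk 7777 there gives a simple interactive source: every line typed into that session is picked up by socketTextStream and counted in the next 3-second batch.
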
SparkStreamKafkaSource
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("sparkstreamkafkademo").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.checkpoint("checkpoint")


    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("sparkKafkaDemo"), kafkaParams)
    )

//    //Stateless: counts only within the current batch
//    val value: DStream[(String, Int)] =
//      kafkaStream.flatMap(x => x.value().split("\\s+")).map(x=>(x,1)).reduceByKey(_+_)
//    value.print()

    //Stateful: keep a running count across batches (requires the checkpoint directory set above)
    val value: DStream[(String, Int)] = kafkaStream.flatMap(x => x.value().split("\\s+")).map(x => (x, 1)).updateStateByKey {
      case (seq, buffer) => {
        println(seq.toList.toString())
        println(buffer.getOrElse(0).toString)
        val sum = buffer.getOrElse(0) + seq.sum
        Option(sum)
      }

    }
    value.print()

    streamingContext.start()
    streamingContext.awaitTermination()


  }

}
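
updateStateByKey only works because a checkpoint directory was set with streamingContext.checkpoint("checkpoint"); without it the job fails when the context starts. A minimal sketch of the same running count with an explicitly typed update function (words is a hypothetical name standing in for the flatMap/map result above, so this is an illustration rather than the original code):

    //Sketch only: explicitly typed update function for the running word count
    val updateFunc = (newCounts: Seq[Int], runningCount: Option[Int]) =>
      Option(runningCount.getOrElse(0) + newCounts.sum)

    val statefulCounts: DStream[(String, Int)] = words.updateStateByKey[Int](updateFunc)
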
SparkStreamKafkaSourceToKafkaSink
import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}


object SparkStreamKafkaSourceToKafkaSink {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkstreamkafkademo").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.checkpoint("checkpoint")


    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        streamingContext,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("mystreamin"), kafkaParams)
      )

    println("--------------a------------------")
    kafkaStream.foreachRDD(
      rdd=>{
        println("----------------b--------------------")


//        Per-record version kept for reference: creating a new KafkaProducer for every
//        record is expensive, which is why the foreachPartition version below is used instead.
//        rdd.foreach(y=>{
//          println("--------------c--------------------")
//          val words: Array[String] = y.value().split("\\s+") //y.value() is e.g. "hello java" -> ("hello",1) ("java",1)
//
//

//          val props: util.HashMap[String, Object] = new util.HashMap[String,Object]()
//          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.111.131:9092")
//          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
//          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
//
//          //ack
//
//          val producer: KafkaProducer[String, String] = new KafkaProducer[String,String](props)
//
//          for(word <- words ){
//            val record: ProducerRecord[String, String] = new ProducerRecord[String,String]("mystreamout",word+",1")
//            producer.send(record)
//
//          }
//        })

        rdd.foreachPartition(
          rdds=>{
            println("--------------------------c----------------------")

            val props: util.HashMap[String, Object] = new util.HashMap[String,Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.111.131:9092")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")


            val producer = new KafkaProducer[String,String](props)

            rdds.foreach(
              y=>{
                println("------------d-------------------")

                val words: Array[String] = y.value().split("\\s+")

                for(word <- words ){
                  val record: ProducerRecord[String, String] = new ProducerRecord[String,String]("mystreamout",word+",1")
                  producer.send(record)

                }
              }
            )
          }
        )
      }
    )


    streamingContext.start()
    streamingContext.awaitTermination()

  }

}
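
The commented-out rdd.foreach version builds a new KafkaProducer for every record, while rdd.foreachPartition reduces that to one producer per partition per batch. A further step, sketched below rather than taken from the original code, is to share a single producer per executor JVM through a lazily initialized object:

    //Sketch only: one KafkaProducer per executor JVM, created on first use
    object SharedProducer {
      import java.util
      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

      lazy val producer: KafkaProducer[String, String] = {
        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        new KafkaProducer[String, String](props)
      }
    }

Inside foreachPartition, SharedProducer.producer.send(record) then reuses the same connection for every batch handled by that executor.
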
SparkStreamUserFriendRowToUserFriendSpark
import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamUserFriendRowToUserFriendSpark {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkdemo1")
    val sc: StreamingContext = new StreamingContext(conf,Seconds(3))

    sc.checkpoint("checkpoint")


    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "userRowtouserSpark1",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      sc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("user_friends_rows"), kafkaParams)
    )

    kafkaStream.foreachRDD(
      rdd=>{  //one producer per partition: the function below runs once for each partition
        rdd.foreachPartition (
          x => {
            val props = new util.HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

            //input format: userId,friend1 friend2 friend3 ...  e.g. 1111,2222 3333 4444 5555
            val producer = new KafkaProducer[String, String](props)
            x.foreach(y => {
              val split: Array[String] = y.value().split(",")
              if (split.length == 2) {
                val userId = split(0)
                val friends: Array[String] = split(1).split("\\s+")
                for (friend <- friends) {
                  val content = userId + "," + friend
                  println(content)
                  producer.send(new ProducerRecord[String, String]("user_friends_spark", content))
                }

              }
            })

          }
        )
      }
    )

    sc.start()
    sc.awaitTermination()

  }

}
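
Given the format noted in the comment (userId,friend1 friend2 ...), an input record such as 1111,2222 3333 4444 5555 is flattened into one record per friend: 1111,2222, 1111,3333, 1111,4444 and 1111,5555, each sent to the user_friends_spark topic. Records that do not split into exactly two comma-separated fields are skipped.
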
SparkStreamEventAttendRowToEventAttendSpark
import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamEventAttendRowToEventAttendSpark {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkdemo3")
    val sc: StreamingContext = new StreamingContext(conf,Seconds(3))

    sc.checkpoint("checkpoint")


    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "EventAttendRowToEventAttendSpark1",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      sc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("event_attendess_row"), kafkaParams)
    )

    kafkaStream.foreachRDD(
      rdd=>{  //one producer per partition: the function below runs once for each partition
        rdd.foreachPartition (
          x => {
            val props = new util.HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

            //input format: userId,yesList,maybeList,invitedList,noList (each list is space-separated)
            val producer = new KafkaProducer[String, String](props)
            x.foreach(y => {
              val fields: Array[String] = y.value().split(",")
              val userId:String=fields(0)
              for(i <- 1 until fields.length)
                if(fields(i).trim.length>0){
                  val status: Array[String] = fields(i).trim.split("\\s+")
                  for(value<-status){
                    var tag:String=""
                    if(i==1) tag="yes"
                    else if (i==2) tag="maybe"
                    else if (i==3) tag="invited"
                    else if(i==4) tag="no"

                    val content=userId+","+value+","+tag
                    println(content)
                    producer.send(new ProducerRecord[String,String]("event_attend_spark",content))
                  }

                }


            })

          }
        )
      }
    )

    sc.start()
    sc.awaitTermination()

  }

}
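
Here each input record is a wide row of the form id,yesList,maybeList,invitedList,noList, where every list is space-separated. The loop turns each id in each non-empty list into a narrow record id,attendee,tag with tag set to yes, maybe, invited or no according to the column. For example, 123,45 67,89 would produce 123,45,yes, 123,67,yes and 123,89,maybe on the event_attend_spark topic.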