SparkStreamDemo
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("sparkdemo1")
    // Create the streaming context with a 3-second batch interval
    val streamingContext = new StreamingContext(conf, Seconds(3))
    // Data source: a plain TCP socket
    val socketLineStream: ReceiverInputDStream[String] =
      streamingContext.socketTextStream("192.168.111.131", 7777)
    // Process the data collected in each 3-second batch
    val wordStream: DStream[String] = socketLineStream.flatMap(line => line.split("\\s+"))
    val wordCountStream: DStream[(String, Int)] = wordStream.map(x => (x, 1)).reduceByKey(_ + _)
    // Print the result of each batch
    wordCountStream.print()
    // Start the receiver
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
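To try this example, you can open a plain TCP source on the host referenced above (assuming 192.168.111.131 is reachable from the driver), for instance with `nc -lk 7777`, and type space-separated words; each 3-second batch then prints its own word counts.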
SparkStreamKafkaSource
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamKafkaSource {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkstreamkafkademo").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    // updateStateByKey requires a checkpoint directory
    streamingContext.checkpoint("checkpoint")

    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("sparkKafkaDemo"), kafkaParams)
    )

    // // Stateless: count only the words in the current batch
    // val value: DStream[(String, Int)] =
    //   kafkaStream.flatMap(x => x.value().split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _)
    // value.print()

    // Stateful: keep a running total across batches
    val value: DStream[(String, Int)] = kafkaStream
      .flatMap(x => x.value().split("\\s+"))
      .map(x => (x, 1))
      .updateStateByKey {
        case (seq, buffer) =>
          println(seq.toList.toString())
          println(buffer.getOrElse(0).toString)
          val sum = buffer.getOrElse(0) + seq.sum
          Option(sum)
      }
    value.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
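For readability, the state-update logic passed to updateStateByKey can also be written as a named function. The sketch below is a hypothetical refactoring of the same logic; the names updateCount and wordPairs are illustrative and not part of the original code:

// Same update as above, as a named function (hypothetical name).
// newValues: counts for one key in the current batch; runningCount: the saved total so far.
def updateCount(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(runningCount.getOrElse(0) + newValues.sum)

// wordPairs.updateStateByKey(updateCount)   // wordPairs: DStream[(String, Int)]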
SparkStreamKafkaSourceToKafkaSink
import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkStreamKafkaSourceToKafkaSink {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("sparkstreamkafkademo").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.checkpoint("checkpoint")

    val kafkaParams: Map[String, String] = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafkaGroup1"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("mystreamin"), kafkaParams)
    )

    println("--------------a------------------")
    kafkaStream.foreachRDD(rdd => {
      println("----------------b--------------------")

      // Per-record version (kept for comparison): builds one KafkaProducer for every record, which is expensive.
      // rdd.foreach(y => {
      //   println("--------------c--------------------")
      //   val words: Array[String] = y.value().split("\\s+")  // e.g. "hello java" -> ("hello", 1), ("java", 1)
      //   val props: util.HashMap[String, Object] = new util.HashMap[String, Object]()
      //   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
      //   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
      //   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
      //   val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
      //   for (word <- words) {
      //     val record: ProducerRecord[String, String] = new ProducerRecord[String, String]("mystreamout", word + ",1")
      //     producer.send(record)
      //   }
      // })

      // Per-partition version: one KafkaProducer per partition per batch
      rdd.foreachPartition(rdds => {
        println("--------------------------c----------------------")
        val props: util.HashMap[String, Object] = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        rdds.foreach(y => {
          println("------------d-------------------")
          val words: Array[String] = y.value().split("\\s+")
          for (word <- words) {
            val record: ProducerRecord[String, String] = new ProducerRecord[String, String]("mystreamout", word + ",1")
            producer.send(record)
          }
        })
      })
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
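The per-partition version above still builds a new KafkaProducer for every partition of every batch. A common refinement is to keep a single producer per executor JVM in a lazily initialized holder object and reuse it. This is a minimal sketch of that pattern, not the original code; the object name and broker address are assumptions:

object ProducerHolder {
  import java.util
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

  // Created once per executor JVM on first use, then reused by every partition and batch.
  lazy val producer: KafkaProducer[String, String] = {
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}

Inside foreachPartition one would then call ProducerHolder.producer.send(...) instead of constructing a producer each time.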
SparkStreamUserFriendRowToUserFriendSpark
import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamUserFriendRowToUserFriendSpark {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkdemo1")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
    sc.checkpoint("checkpoint")

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "userRowtouserSpark1",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      sc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("user_friends_rows"), kafkaParams)
    )

    kafkaStream.foreachRDD(rdd => {
      // Create one producer per partition rather than per record
      rdd.foreachPartition(x => {
        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        // Input format: "1111,2222 3333 4444 5555" (userId, then space-separated friend ids)
        x.foreach(y => {
          val split: Array[String] = y.value().split(",")
          if (split.length == 2) {
            val userId = split(0)
            val friends: Array[String] = split(1).split("\\s+")
            for (friend <- friends) {
              val content = userId + "," + friend
              println(content)
              producer.send(new ProducerRecord[String, String]("user_friends_spark", content))
            }
          }
        })
      })
    })

    sc.start()
    sc.awaitTermination()
  }
}
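The row-to-pairs parsing (turning "userId,friend1 friend2 ..." into one "userId,friend" record per friend) can be isolated into a small pure function, which makes it easy to test without Kafka or Spark. A hypothetical sketch; explodeFriends is not part of the original code:

// Turns "1111,2222 3333 4444" into Seq("1111,2222", "1111,3333", "1111,4444").
def explodeFriends(line: String): Seq[String] = {
  val parts = line.split(",")
  if (parts.length == 2 && parts(1).trim.nonEmpty)
    parts(1).trim.split("\\s+").toSeq.map(friend => parts(0) + "," + friend)
  else
    Seq.empty
}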
SparkStreamEventAttendRowToEventAttendSpark
import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamEventAttendRowToEventAttendSpark {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkdemo3")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
    sc.checkpoint("checkpoint")

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.111.131:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> "EventAttendRowToEventAttendSpark1",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      sc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("event_attendess_row"), kafkaParams)
    )

    kafkaStream.foreachRDD(rdd => {
      // Create one producer per partition rather than per record
      rdd.foreachPartition(x => {
        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.131:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        // Input format: "user,yes list,maybe list,invited list,no list"
        x.foreach(y => {
          val fields: Array[String] = y.value().split(",")
          val userId: String = fields(0)
          for (i <- 1 until fields.length)
            if (fields(i).trim.length > 0) {
              val status: Array[String] = fields(i).trim.split("\\s+")
              for (value <- status) {
                var tag: String = ""
                if (i == 1) tag = "yes"
                else if (i == 2) tag = "maybe"
                else if (i == 3) tag = "invited"
                else if (i == 4) tag = "no"
                val content = userId + "," + value + "," + tag
                println(content)
                producer.send(new ProducerRecord[String, String]("event_attend_spark", content))
              }
            }
        })
      })
    })

    sc.start()
    sc.awaitTermination()
  }
}
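The if/else chain that maps a column index to its status tag can also be written as a lookup table, which keeps the column order in one place. A hypothetical alternative, not the original code:

// Column 1 = yes, 2 = maybe, 3 = invited, 4 = no (same order as the if/else chain above).
val statusTags: Map[Int, String] = Map(1 -> "yes", 2 -> "maybe", 3 -> "invited", 4 -> "no")
// Inside the loop: val tag = statusTags.getOrElse(i, "")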