Table of Contents
Spark Streaming + Flume + Kafka: a complete real-time stream processing pipeline
I. Preliminary preparation
II. Implementation steps
1. Add dependencies
2. Log collection server
3. Log receiving server
4. Spark cluster processes the received data and writes it to the database
5. Test results
Spark Streaming + Flume + Kafka: a complete real-time stream processing pipeline

I. Preliminary preparation
1. Environment: four test servers
Spark cluster (three nodes): hadoop02, hadoop03, hadoop04
Kafka cluster (three nodes): hadoop02, hadoop03, hadoop04
ZooKeeper cluster (three nodes): hadoop02, hadoop03, hadoop04
Log receiving server: hadoop02
Log collection flow:
log collection server -> log receiving server -> Kafka cluster -> Spark cluster processing
Note: in production, the log collection server is most likely an application server, while the log receiving server is one of the big-data servers. Logs are transferred over the network to the receiving server and then fed into the cluster for processing.
This is because, in production environments, the network is often opened only one way, to a specific port on a specific server.
Flume version: apache-flume-1.9.0-bin.tar.gz. This version already ships with good built-in Kafka support.

2.1 Real-time computing
Similar to a real-time system (a system that must respond to requests within strict time limits). In stock trading, for example, market data changes from moment to moment, and decisions usually have to be made within seconds or even milliseconds. Put simply, a task has to be computed within a very short unit of time, and this computation is typically performed over and over again.

2.2 Stream computing
Usually refers to data flowing through the system continuously, with the system computing on it without stopping. There may be no particular time constraint here: a long time can pass between data entering the system and a result being produced. Examples include system log data and the daily user visit and browsing data of an e-commerce site.

2.3 Real-time stream computing
Combining real-time computing with streaming data gives real-time stream computing, which is what big data usually calls real-time stream processing. Data is produced continuously, and at the same time the computation is subject to strict time limits. For example, with product recommendations in e-commerce today, the recommended items change right after you click on a product, i.e. they are computed and served to you in real time. Another example: when your phone is about to run out of credit or data, a data-package offer is recommended to you in real time.
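As a concrete illustration of this micro-batch model, the sketch below counts words over a continuous input stream with a 1-second batch interval. It is only a minimal example: the socket source on localhost:9999 is a stand-in (for instance fed by nc -lk 9999) and is not part of the setup used in the rest of this article, which reads from Kafka instead.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch of real-time stream computing with Spark Streaming:
// data keeps flowing in, and every 1-second micro-batch is processed as it arrives.
object MicroBatchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MicroBatchSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))      // 1-second batch interval

    val lines = ssc.socketTextStream("localhost", 9999)   // continuous input stream (hypothetical source)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()                                        // results are produced batch by batch

    ssc.start()
    ssc.awaitTermination()
  }
}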
II. Implementation steps
1. Add dependencies
Note: the versions must match the ones you have installed.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>ScalaProject</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.8</spark.version>
        <spark.artifact.version>2.11</spark.artifact.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${spark.artifact.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/main/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.5.4</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. Log collection server
Configure Flume to dynamically collect the target log file. collect.conf is configured as follows:
# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChnanel-1

# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /training/data/logs/my1.log
a1.sources.tailsource-1.channels = memoryChnanel-1

# Use a channel which buffers events in memory
a1.channels.memoryChnanel-1.type = memory
a1.channels.memoryChnanel-1.keep-alive = 10
a1.channels.memoryChnanel-1.capacity = 100000
a1.channels.memoryChnanel-1.transactionCapacity = 100000

# Describe the sink (avro sink forwarding to the receiving server) and bind it to the channel
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = hadoop02
a1.sinks.remotesink.port = 6666
a1.sinks.remotesink.channel = memoryChnanel-1
Note: the file /training/data/logs/my1.log must be created first.
Start the collection-side agent:
bin/flume-ng agent --conf conf --conf-file conf/collect.conf --name a1 -Dflume.root.logger=INFO,console

3. Log receiving server
Configure Flume to receive the logs in real time. kafka-flume.conf is configured as follows:
# agent section
producer.sources = s
producer.channels = c
producer.sinks = r

# source section
producer.sources.s.type = avro
producer.sources.s.bind = hadoop02
producer.sources.s.port = 6666
producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = flumetopic
producer.sinks.r.brokerList = hadoop02:9092,hadoop03:9092,hadoop04:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20

# Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList = hadoop02:9092,hadoop03:9092,hadoop04:9092
producer.channels.c.topic = channel1
producer.channels.c.zookeeperConnect = hadoop02:2181,hadoop03:2181,hadoop04:2181
Start the receiving-side agent:
bin/flume-ng agent --conf conf --conf-file conf/kafka-flume.conf --name producer -Dflume.root.logger=INFO,console
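Before wiring up the Spark job, it can be worth checking that events actually reach the flumetopic topic. The sketch below is not part of the original setup: it is a plain Kafka consumer, using the kafka-clients API already declared in the pom, that subscribes to flumetopic and prints whatever arrives. The group id flume-topic-check is an arbitrary choice.

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

// Quick verification that Flume events land in the flumetopic topic.
object VerifyFlumeTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "hadoop02:9092,hadoop03:9092,hadoop04:9092")
    props.put("group.id", "flume-topic-check")   // arbitrary group id for this check
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")   // read the topic from the beginning

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList("flumetopic"))
    try {
      for (_ <- 1 to 10) {                       // poll a few times, then exit
        val records = consumer.poll(1000L)       // long-timeout poll, supported by the 0.10 client
        records.asScala.foreach(r => println(s"offset=${r.offset()} value=${r.value()}"))
      }
    } finally {
      consumer.close()
    }
  }
}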
4. Spark cluster processes the received data and writes it to the database

package sparkclass.sparkstreaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import java.sql.{Connection, DriverManager, PreparedStatement}

object KafkaDataTest1 {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("stocker").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(3))
    val sc1 = ssc.sparkContext
    ssc.checkpoint("hdfs://hadoop02:9000/spark/checkpoint")

    // Kafka configurations
    val topics = Set("flumetopic")
    val brokers = "hadoop02:9092,hadoop03:9092,hadoop04:9092"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop02:9092",              // brokers to consume from
      "key.deserializer" -> classOf[StringDeserializer],   // deserializers, since the data written to Kafka was serialized
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test-consumer-group",                 // consumer group id
      "auto.offset.reset" -> "latest",                     // where consumption starts: "earliest" = from the beginning, "latest" = from when the consumer starts
      "enable.auto.commit" -> (false: java.lang.Boolean)   // whether to auto-commit offsets; the default is true (commit every 5 seconds),
                                                           // i.e. the consumer periodically updates (group.id, topic, partition) -> offset;
                                                           // false means Kafka does not maintain offsets automatically and they are managed manually
    )

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val mapDStream: DStream[(String, String)] = kafkaStream.map(record => (record.key, record.value))

    val urlClickLogPairsDStream: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1))

    val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      Seconds(30),
      Seconds(3)
    )

    // Write to the database.
    // Function that accumulates the number of occurrences of each word:
    // currentValues holds this batch's values for a key (after Spark's internal reduceByKey),
    // previousValueState holds the value accumulated so far.
    val addWordFunction = (currentValues: Seq[Int], previousValueState: Option[Int]) => {
      // sum for the current batch
      val currentCount = currentValues.sum
      // value accumulated so far
      val previousCount = previousValueState.getOrElse(0)
      // return the accumulated result as an Option[Int]
      Some(currentCount + previousCount)
    }

    val result = urlClickLogPairsDStream.updateStateByKey(addWordFunction)

    // Store the data in the DStream into the MySQL database
    result.foreachRDD(rdd => {
      val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8"
      val user = "root"
      val password = "LIUJIANG1313"
      Class.forName("com.mysql.jdbc.Driver").newInstance()

      // Truncate the table, i.e. delete its existing rows
      var conn1: Connection = DriverManager.getConnection(url, user, password)
      val sql1 = "truncate table word"
      var stmt1: PreparedStatement = conn1.prepareStatement(sql1)
      stmt1.executeUpdate()
      conn1.close()

      rdd.foreach(data => {
        // Refresh the table with the latest data from the RDD
        var conn2: Connection = DriverManager.getConnection(url, user, password)
        val sql2 = "insert into word(wordName,count) values(?,?)"
        var stmt2: PreparedStatement = conn2.prepareStatement(sql2)
        stmt2.setString(1, data._1.toString)
        stmt2.setString(2, data._2.toString)
        stmt2.executeUpdate()
        conn2.close()
      })
    })

    urlClickCountDaysDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
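The program above truncates and inserts into a word table in the hadoop database, but the article does not show its schema. The sketch below creates a table compatible with the statement insert into word(wordName,count) values(?,?); the column types are assumptions, and the credentials simply reuse the ones from the program above, so adapt both to your own MySQL setup.

import java.sql.DriverManager

// One-off helper to create the `word` table assumed by the streaming job.
object CreateWordTable {
  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8"
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    val conn = DriverManager.getConnection(url, "root", "LIUJIANG1313")
    try {
      val stmt = conn.createStatement()
      // `count` is backtick-quoted to avoid any clash with the SQL function name
      stmt.executeUpdate(
        "CREATE TABLE IF NOT EXISTS word (" +
          "wordName VARCHAR(100) NOT NULL, " +
          "`count` INT NOT NULL)")
      stmt.close()
    } finally {
      conn.close()
    }
  }
}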
5. Test results

Append three log entries to the log file, one after another.
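The three appends can be done by hand (for example with echo ... >> /training/data/logs/my1.log); the small sketch below does the same thing programmatically. The sample words are arbitrary and only meant to produce countable input for the job.

import java.io.{BufferedWriter, FileWriter}

// Append a few test lines to the file tailed by the collection agent.
object AppendTestLog {
  def main(args: Array[String]): Unit = {
    val writer = new BufferedWriter(new FileWriter("/training/data/logs/my1.log", true)) // append mode
    try {
      writer.write("hello spark streaming\n")
      writer.write("hello kafka flume\n")
      writer.write("hello hadoop spark\n")
    } finally {
      writer.close()
    }
  }
}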
Result of running the job in IDEA:
Check the database: