Spark Streaming + Flume + Kafka: A Complete Real-Time Stream Processing Pipeline


Contents

I. Preparation

II. Implementation

1. Dependencies

2. Log collection server

3. Log receiving server

4. Processing the received data on the Spark cluster and writing it to the database

5. Test results

I. Preparation

1. Environment: four test servers

Spark cluster (three nodes): hadoop02, hadoop03, hadoop04

Kafka cluster (three nodes): hadoop02, hadoop03, hadoop04

ZooKeeper cluster (three nodes): hadoop02, hadoop03, hadoop04

Log receiving server: hadoop02

Log collection flow:

log collection server -> log receiving server -> Kafka cluster -> Spark cluster

Note: in production the log collection server is usually an application server, while the log receiving server is one of the big-data servers. Logs are shipped over the network to the receiving server and from there into the cluster for processing.

This is because, in production, the network is often opened in one direction only: to a single port on a single server.


Flume version: apache-flume-1.9.0-bin.tar.gz. This version already has good built-in Kafka support.

2.1 Real-time computing
Similar to a real-time system (a system that must respond to requests within strict time limits). For example, in stock trading, market data changes from moment to moment, and decisions usually have to be made within seconds or even milliseconds. Put simply, a task must produce its result within a very short unit of time, and the computation is usually performed repeatedly.

2.2 Stream computing
Usually refers to data flowing continuously through a system that computes on it without stopping. There is not necessarily a strict time bound here: a long time may pass between data entering the system and a result being produced. Examples include system log data and daily user-browsing data in e-commerce.

2.3 Real-time stream computing
Combining real-time computing with streaming data gives real-time stream computing, which is what big data usually calls real-time stream processing. Data is produced continuously, and at the same time the computation is under strict time constraints. For example, product recommendations in e-commerce: after you click on a product, the recommended products change, because they are recomputed for you in real time. Likewise, your mobile carrier may recommend a data package in real time when your credit or data allowance is about to run out.
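The distinction between a running streaming total and a real-time windowed answer can be sketched with plain Scala collections (a toy model of the idea, not Spark code; the numbers are made up):

```scala
object StreamVsWindow {
  def main(args: Array[String]): Unit = {
    // Simulated per-batch event counts arriving over time
    val perBatchCounts = Seq(2, 0, 1, 3, 0, 4)

    // Stream computing: a running total over everything seen so far,
    // with no particular deadline attached to any one answer
    val runningTotal = perBatchCounts.sum
    println(runningTotal) // 10

    // Real-time stream computing: answer a question about "right now",
    // e.g. the count over only the most recent 3 batches
    val windowed = perBatchCounts.takeRight(3).sum
    println(windowed) // 3 + 0 + 4 = 7
  }
}
```

The Spark job in section 4 below implements exactly this pair of views with `updateStateByKey` (running total) and `reduceByKeyAndWindow` (recent window).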



II. Implementation

1. Dependencies

Note: the versions below must match the ones installed in your own environment.



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ScalaProject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>jar</packaging>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.8</spark.version>
        <spark.artifact.version>2.11</spark.artifact.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${spark.artifact.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/main/scala</testSourceDirectory>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.5.4</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>




2. Log collection server

Configure Flume to tail the target log file; collect.conf is as follows:

# Name the components on this agent
a1.sources = tailsource-1
a1.sinks = remotesink
a1.channels = memoryChannel-1

# Describe/configure the source
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.command = tail -F /training/data/logs/my1.log
a1.sources.tailsource-1.channels = memoryChannel-1

# Use a channel which buffers events in memory
a1.channels.memoryChannel-1.type = memory
a1.channels.memoryChannel-1.keep-alive = 10
a1.channels.memoryChannel-1.capacity = 100000
a1.channels.memoryChannel-1.transactionCapacity = 100000

# Describe the sink: forward events to the receiving server over Avro
a1.sinks.remotesink.type = avro
a1.sinks.remotesink.hostname = hadoop02
a1.sinks.remotesink.port = 6666
a1.sinks.remotesink.channel = memoryChannel-1

Note: the file /training/data/logs/my1.log must be created first.

Start the agent on the collection side:

bin/flume-ng agent --conf conf --conf-file conf/collect.conf --name a1 -Dflume.root.logger=INFO,console

3. Log receiving server

Configure Flume to receive the log events in real time; kafka-flume.conf is as follows:

# agent section
producer.sources = s
producer.channels = c
producer.sinks = r

# source section
producer.sources.s.type = avro
producer.sources.s.bind = hadoop02
producer.sources.s.port = 6666
producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r.topic = flumetopic
producer.sinks.r.brokerList = hadoop02:9092,hadoop03:9092,hadoop04:9092
producer.sinks.r.requiredAcks = 1
producer.sinks.r.batchSize = 20

# Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined
producer.channels.c.type = org.apache.flume.channel.kafka.KafkaChannel
producer.channels.c.capacity = 10000
producer.channels.c.transactionCapacity = 1000
producer.channels.c.brokerList = hadoop02:9092,hadoop03:9092,hadoop04:9092
producer.channels.c.topic = channel1
producer.channels.c.zookeeperConnect = hadoop02:2181,hadoop03:2181,hadoop04:2181
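Once both agents are running, you can sanity-check that events are actually reaching Kafka with the console consumer that ships with Kafka (run from the Kafka installation directory; topic and broker address as configured above):

```shell
bin/kafka-console-consumer.sh --bootstrap-server hadoop02:9092 --topic flumetopic --from-beginning
```

Lines appended to the tailed log on the collection server should appear here within a few seconds.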

Start the agent on the receiving side:

bin/flume-ng agent --conf conf --conf-file conf/kafka-flume.conf --name producer -Dflume.root.logger=INFO,console

4. Processing the received data on the Spark cluster and writing it to the database

package sparkclass.sparkstreaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.log4j.{Level, Logger}

import java.sql.{Connection, DriverManager, PreparedStatement}


object KafkaDataTest1 {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("stocker").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(3))

    // updateStateByKey requires a checkpoint directory
    ssc.checkpoint("hdfs://hadoop02:9000/spark/checkpoint")

    // Kafka configurations
    val topics = Set("flumetopic")
    val brokers = "hadoop02:9092,hadoop03:9092,hadoop04:9092"

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers, // which brokers to consume from
      // deserializers: data written to Kafka was serialized, so it must be
      // deserialized on the way out
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test-consumer-group", // consumer group id
      // where to start consuming: "earliest" = from the beginning of the
      // topic, "latest" = only messages produced after the consumer starts
      "auto.offset.reset" -> "latest",
      // auto-commit of offsets is disabled here; with "true" (the default)
      // the consumer periodically commits (group.id, topic, partition) ->
      // offset, with "false" offsets must be managed manually
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    val mapDStream: DStream[(String, String)] =
      kafkaStream.map(record => (record.key, record.value))
    val urlClickLogPairsDStream: DStream[(String, Int)] =
      mapDStream.flatMap(_._2.split(" ")).map((_, 1))

    // word counts over a 30-second sliding window, recomputed every 3 seconds
    val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => v1 + v2,
      Seconds(30),
      Seconds(3)
    )

    // update function used to accumulate each word's total count:
    // currentValues holds this batch's counts for a key (grouped by key after
    // the shuffle), previousValueState holds the running total so far
    val addWordFunction = (currentValues: Seq[Int], previousValueState: Option[Int]) => {
      // sum of this batch
      val currentCount = currentValues.sum
      // previously accumulated value
      val previousCount = previousValueState.getOrElse(0)
      // return the accumulated result as an Option[Int]
      Some(currentCount + previousCount)
    }

    val result = urlClickLogPairsDStream.updateStateByKey(addWordFunction)

    // persist the DStream's data into the MySQL database
    result.foreachRDD(
      rdd => {
        val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8"
        val user = "root"
        val password = "LIUJIANG1313"
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        // truncate the table first, so it only ever holds the latest totals
        val conn1: Connection = DriverManager.getConnection(url, user, password)
        val sql1 = "truncate table word"
        val stmt1: PreparedStatement = conn1.prepareStatement(sql1)
        stmt1.executeUpdate()
        conn1.close()
        rdd.foreach(
          data => {
            // rewrite the table with the latest counts from this RDD
            // (note: this opens one connection per record; in production,
            // batching with foreachPartition would be cheaper)
            val conn2: Connection = DriverManager.getConnection(url, user, password)
            val sql2 = "insert into word(wordName,count) values(?,?)"
            val stmt2: PreparedStatement = conn2.prepareStatement(sql2)
            stmt2.setString(1, data._1)
            stmt2.setInt(2, data._2)
            stmt2.executeUpdate()
            conn2.close()
          })
      })

    urlClickCountDaysDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
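The core semantics of the job, the (word, 1) pairing, the sliding window, and the stateful accumulation, can be checked on plain Scala collections independently of Spark (a toy model with made-up numbers, not Spark code):

```scala
object JobSemanticsSketch {
  // Same shape as addWordFunction in the job: add this batch's counts
  // for a key to the previously accumulated total.
  val addWordFunction: (Seq[Int], Option[Int]) => Option[Int] =
    (current, previous) => Some(current.sum + previous.getOrElse(0))

  def main(args: Array[String]): Unit = {
    // One Kafka message value, split into (word, 1) pairs as in the DStream
    val pairs = "hello spark hello kafka".split(" ").toList.map((_, 1))
    // reduceByKey equivalent: group by word and count
    val batchCounts = pairs.groupBy(_._1).map { case (w, ps) => (w, ps.size) }
    println(batchCounts.toList.sorted) // List((hello,2), (kafka,1), (spark,1))

    // updateStateByKey semantics: fold batch counts into running state
    var state = Option.empty[Int]
    state = addWordFunction(Seq(2), state) // batch 1: "hello" seen twice
    state = addWordFunction(Seq(1), state) // batch 2: "hello" seen once more
    println(state) // Some(3)

    // reduceByKeyAndWindow semantics: window 30s / slide 3s over 3s batches
    // means each output sums the last 30 / 3 = 10 batches
    val perBatch = Seq(1, 0, 2, 0, 0, 3, 1, 0, 0, 2, 4)
    println(perBatch.takeRight(10).sum) // 12
  }
}
```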

5. Test results

Append lines to the log three times in succession
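The test input can be generated like this (on the collection server the tailed file is /training/data/logs/my1.log; a local file name is used here so the commands can be tried anywhere):

```shell
# Append three lines to the tailed log file, one per "batch" of test input
LOG=my1.log
: > "$LOG"                       # start from an empty file
echo "hello spark streaming" >> "$LOG"
echo "hello kafka flume"     >> "$LOG"
echo "hello spark kafka"     >> "$LOG"
wc -l < "$LOG"                   # 3 lines appended
```

Each appended line is picked up by the exec source, relayed through the Avro sink and the Kafka topic, and counted by the Spark job.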

Output of the job when run in IDEA

Check the database

Original source: http://outofmemory.cn/zaji/5654471.html