SparkStreaming读取kafka生产的数据,进行累计词频统计后将最新结果存入MySQL数据库

SparkStreaming读取kafka生产的数据,进行累计词频统计后将最新结果存入MySQL数据库,第1张

SparkStreaming读取kafka生产的数据,进行累计词频统计后将最新结果存入MySQL数据库

SparkStreaming读取kafka生产的数据,进行累计词频统计后将最新结果存入MySQL数据库
  • 一、环境准备
  • 二、环境启动
  • 三、编写程序
  • 四、进行测试

读取kafka数据,进行累计词频统计,将结果输出到mysql的数据表中!!!!
关于使用sparkstreaming读取kafka生产者生产的数据,并且将每一次输入的数据进行词频累计统计,然后将最终结果存储到MySQL数据库中。学习记录~

一、环境准备
  • hadoop集群
  • zookeeper
  • kafka
  • 在idea中添加依赖
	
        
            org.apache.spark
            spark-streaming-kafka-0-10_2.11
            2.1.1
        

正常启动后应该有如下进程:

二、环境启动
  • 启动hadoop集群
    start-all.sh
  • 启动zookeeper
    在zookeeper的bin目录下进行启动
    ./zkServer.sh start
  • 启动kafka服务
    在kafka的bin目录下进行启动
    ./kafka-server-start.sh ../config/server.properties
  • 启动kafka生产者
    kafka-console-producer.sh --broker-list ethan002:9092 --topic first
三、编写程序

在idea中编写SparkStreaming代码:

  • 注意点:
    • 在hdfs上添加检查点/spark/checkpoint
    • 编写SQL语言实现存储DStream中的数据
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

case class Word(
               wordName: String,
               count:Int
               )
object KafkaDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDemo")
    val streamingContext = new StreamingContext(conf,Seconds(5))
    val sc = streamingContext.sparkContext
    streamingContext.checkpoint("hdfs://ethan001:9000/spark/checkpoint")


    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> "ethan002:9092", //从那些broker消费数据
      "key.deserializer" -> classOf[StringDeserializer], //发序列化的参数,因为写入kafka的数据经过序列化
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream", //指定group.id
      "auto.offset.reset" -> "latest",//指定消费的offset从哪里开始:① earliest:从头开始  ;② latest从消费者启动之后开始
      "enable.auto.commit" -> (false: java.lang.Boolean)
      //是否自动提交偏移量 offset 。默认值就是true【5秒钟更新一次】,
      // true 消费者定期会更新偏移量 groupid,topic,parition -> offset ;
     // "false" 不让kafka自动维护偏移量     手动维护偏移
    )
    //    数组中存放的是在kafka中创建的topic
    val topics = Array("first", "t100")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams) //订阅主题
    )

    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value)) //转换格式

    //对从kafka生产的一次消息进行词频统计
    val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1))//.reduceByKey(_ + _)

/
    //定义函数用于累计每个单词出现的次数
    val addWordFunction = (currentValues:Seq[Int],previousValueState:Option[Int])=>{
      //通过spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和
      val currentCount = currentValues.sum
      //已经进行累加的值
      val previousCount = previousValueState.getOrElse(0)
      //返回累加后的结果,是一个Option[Int]类型
      Some(currentCount+previousCount)
    }

    val result = resultRDD.updateStateByKey(addWordFunction)


    //将DStream中的数据存储到mysql数据库中
    result.foreachRDD(
      rdd=>{
        val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8"
        val user = "root"
        val password = "123456"
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        //截断数据表,将数据表原有的数据进行删除
        var conn1: Connection = DriverManager.getConnection(url,user,password)
        val sql1 = "truncate table word"
        var stmt1 : PreparedStatement = conn1.prepareStatement(sql1)
        stmt1.executeUpdate()
        conn1.close()
      rdd.foreach(
        data=>{
          //将数据库数据更新为最新的RDD中的数据集
          var conn2: Connection = DriverManager.getConnection(url,user,password)
          val sql2 = "insert into word(wordName,count) values(?,?)"
          var stmt2 : PreparedStatement = conn2.prepareStatement(sql2)
          stmt2.setString(1,data._1.toString)
          stmt2.setString(2,data._2.toString)
          stmt2.executeUpdate()
          conn2.close()
      })
    })


    // 打印
    //resultRDD.print()

    result.print()

    // 启动
    streamingContext.start()

    // 等待计算采集器的执行
    streamingContext.awaitTermination()


  }
}



四、进行测试
  • 启动zookeeper
    bin/zkServer.sh start

  • 启动kafka
    bin/kafka-server-start.sh ./config/server.properties

  • 启动kafka的producer进程
    kafka-console-producer.sh --broker-list ethan002:9092 --topic first

  • 运行SparkStreaming程序

  • 在kafka的producer进程输入数据

    [root@ethan002 ~]# kafka-console-producer.sh --broker-list ethan002:9092 --topic first
    >hello world
    >hello world
    >world hello
    >hello world
    
  • 查看结果

    1. idea控制台中查看结果

    2. 在mysql中查看


本篇博客实现SparkStreaming读取kafka的数据进行累计词频统计,这里使用的是对mysql表的截断删除原有的数据表中的数,每次都反复的 *** 作数据库,严重影响了执行效率,如果您有其他的方法,希望您能分析分析

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5611006.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存