- 一、环境准备
- 二、环境启动
- 三、编写程序
- 四、进行测试
读取kafka数据,进行累计词频统计,将结果输出到mysql的数据表中!!!!
关于使用sparkstreaming读取kafka生产者生产的数据,并且将每一次输入的数据进行词频累计统计,然后将最终结果存储到MySQL数据库中。学习记录~ 一、环境准备
- hadoop集群
- zookeeper
- kafka
- 在idea中添加依赖
org.apache.spark spark-streaming-kafka-0-10_2.112.1.1
正常启动后应该有如下进程:
- 启动hadoop集群
start-all.sh - 启动zookeeper
在zookeeper的bin目录下进行启动
./zkServer.sh start - 启动kafka服务
在kafka的bin目录下进行启动
./kafka-server-start.sh ../config/server.properties - 启动kafka生产者
kafka-console-producer.sh --broker-list ethan002:9092 --topic first
在idea中编写SparkStreaming代码:
- 注意点:
- 在hdfs上添加检查点/spark/checkpoint
- 编写SQL语言实现存储DStream中的数据
import java.sql.{Connection, DriverManager, PreparedStatement} import java.util.Properties import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe case class Word( wordName: String, count:Int ) object KafkaDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaDemo") val streamingContext = new StreamingContext(conf,Seconds(5)) val sc = streamingContext.sparkContext streamingContext.checkpoint("hdfs://ethan001:9000/spark/checkpoint") val kafkaParams = Map[String,Object]( "bootstrap.servers" -> "ethan002:9092", //从那些broker消费数据 "key.deserializer" -> classOf[StringDeserializer], //发序列化的参数,因为写入kafka的数据经过序列化 "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", //指定group.id "auto.offset.reset" -> "latest",//指定消费的offset从哪里开始:① earliest:从头开始 ;② latest从消费者启动之后开始 "enable.auto.commit" -> (false: java.lang.Boolean) //是否自动提交偏移量 offset 。默认值就是true【5秒钟更新一次】, // true 消费者定期会更新偏移量 groupid,topic,parition -> offset ; // "false" 不让kafka自动维护偏移量 手动维护偏移 ) // 数组中存放的是在kafka中创建的topic val topics = Array("first", "t100") val stream = KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams) //订阅主题 ) val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value)) //转换格式 //对从kafka生产的一次消息进行词频统计 val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1))//.reduceByKey(_ + _) / //定义函数用于累计每个单词出现的次数 val addWordFunction = (currentValues:Seq[Int],previousValueState:Option[Int])=>{ //通过spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和 val currentCount = currentValues.sum //已经进行累加的值 val previousCount = previousValueState.getOrElse(0) //返回累加后的结果,是一个Option[Int]类型 Some(currentCount+previousCount) } val result = resultRDD.updateStateByKey(addWordFunction) //将DStream中的数据存储到mysql数据库中 result.foreachRDD( rdd=>{ val url = "jdbc:mysql://localhost:3306/hadoop?useUnicode=true&characterEncoding=UTF-8" val user = "root" val password = "123456" Class.forName("com.mysql.jdbc.Driver").newInstance() //截断数据表,将数据表原有的数据进行删除 var conn1: Connection = DriverManager.getConnection(url,user,password) val sql1 = "truncate table word" var stmt1 : PreparedStatement = conn1.prepareStatement(sql1) stmt1.executeUpdate() conn1.close() rdd.foreach( data=>{ //将数据库数据更新为最新的RDD中的数据集 var conn2: Connection = DriverManager.getConnection(url,user,password) val sql2 = "insert into word(wordName,count) values(?,?)" var stmt2 : PreparedStatement = conn2.prepareStatement(sql2) stmt2.setString(1,data._1.toString) stmt2.setString(2,data._2.toString) stmt2.executeUpdate() conn2.close() }) }) // 打印 //resultRDD.print() result.print() // 启动 streamingContext.start() // 等待计算采集器的执行 streamingContext.awaitTermination() } }四、进行测试
-
启动zookeeper
bin/zkServer.sh start -
启动kafka
bin/kafka-server-start.sh ./config/server.properties
-
启动kafka的producer进程
kafka-console-producer.sh --broker-list ethan002:9092 --topic first
-
运行SparkStreaming程序
-
在kafka的producer进程输入数据
[root@ethan002 ~]# kafka-console-producer.sh --broker-list ethan002:9092 --topic first >hello world >hello world >world hello >hello world
-
查看结果
-
idea控制台中查看结果
-
在mysql中查看
-
本篇博客实现SparkStreaming读取kafka的数据进行累计词频统计,这里使用的是对mysql表的截断删除原有的数据表中的数,每次都反复的 *** 作数据库,严重影响了执行效率,如果您有其他的方法,希望您能分析分析
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)