- 一、前提说明
- 二、实现步骤
- 安装了Flume
- 本案例实现流程图:
- 本案例实现的功能是:实现wordcount功能,并将每次的分析结果保存到数据库中
- 在MySQL创建top表,就只有两个字段:key和value
- 在pom.xml中确保已经添加了MySQL数据库的驱动
- 编写如下代码
import java.net.InetAddress import java.sql.DriverManager import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} case class Count(key:String,value:Int) object FlumeDemoMySQL { def main(args: Array[String]): Unit = { //创建一个Context对象: StreamingContext (SparkContext, SQLContext) val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(5)) // 创建FlumeStream, 特别注意,hostname是windows上的虚拟网卡net8的ip val flumeDStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, "niit01", 8888) // 读取数据进行处理,即进行transform变换 val flatMapDStream: DStream[String] = flumeDStream.flatMap(e => { val body = e.event.getBody val str = new String(body.array()) val strings = str.split(" ") strings }) val mapDStream = flatMapDStream.map((_, 1)) val spark = SparkSession.builder().getOrCreate() // 引入spark隐式转换函数 import spark.implicits._ val transformDStream: DStream[Count] = mapDStream.transform(t => { t.map(x => Count(x._1, x._2)) }) // 打印结果 //transformDStream.print() transformDStream.foreachRDD(c => { val dataframe = c.toDF() dataframe.createOrReplaceTempView("top") spark.sql("select key,sum(value) as total from top group by key order by total desc") .foreachPartition(x=>{ //x.foreach(println) val connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/niit?characterEncoding=UTF-8", "root", "123456") val preparedStatement = connection.prepareStatement("insert into realtime values(?,?)") x.foreach(r => { val key = r.get(0).toString val value = r.get(1).toString.toInt println(r.toString()) println("key: " + key + ", value: " + value) val address = InetAddress.getLocalHost.getHostAddress val hostName = InetAddress.getLocalHost.getHostName val threadId = Thread.currentThread().getId val threadName = Thread.currentThread().getName println("HostAddress: " + address + ", HostName: " + hostName + ",threadId: " + threadId + ",threadName: " + threadName) preparedStatement.setString(1,key.toString) preparedStatement.setInt(2,value.toString.toInt) preparedStatement.executeUpdate() }) if (preparedStatement != null) preparedStatement.close() if (connection != null) connection.close() }) }) // 开启 ssc.start() ssc.awaitTermination() // 关闭资源 spark.stop() } }
- 启动程序
- 配置Flume,配置文件内容可参考:Flume之Pull模式
- 编写测试数据到某个文件,保存文件后,将该文件复制到Flume配置文件中所指定的路径
- 查看IDEA程序控制台的结果
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)