Flink(scala)整合MySQL统计UV(unique visitor)

Flink(scala)整合MySQL统计UV(unique visitor),第1张

Flink(scala)整合MySQL统计UV(unique visitor)

数据源是尚硅谷的课件, 需要的话可以私信我

核心代码

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import java.text.SimpleDateFormat
import java.util.Properties



// 每条数据

// 输入样例类
case class UVItem(url: String, ip:String, timestamp: Long)
// 基于WindowEnd分组的样例类
case class UVWindowEnd(url: String, WindowEnd: Long, Count: Long)
// 目标 每五分钟统计这个1小时的每个页面的UV值
object UniqueVisitor {
  def main(args: Array[String]): Unit = {
    // 创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置时间特性为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // kafka消费数据
    
    // 读取resource的数据文件
    val inputStream = env.readTextFile(getClass.getResource("/apache.log").getPath)
    // 将每行数据用空格切割后 封装成样例类 数据乱序 并指定时间戳 设置Watermark为 30秒
    val dataStream = inputStream
      .map(data=>{
        val arr = data.split(" ")
        val timestamp = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(arr(3)).getTime
        // (url: String, ip:String, timestamp: Long)
        UVItem(arr(6), arr(0), timestamp)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[UVItem](Time.seconds(30)) {
      override def extractTimestamp(t: UVItem): Long = t.timestamp
    })

    dataStream
      .keyBy(_.url) // url作为key进行分组
      .timeWindow(Time.hours(1), Time.minutes(5)) // 开滚动窗口 长度1小时 步长5分钟
      .process(new CountUVProcess()) // 自定义类继承ProcessWindowFunction 对每个url进行统计 (url: String, WindowEnd: Long, Count: Long)
      .keyBy(_.WindowEnd) // 窗口结束时间作为key进行分组
      .process(new windowEndProcess()) // 对每个窗口的数据包装成要存到MySQL的元组 (Long, String, Long)(窗口结束时间, ip, 访问次数)
      .addSink(new JDBCSink()) // 往MySQL插入数据

    env.execute()
  }
}
// 自定义RichSinkFunction往MySQL插入数据
class JDBCSink extends RichSinkFunction[(Long, String, Long)]{
  // 定义连接和预处理器
  var conn:Connection = _
  var insertStatement: PreparedStatement = _

  // 在open函数初始化连接和预编译器
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/pv_uv", "root", "123456")
    insertStatement = conn.prepareStatement("insert into unique_visitor value(?, ? ,?)")
  }
  // 在close函数关闭连接和预编译器
  override def close(): Unit = {
    conn.close()
    insertStatement.close()
  }
  // 在invoke函数指定预处理器的数据和执行插入语句
  override def invoke(value: (Long, String, Long), context: SinkFunction.Context[_]): Unit = {
    // 指定预编译器的数据
    insertStatement.setTimestamp(1, new Timestamp(value._1))
    insertStatement.setString(2, value._2)
    insertStatement.setInt(3, value._3.toInt)
    // 执行预编译器
    insertStatement.execute()
  }
}

// 基于WindowEnd分组后 在该Process中返回要插入数据库的元祖Tuple
class windowEndProcess() extends KeyedProcessFunction[Long, UVWindowEnd, (Long, String, Long)]{
  override def processElement(i: UVWindowEnd, context: KeyedProcessFunction[Long, UVWindowEnd, (Long, String, Long)]#Context, collector: Collector[(Long, String, Long)]): Unit = {
    // 返回(窗口结束时间, 页面路径, 访问次数)
    collector.collect((i.WindowEnd, i.url, i.Count))
  }
}

// 基于url分组并开窗后 在该Process中统计UV值
class CountUVProcess() extends ProcessWindowFunction[UVItem, UVWindowEnd, String, TimeWindow]{
  override def process(key: String, context: Context, elements: Iterable[UVItem], out: Collector[UVWindowEnd]): Unit = {
    // 用Set集合可以去重的特性 一个ip计为一次访问
    var userIpSet = Set[String]()
    for(item <- elements){
      userIpSet += item.ip
    }
    // 返回(访问的url, 窗口结束时间, 访问次数)
    out.collect(UVWindowEnd(key, context.window.getEnd, userIpSet.size))
  }
}

MySQL创建表

插入数据后

依赖


        
            org.apache.flink
            flink-connector-kafka_2.12
            1.10.1
        
        
            org.apache.flink
            flink-scala_2.11
            1.10.2
        
        
            org.apache.flink
            flink-streaming-scala_2.11
            1.10.2
        
        
            org.apache.flink
            flink-connector-kafka-0.11_2.11
            1.10.2
        
        
            org.apache.bahir
            flink-connector-redis_2.11
            1.0
        
        
            mysql
            mysql-connector-java
            8.0.25
        
        
            org.apache.flink
            flink-statebackend-rocksdb_2.12
            1.10.1
        
        
            org.apache.flink
            flink-table-planner_2.11
            1.10.1
        
        
            org.apache.flink
            flink-table-planner-blink_2.11
            1.10.1
        
        
            org.apache.flink
            flink-table-api-scala-bridge_2.11
            1.10.1
        
        
            org.apache.flink
            flink-csv
            1.10.1
        

    
    
        
            
                net.alchim31.maven
                scala-maven-plugin
                3.4.6
                
                    
                        
                            compile
                        
                    
                
            
            
                org.apache.maven.plugins
                maven-assembly-plugin
                3.0.0
                
                    
                        jar-with-dependencies
                    
                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
        
    

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5695806.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存