package com.gu.networkflow_analysis
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
// Input data case class
case class ApacheLogEvent( ip: String, userId: String, eventTime: Long, method: String, url: String)
// Case class for the windowed aggregation result
case class UrlViewCount( url: String, windowEnd: Long, count: Long )
object NetworkFlow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// val dataStream = env.readTextFile("D:\\Projects\\BigData\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log")
val dataStream = env.socketTextStream("localhost", 7777)
.map( data => {
val dataArray = data.split(" ")
// Define the timestamp conversion
val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime
ApacheLogEvent( dataArray(0).trim, dataArray(1).trim, timestamp, dataArray(5).trim, dataArray(6).trim )
} )
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
} )
.keyBy(_.url)
.timeWindow(Time.minutes(10), Time.seconds(5))
.allowedLateness(Time.seconds(60))
.aggregate( new CountAgg(), new WindowResult() )
val processedStream = dataStream
  .keyBy(_.windowEnd)
  .process( new TopNHotUrls(5) )
dataStream.print("aggregate")
processedStream.print("process")
env.execute("network flow job")
}
}
// Custom pre-aggregation function
class CountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long]{
override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
// Custom window function
class WindowResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
out.collect( UrlViewCount( key, window.getEnd, input.iterator.next() ) )
}
}
// Custom process function for sorting and emitting the Top-N URLs
class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long, UrlViewCount, String]{
lazy val urlState: MapState[String, Long] = getRuntimeContext.getMapState( new MapStateDescriptor[String, Long]("url-state", classOf[String], classOf[Long] ) )
override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {
urlState.put(value.url, value.count)
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// Pull all the data out of state
val allUrlViews: ListBuffer[(String, Long)] = new ListBuffer[(String, Long)]()
val iter = urlState.entries().iterator()
while(iter.hasNext){
val entry = iter.next()
allUrlViews += (( entry.getKey, entry.getValue ))
}
// urlState.clear()
val sortedUrlViews = allUrlViews.sortWith(_._2 > _._2).take(topSize)
// Format the result for output
val result: StringBuilder = new StringBuilder()
result.append("Time: ").append( new Timestamp( timestamp - 1 ) ).append("\n")
for( i <- sortedUrlViews.indices ){
  val currentUrlView = sortedUrlViews(i)
  result.append("NO").append(i + 1).append(":")
    .append(" URL=").append(currentUrlView._1)
    .append(" views=").append(currentUrlView._2).append("\n")
}
result.append("=============================")
Thread.sleep(1000)
out.collect(result.toString())
}
}
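The map in NetworkFlow above assumes space-separated Apache access-log lines: field 0 is the ip, field 1 the userId, field 3 the event time in dd/MM/yyyy:HH:mm:ss format, and fields 5 and 6 the method and URL. A minimal sketch of that parsing step on a made-up log line (the line itself is an assumption, not taken from the tutorial's apache.log):

object ParseLogLineDemo extends App {
  // Hypothetical input line in the expected layout: ip userId - time zone method url
  val line = "83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/kibana"
  val fields = line.split(" ")
  val ts = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(fields(3).trim).getTime
  println(ApacheLogEvent(fields(0).trim, fields(1).trim, ts, fields(5).trim, fields(6).trim))
}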
package com.gu.networkflow_analysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
// Case class for the input data
case class UserBehavior( userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long )
object PageView {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// Use a relative path to define the data source
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    UserBehavior( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)
  .filter( _.behavior == "pv" )    // only count "pv" events
  .map( data => ("pv", 1) )
  .keyBy(_._1)
  .timeWindow(Time.hours(1))
  .sum(1)
dataStream.print("pv count")
env.execute("page view job")
}
}
package com.gu.networkflow_analysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
case class UvCount( windowEnd: Long, uvCount: Long )
object UniqueVisitor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// Use a relative path to define the data source
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    UserBehavior( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)
  .filter( _.behavior == "pv" )    // only count "pv" events
  .timeWindowAll( Time.hours(1) )
  .apply( new UvCountByWindow() )
dataStream.print()
env.execute("uv job")
}
}
class UvCountByWindow() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{
override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
// Use a Scala Set to collect all userIds in the window and deduplicate them
var idSet = Set[Long]()
// Collect the id of every element in the current window into the set, then output the set size
for( userBehavior <- input ){
idSet += userBehavior.userId
}
out.collect( UvCount( window.getEnd, idSet.size ) )
}
}
package com.gu.networkflow_analysis
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
object UvWithBloom {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// Use a relative path to define the data source
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter(_.behavior == "pv") // only count "pv" events
.map(data => ("dummyKey", data.userId))
.keyBy(_._1)
.timeWindow(Time.hours(1))
.trigger(new MyTrigger())
.process(new UvCountWithBloom())
dataStream.print()
env.execute("uv with bloom job")
}
}
// Custom window trigger
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// Fire the window for every incoming element and purge all window state
TriggerResult.FIRE_AND_PURGE
}
}
// Define a Bloom filter
class Bloom(size: Long) extends Serializable {
// Total size of the bitmap in bits; defaults to 2^27 (about 16 MB)
private val cap = if (size > 0) size else 1 << 27
// Hash function
def hash(value: String, seed: Int): Long = {
var result = 0L
for( i <- 0 until value.length ){
result = result * seed + value.charAt(i)
}
result & ( cap - 1 )
}
}
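The hash above folds the characters of the id string with a multiplicative seed and masks the result with cap - 1, so every userId is mapped to a stable bit offset inside the bitmap. A minimal sketch of how offsets come out; the ids are arbitrary examples, and the seed 61 matches the one used by UvCountWithBloom below:

object BloomHashDemo extends App {
  val bloom = new Bloom(1 << 29)
  // The same id always yields the same offset; different ids spread across the 2^29-bit bitmap.
  Seq("543462", "662867", "543462").foreach { userId =>
    println(s"userId=$userId -> offset=${bloom.hash(userId, 61)}")
  }
}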
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{
// Redis connection
lazy val jedis = new Jedis("localhost", 6379)
lazy val bloom = new Bloom(1<<29)
override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
// Bitmap storage: the key is the windowEnd, the value is the bitmap
val storeKey = context.window.getEnd.toString
var count = 0L
// The uv count of each window is also stored in a Redis hash named "count" as (windowEnd -> uvCount), so read it from Redis first
if( jedis.hget("count", storeKey) != null ){
count = jedis.hget("count", storeKey).toLong
}
// Use the Bloom filter to check whether the current user has already been seen
val userId = elements.last._2.toString
val offset = bloom.hash(userId, 61)
// Flag indicating whether this bit is already set in the Redis bitmap
val isExist = jedis.getbit(storeKey, offset)
if(!isExist){
// If not seen, set the corresponding bit in the bitmap and increment the count
jedis.setbit(storeKey, offset, true)
jedis.hset("count", storeKey, (count + 1).toString)
out.collect( UvCount(storeKey.toLong, count + 1) )
} else {
out.collect( UvCount(storeKey.toLong, count) )
}
}
}
package com.gu.marketanalysis
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
// Input ad-click event case class
case class AdClickEvent( userId: Long, adId: Long, province: String, city: String, timestamp: Long )
// Output case class for counts by province
case class CountByProvince( windowEnd: String, province: String, count: Long )
// Blacklist warning output
case class BlackListWarning( userId: Long, adId: Long, msg: String )
object AdStatisticsByGeo {
// Tag for the side output stream
val blackListOutputTag: OutputTag[BlackListWarning] = new OutputTag[BlackListWarning]("blacklist")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// Read the data and convert it to AdClickEvent
val resource = getClass.getResource("/AdClickLog.csv")
val adEventStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    AdClickEvent( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)
// Custom process function that filters out users who click excessively
val filterBlackListStream = adEventStream
  .keyBy( data => (data.userId, data.adId) )
  .process( new FilterBlackListUser(100) )
// Key by province, then window and aggregate
val adCountStream = filterBlackListStream
  .keyBy(_.province)
  .timeWindow( Time.hours(1), Time.seconds(5) )
  .aggregate( new AdCountAgg(), new AdCountResult() )
adCountStream.print("count")
filterBlackListStream.getSideOutput(blackListOutputTag).print("blacklist")
env.execute("ad statistics job")
}
class FilterBlackListUser(maxCount: Int) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]{
// State holding the current user's click count on the current ad
lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count-state", classOf[Long]))
// State marking whether a blacklist warning has already been sent
lazy val isSentBlackList: ValueState[Boolean] = getRuntimeContext.getState( new ValueStateDescriptor[Boolean]("issent-state", classOf[Boolean]) )
// State holding the timestamp of the reset timer
lazy val resetTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("resettime-state", classOf[Long]) )
override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {
  // Read the count state
  val curCount = countState.value()
  // On the first element, register a timer that fires at 00:00 the next day
  if( curCount == 0 ){
    val ts = ( ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1 ) * (1000 * 60 * 60 * 24)
    resetTimer.update(ts)
    ctx.timerService().registerProcessingTimeTimer(ts)
  }
  // If the count has reached the limit, add the user to the blacklist
  if( curCount >= maxCount ){
    // Only send the blacklist warning once
    if( !isSentBlackList.value() ){
      isSentBlackList.update(true)
      // Emit to the side output stream
      ctx.output( blackListOutputTag, BlackListWarning(value.userId, value.adId, "Click over " + maxCount + " times today.") )
    }
    return
  }
  // Increment the count state and emit the element to the main stream
  countState.update( curCount + 1 )
  out.collect( value )
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
  // When the timer fires, clear all state
  if( timestamp == resetTimer.value() ){
    isSentBlackList.clear()
    countState.clear()
    resetTimer.clear()
  }
}
}
}
// Custom pre-aggregation function
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long]{
override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
// Custom window function
class AdCountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
out.collect( CountByProvince( new Timestamp(window.getEnd).toString, key, input.iterator.next() ) )
}
}
package com.gu.marketanalysis
import java.sql.Timestamp
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object AppMarketing {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource( new SimulatedEventSource() )
.assignAscendingTimestamps(_.timestamp)
.filter( _.behavior != "UNINSTALL" )
.map( data => {
( "dummyKey", 1L )
} )
.keyBy(_._1) // group everything under the dummy key
.timeWindow( Time.hours(1), Time.seconds(10) )
.aggregate( new CountAgg(), new MarketingCountTotal() )
dataStream.print()
env.execute("app marketing job")
}
}
class CountAgg() extends AggregateFunction[(String, Long), Long, Long]{
override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class MarketingCountTotal() extends WindowFunction[Long, MarketingViewCount, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketingViewCount]): Unit = {
val startTs = new Timestamp(window.getStart).toString
val endTs = new Timestamp(window.getEnd).toString
val count = input.iterator.next()
out.collect( MarketingViewCount(startTs, endTs, "app marketing", "total", count) )
}
}
package com.gu.marketanalysis
import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.util.Random
// Input data case class
case class MarketingUserBehavior( userId: String, behavior: String, channel: String, timestamp: Long )
// Output result case class
case class MarketingViewCount( windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long )
object AppMarketingByChannel {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource( new SimulatedEventSource() )
  .assignAscendingTimestamps(_.timestamp)
  .filter( _.behavior != "UNINSTALL" )
  .map( data => {
    ( (data.channel, data.behavior), 1L )
  } )
  .keyBy(_._1)    // key by (channel, behavior)
  .timeWindow( Time.hours(1), Time.seconds(10) )
  .process( new MarketingCountByChannel() )
dataStream.print()
env.execute("app marketing by channel job")
}
}
// Custom data source
class SimulatedEventSource() extends RichSourceFunction[MarketingUserBehavior]{
// Flag indicating whether the source is running
var running = true
// Set of user behavior types
val behaviorTypes: Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
// Set of channels
val channelSets: Seq[String] = Seq("wechat", "weibo", "appstore", "huaweistore")
// Random number generator
val rand: Random = new Random()
override def cancel(): Unit = running = false
override def run(ctx: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {
// Upper bound on the number of generated elements
val maxElements = Long.MaxValue
var count = 0L
// Randomly generate the data
while( running && count < maxElements ){
  val id = UUID.randomUUID().toString
  val behavior = behaviorTypes(rand.nextInt(behaviorTypes.size))
  val channel = channelSets(rand.nextInt(channelSets.size))
  val ts = System.currentTimeMillis()
  ctx.collect( MarketingUserBehavior( id, behavior, channel, ts ) )
  count += 1
  TimeUnit.MILLISECONDS.sleep(10L)
}
}
}
// Custom process function
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow]{
override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)], out: Collector[MarketingViewCount]): Unit = {
val startTs = new Timestamp(context.window.getStart).toString
val endTs = new Timestamp(context.window.getEnd).toString
val channel = key._1
val behavior = key._2
val count = elements.size
out.collect( MarketingViewCount(startTs, endTs, channel, behavior, count) )
}
}
package com.gu.apitest.sinktest
import java.util
import com.gu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
object EsSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// source
val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
// Create a builder for the Elasticsearch sink
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
httpHosts,
new ElasticsearchSinkFunction[SensorReading] {
override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
println("saving data: " + element)
// Wrap the data into a Map (or a JSON object)
val json = new util.HashMap[String, String]()
json.put("sensor_id", element.id)
json.put("temperature", element.temperature.toString)
json.put("ts", element.timestamp.toString)
// Create an IndexRequest to send the data
val indexRequest = Requests.indexRequest()
  .index("sensor")
  .`type`("readingdata")
  .source(json)
// Add the request to the indexer to write the data
indexer.add(indexRequest)
println("data saved.")
}
}
)
// sink
dataStream.addSink( esSinkBuilder.build() )
env.execute("es sink test")
}
}
package com.gu.apitest.sinktest
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.gu.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
object JdbcSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// source
val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
// sink
dataStream.addSink( new MyJdbcSink() )
env.execute("jdbc sink test")
}
}
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
// JDBC connection and prepared statements
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// Initialize: create the connection and the prepared statements
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?, ?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
}
// Use the connection to execute SQL
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
// Execute the update statement first
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
// If the update matched no rows, execute the insert statement
if( updateStmt.getUpdateCount == 0 ){
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}
// Clean up on close
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
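MyJdbcSink assumes a temperatures table with sensor and temp columns already exists in the test database. A minimal one-off sketch that creates it over the same connection settings; the column types here are assumptions, adjust them to your schema:

object CreateTemperaturesTable extends App {
  // Same JDBC URL and credentials as MyJdbcSink; the column types are assumed, not from the original job.
  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
  val stmt = conn.createStatement()
  stmt.execute("CREATE TABLE IF NOT EXISTS temperatures (sensor VARCHAR(20) NOT NULL, temp DOUBLE NOT NULL)")
  stmt.close()
  conn.close()
}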
package com.gu.apitest.sinktest
import com.gu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
object RedisSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// source
val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build()
// sink
dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )
env.execute("redis sink test")
}
}
class MyRedisMapper() extends RedisMapper[SensorReading]{
// Define the command used to save data to Redis
override def getCommandDescription: RedisCommandDescription = {
// Store the sensor id and temperature in a hash: HSET key field value
new RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )
}
// Value saved to Redis
override def getValueFromData(t: SensorReading): String = t.temperature.toString
// Key (hash field) saved to Redis
override def getKeyFromData(t: SensorReading): String = t.id
}
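With the mapper above, every SensorReading is written into the sensor_temperature hash (field = sensor id, value = the latest temperature). A small sketch for inspecting what the sink wrote, assuming the same localhost:6379 Redis instance and reusing the Jedis client this project already depends on:

object InspectSensorHash extends App {
  import redis.clients.jedis.Jedis
  import scala.collection.JavaConverters._
  val jedis = new Jedis("localhost", 6379)
  // HGETALL sensor_temperature -> one entry per sensor id with its last written temperature
  jedis.hgetAll("sensor_temperature").asScala.foreach { case (id, temp) => println(s"$id -> $temp") }
  jedis.close()
}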
package com.gu.apitest.sinktest
import java.util.Properties
import com.gu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
object KafkaSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// source
// val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val inputStream = env.addSource(new FlinkKafkaConsumer011[String](“sensor”, new SimpleStringSchema(), properties))
// Transform operations
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString // convert to String for easy serialization
}
)
// sink
dataStream.addSink( new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
dataStream.print()
env.execute("kafka sink test")
}
}
package com.gu.apitest
import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(60000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(100000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))
// env.setStateBackend( new RocksDBStateBackend("") )
val stream = env.socketTextStream("localhost", 7777) val dataStream = stream.map(data => { val dataArray = data.split(",") SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) { override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000 } ) val processedStream = dataStream.keyBy(_.id) .process( new TempIncrealert() ) val processedStream2 = dataStream.keyBy(_.id)
// .process( new TempChangealert(10.0) )
.flatMap( new TempChangealert(10.0) )
val processedStream3 = dataStream.keyBy(_.id)
  .flatMapWithState[(String, Double, Double), Double]{
    // If there is no state yet (no previous data), just store the current temperature
    case ( input: SensorReading, None ) => ( List.empty, Some(input.temperature) )
    // If there is state, compare with the last temperature; if the difference exceeds the threshold, emit an alert
    case ( input: SensorReading, lastTemp: Some[Double] ) =>
      val diff = ( input.temperature - lastTemp.get ).abs
      if( diff > 10.0 ){
        ( List((input.id, lastTemp.get, input.temperature)), Some(input.temperature) )
      } else ( List.empty, Some(input.temperature) )
  }
dataStream.print("input data")
processedStream3.print("processed data")
env.execute("process function test")
}
}
class TempIncrealert() extends KeyedProcessFunction[String, SensorReading, String]{
// State holding the temperature of the previous record
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )
// State holding the timestamp of the registered timer
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("currentTimer", classOf[Long]) )
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// Read the previous temperature
val preTemp = lastTemp.value()
// Update the temperature state
lastTemp.update( value.temperature )
val curTimerTs = currentTimer.value()
if( value.temperature < preTemp || preTemp == 0.0 ){
  // If the temperature dropped, or this is the first record, delete the timer and clear the state
  ctx.timerService().deleteProcessingTimeTimer( curTimerTs )
  currentTimer.clear()
} else if ( value.temperature > preTemp && curTimerTs == 0 ){
  // The temperature rose and no timer is registered yet: register one 5 seconds from now
  val timerTs = ctx.timerService().currentProcessingTime() + 5000L
  ctx.timerService().registerProcessingTimeTimer( timerTs )
  currentTimer.update( timerTs )
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
// Emit the alert
out.collect( ctx.getCurrentKey + " temperature has been rising continuously" )
currentTimer.clear()
}
}
class TempChangealert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
// Declare the state variable during initialization
lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
}
override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
// Get the last temperature
val lastTemp = lastTempState.value()
// Compare the current temperature with the last one; if the difference exceeds the threshold, emit an alert
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}
}
class TempChangealert2(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)]{
// State variable holding the last temperature
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
// Get the last temperature
val lastTemp = lastTempState.value()
// Compare the current temperature with the last one; if the difference exceeds the threshold, emit an alert
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}
}
package com.gu.apitest
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object SideOutputTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.socketTextStream("localhost", 7777)
val dataStream = stream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
} )
val processedStream = dataStream
.process( new Freezingalert() )
// dataStream.print("input data")
processedStream.print("processed data")
processedStream.getSideOutput( new OutputTag[String]("freezing alert") ).print("alert data")
env.execute("side output test")
}
}
// Freezing alert: if the temperature is below 32°F, emit an alert to the side output stream
class Freezingalert() extends ProcessFunction[SensorReading, SensorReading]{
// lazy val alertOutput: OutputTag[String] = new OutputTag[String]( "freezing alert" )
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if( value.temperature < 32.0 ){
ctx.output( new OutputTag[String]( "freezing alert" ), "freezing alert for " + value.id )
}
out.collect( value )
}
}
package com.gu.apitest
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.util.Random
// Sensor reading case class
case class SensorReading( id: String, timestamp: Long, temperature: Double )
object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 1. Read data from a collection
val stream1 = env.fromCollection(List(
  SensorReading("sensor_1", 1547718199, 35.80018327300259),
  SensorReading("sensor_6", 1547718201, 15.402984393403084),
  SensorReading("sensor_7", 1547718202, 6.720945201171228),
  SensorReading("sensor_10", 1547718205, 38.101067604893444)
))
// env.fromElements("flink", 1, 32, 3213, 0.324).print("test")
// 2. Read data from a file
val stream2 = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// 3. Read data from Kafka
// Kafka consumer configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
// 4. Custom data source
val stream4 = env.addSource(new SensorSource())
// Sink output
stream4.print("stream4")
env.execute("source api test")
}
}
class SensorSource() extends SourceFunction[SensorReading]{
// Flag indicating whether the source is still running
var running: Boolean = true
override def cancel(): Unit = running = false
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// Create a random number generator
val rand = new Random()
// Randomly initialize the temperatures of 10 sensors, then fluctuate them to generate the stream
var curTemp = 1.to(10).map(
  i => ( "sensor_" + i, 60 + rand.nextGaussian() * 20 )
)
// Generate stream data in an infinite loop, unless cancelled
while(running){
  // Update the temperatures
  curTemp = curTemp.map(
    t => (t._1, t._2 + rand.nextGaussian())
  )
  // Get the current timestamp
  val curTime = System.currentTimeMillis()
  // Wrap into SensorReading and emit
  curTemp.foreach(
    t => ctx.collect( SensorReading(t._1, curTime, t._2) )
  )
  // Sleep 100 ms between batches
  Thread.sleep(100)
}
}
}
package com.gu.apitest
import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
object TransformTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Read the input data
val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
// Transform operations
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)
// 1. Aggregation operations
val stream1 = dataStream
.keyBy("id")
// .sum("temperature")
.reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10) )
// 2. Split the stream depending on whether the temperature exceeds 30 degrees
val splitStream = dataStream
.split( sensorData => {
if( sensorData.temperature > 30 ) Seq("high") else Seq("low")
} )
val highTempStream = splitStream.select("high")
val lowTempStream = splitStream.select("low")
val allTempStream = splitStream.select("high", "low")
// 3. Connect and merge the two streams
val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) )
val connectedStreams = warningStream.connect(lowTempStream)
val coMapStream = connectedStreams.map(
warningData => ( warningData._1, warningData._2, "high temperature warning" ),
lowData => ( lowData.id, "healthy" )
)
val unionStream = highTempStream.union(lowTempStream)
// Function classes
dataStream.filter( new MyFilter() ).print()
// Output
// dataStream.print()
// highTempStream.print("high")
// lowTempStream.print("low")
// allTempStream.print("all")
// unionStream.print("union")
env.execute("transform test job")
}
}
class MyFilter() extends FilterFunction[SensorReading]{
override def filter(value: SensorReading): Boolean = {
value.id.startsWith("sensor_1")
}
}
class MyMapper() extends RichMapFunction[SensorReading, String]{
override def map(value: SensorReading): String = {
"flink"
}
override def open(parameters: Configuration): Unit = super.open(parameters)
}
package com.gu.apitest
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object WindowTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Use event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(500)
// Read the input data
// val inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")
val inputStream = env.socketTextStream("localhost", 7777)
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
// .assignAscendingTimestamps(_.timestamp * 1000L)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
// .assignTimestampsAndWatermarks( new MyAssigner() )
.map(data => (data.id, data.temperature))
.keyBy(_._1)
// .process( new MyProcess() )
.timeWindow(Time.seconds(10), Time.seconds(3))
.reduce((result, data) => (data._1, result._2.min(data._2))) // minimum temperature within the 10-second window
dataStream.print()
env.execute("window api test")
}
}
class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{
// Fixed delay of 3 seconds
val bound: Long = 3 * 1000L
// Maximum timestamp seen so far
var maxTs: Long = Long.MinValue
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
maxTs = maxTs.max(element.timestamp * 1000L)
element.timestamp * 1000L
}
}
class MyAssigner2() extends AssignerWithPunctuatedWatermarks[SensorReading]{
val bound: Long = 1000L
override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
if( lastElement.id == "sensor_1" ){
new Watermark(extractedTimestamp - bound)
}else{
null
}
}
override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
element.timestamp * 1000L
}
}
package com.gu.wc
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
val params = ParameterTool.fromArgs(args)
val host: String = params.get("host")
val port: Int = params.getInt("port")
// Create a streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.setParallelism(1)
// env.disableOperatorChaining()
// Receive the socket text stream
val textDataStream = env.socketTextStream(host, port)
// Read the data, split it into words, and do the word count
val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
.filter(_.nonEmpty).startNewChain()
.map( (_, 1) )
.keyBy(0)
.sum(1)
// Print the output
wordCountDataStream.print().setParallelism(1)
// Execute the job
env.execute("stream word count job")
}
}
package com.gu.wc
import org.apache.flink.api.scala._
// Batch processing code
object WordCount {
def main(args: Array[String]): Unit = {
// Create a batch execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Read data from a file
val inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"
val inputDataSet = env.readTextFile(inputPath)
// Split into words, then count
val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
  .map( (_, 1) )
  .groupBy(0)
  .sum(1)
// Print the output
wordCountDataSet.print()
}
}
package streaming.sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class SinkForRedisDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("192.168.167.254", 8888, "\n");
// lpush l_words word
// Assemble the data: convert the String into a Tuple2
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<>("b", value);
}
});
// Create the Redis configuration
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.167.254").setPort(6379).build();
// Create the Redis sink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
}
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
// Get the Redis key to operate on from the incoming data
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
// Get the Redis value to operate on from the incoming data
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
@Override
public RedisCommandDescription getCommandDescription() {
    return new RedisCommandDescription(RedisCommand.LPUSH);
}
}
}
Parent pom.xml: modelVersion 4.0.0, groupId com. .flink, artifactId Pro (packaging pom), version 1.0-SNAPSHOT, modules ETL and Report; properties: flink.version 1.9.0, Scala 2.11.8. Dependencies: org.apache.flink flink-java, flink-streaming-java_2.11, flink-scala_2.11, flink-streaming-scala_2.11, flink-statebackend-rocksdb_2.11, flink-connector-kafka-0.11_2.11 and flink-connector-elasticsearch6_2.11 at ${flink.version}; org.apache.bahir flink-connector-redis_2.11 1.0; org.apache.kafka kafka-clients 0.11.0.3; org.slf4j slf4j-api 1.7.25 and slf4j-log4j12 1.7.25; redis.clients jedis 2.9.0; com.alibaba fastjson 1.2.44. Build: maven-compiler-plugin 3.1 with source/target 1.8; test sources under /src/test.
public class DataClean {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// We read from Kafka, so set the parallelism to the number of partitions of the topic.
env.setParallelism(3);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Note: we should actually configure the state backend here and store the checkpoint data
// in RocksDB.
// Step 1: read data from Kafka; the consumer is the data source.
// Topic naming matters: ideally the name alone tells you what data it contains.
// xxxx_xxx_xxx_xxx
String topic = "allData";
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers", "192.168.167.254:9092");
consumerProperties.put("group.id", "allTopic_consumer");
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), consumerProperties);
// {"dt":"2019-11-24 19:54:23","countryCode":"PK","data":[{"type":"s4","score":0.8,"level":"C"},{"type":"s5","score":0.2,"level":"C"}]}
DataStreamSource<String> allData = env.addSource(consumer);
// Broadcast the mapping data read from Redis
DataStream<HashMap<String, String>> mapData = env.addSource(new RedisSource()).broadcast();
SingleOutputStreamOperator<String> etlData = allData.connect(mapData).flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {
    HashMap<String, String> allMap = new HashMap<String, String>();
    // Handles the Kafka data
    @Override
    public void flatMap1(String line, Collector<String> out) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(line);
        String dt = jsonObject.getString("dt");
        String countryCode = jsonObject.getString("countryCode");
        // Look up the area name by countryCode
        String area = allMap.get(countryCode);
        JSONArray data = jsonObject.getJSONArray("data");
        for (int i = 0; i < data.size(); i++) {
            JSONObject dataObject = data.getJSONObject(i);
            System.out.println("area: " + area);
            dataObject.put("dt", dt);
            dataObject.put("area", area);
            // Downstream also receives the data as JSON
            out.collect(dataObject.toJSONString());
        }
    }
    // Handles the data from Redis
    @Override
    public void flatMap2(HashMap<String, String> map, Collector<String> collector) throws Exception {
        System.out.println(map.toString());
        allMap = map;
    }
});
// ETL -> load kafka
etlData.print().setParallelism(1);
// String outputTopic = "allDataClean";
// Properties producerProperties = new Properties();
// producerProperties.put("bootstrap.servers","192.168.167.254:9092");
// FlinkKafkaProducer011 producer = new FlinkKafkaProducer011<>(outputTopic,
// new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
// producerProperties);
//
// // Create a Kafka producer
// etlData.addSink(producer);
env.execute("DataClean");
    }
}
package com. .producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
public class kafkaProducer {
public static void main(String[] args) throws Exception {
Properties prop = new Properties();
// Kafka broker address
prop.put("bootstrap.servers", "192.168.167.254:9092");
// Key/value serializers
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
// Topic name
String topic = "allData";
// Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
// {"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}
while (true) {
    String message = "{\"dt\":\"" + getCurrentTime() + "\",\"countryCode\":\"" + getCountryCode() + "\",\"data\":[{\"type\":\"" + getRandomType() + "\",\"score\":" + getRandomScore() + ",\"level\":\"" + getRandomLevel() + "\"},{\"type\":\"" + getRandomType() + "\",\"score\":" + getRandomScore() + ",\"level\":\"" + getRandomLevel() + "\"}]}";
    System.out.println(message);
    // Produce data into Kafka synchronously
    producer.send(new ProducerRecord<String, String>(topic, message));
    Thread.sleep(2000);
}
// Close the producer
// producer.close();
}
public static String getCurrentTime(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date());
}
public static String getCountryCode(){
String[] types = {"US","TW","HK","PK","KW","SA","IN"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getRandomType(){
String[] types = {"s1","s2","s3","s4","s5"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static double getRandomScore(){
double[] types = {0.3,0.2,0.1,0.5,0.8};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getRandomLevel(){
String[] types = {"A","A+","B","C","D"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}}
package com. .source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.util.HashMap;
import java.util.Map;
public class RedisSource implements SourceFunction<HashMap<String, String>> {
    private Logger logger = LoggerFactory.getLogger(RedisSource.class);
    private Jedis jedis;
    private boolean isRunning = true;
    @Override
    public void run(SourceContext<HashMap<String, String>> cxt) throws Exception {
        this.jedis = new Jedis("192.168.167.254", 6379);
        HashMap<String, String> map = new HashMap<>();
        while (isRunning) {
            try {
                map.clear();
                Map<String, String> areas = jedis.hgetAll("areas");
                for (Map.Entry<String, String> entry : areas.entrySet()) {
                    String area = entry.getKey();
                    String value = entry.getValue();
                    String[] fields = value.split(",");
                    for (String country : fields) {
                        map.put(country, area);
                    }
                }
                if (map.size() > 0) {
                    cxt.collect(map);
                }
                Thread.sleep(60000);
            } catch (JedisConnectionException e) {
                logger.error("Redis connection error", e.getCause());
                this.jedis = new Jedis("192.168.167.254", 6379);
            } catch (Exception e) {
                logger.error("Data source error", e.getCause());
            }
        }
    }
    @Override
    public void cancel() {
        isRunning = false;
        if (jedis != null) {
            jedis.close();
        }
    }
}
Report module pom.xml: parent Pro / com. .flink / 1.0-SNAPSHOT, modelVersion 4.0.0, artifactId Report; dependencies (versions managed by the parent): org.apache.flink flink-java, flink-streaming-java_2.11, flink-scala_2.11, flink-streaming-scala_2.11, flink-statebackend-rocksdb_2.11, flink-connector-kafka-0.11_2.11; org.apache.bahir flink-connector-redis_2.11; org.apache.kafka kafka-clients; org.slf4j slf4j-api and slf4j-log4j12; redis.clients jedis; com.alibaba fastjson.
package com. .core;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com. .function.MySumFuction;
import com. .watermark.MyWaterMark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
public class DataReport {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//env.setStateBackend(new RocksDBStateBackend(""));
// Set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String topic = "auditLog";
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers","192.168.167.254:9092");
consumerProperties.put("group.id","auditLog_consumer");
// Read the data from Kafka
// {"dt":"2019-11-24 21:19:47","type":"child_unshelf","username":"shenhe1","area":"AREA_ID"}
FlinkKafkaConsumer011<String> consumer =
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), consumerProperties);
DataStreamSource<String> data = env.addSource(consumer);
Logger logger= LoggerFactory.getLogger(DataReport.class);
// Process the data
SingleOutputStreamOperator<Tuple3<Long, String, String>> preData = data.map(new MapFunction<String, Tuple3<Long, String, String>>() {
@Override
public Tuple3<Long, String, String> map(String line) throws Exception {
JSONObject jsonObject = JSON.parseObject(line);
String dt = jsonObject.getString("dt");
String type = jsonObject.getString("type");
String area = jsonObject.getString("area");
long time = 0;
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
time = sdf.parse(dt).getTime();
} catch (ParseException e) {
logger.error("Failed to parse time, dt: " + dt, e.getCause());
}
return Tuple3.of(time, type, area);
}
});
SingleOutputStreamOperator<Tuple3<Long, String, String>> filterData = preData.filter(tuple3 -> tuple3.f0 != 0);
OutputTag<Tuple3<Long, String, String>> outputTag = new OutputTag<Tuple3<Long, String, String>>("late-date"){};
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> resultData = filterData.assignTimestampsAndWatermarks(new MyWaterMark())
        .keyBy(1, 2)
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .sideOutputLateData(outputTag)
        .apply(new MySumFuction());
// Java 8 lambda
SingleOutputStreamOperator<String> sideOutput = resultData.getSideOutput(outputTag).map(line -> line.toString());
// String outputTopic = "lateData";
// Properties producerProperties = new Properties();
// producerProperties.put("bootstrap.servers","192.168.167.254:9092");
// FlinkKafkaProducer011 producer = new FlinkKafkaProducer011<>(outputTopic,
// new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
// producerProperties);
// sideOutput.addSink(producer);
resultData.print();
env.execute("DataReport");
    }
}
package com. .function;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Date;
public class MySumFuction implements WindowFunction<Tuple3<Long, String, String>,
Tuple4<String, String, String, Long>, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow,
Iterable<Tuple3<Long, String, String>> input,
Collector<Tuple4<String, String, String, Long>> out) {
// Get the grouping fields
String type = tuple.getField(0).toString();
String area = tuple.getField(1).toString();
java.util.Iterator<Tuple3<Long, String, String>> iterator = input.iterator();
long count = 0;
while (iterator.hasNext()) {
    iterator.next();
    count++;
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = sdf.format(new Date(timeWindow.getEnd()));
Tuple4<String, String, String, Long> result = new Tuple4<String, String, String, Long>(time, type, area, count);
out.collect(result);
    }
}
package com. .source;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
public class ProducerDataReport {
public static void main(String[] args) throws Exception {
Properties prop = new Properties();
// Kafka broker address
prop.put("bootstrap.servers", "192.168.167.254:9092");
// Key/value serializers
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
// Topic name
String topic = "auditLog";
// Create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
// {"dt":"2018-01-01 10:11:22","type":"shelf","username":"shenhe1","area":"AREA_US"}
// Produce messages
while(true){
String message = "{\"dt\":\"" + getCurrentTime() + "\",\"type\":\"" + getRandomType() + "\",\"username\":\"" + getRandomUsername() + "\",\"area\":\"" + getRandomArea() + "\"}";
System.out.println(message);
producer.send(new ProducerRecord<String, String>(topic, message));
Thread.sleep(500);
}
// Close the producer
//producer.close();
}
public static String getCurrentTime(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date());
}
public static String getRandomArea(){
String[] types = {"AREA_US","AREA_CT","AREA_AR","AREA_IN","AREA_ID"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getRandomType(){
String[] types = {"shelf","unshelf","black","chlid_shelf","child_unshelf"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getRandomUsername(){
String[] types = {"shenhe1","shenhe2","shenhe3","shenhe4","shenhe5"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}}
package com. .watermark;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
public class MyWaterMark implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, String>> {
long currentMaxTimestamp=0L;
final long maxOutputOfOrderness=20000L; // allowed out-of-orderness in milliseconds
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutputOfOrderness);
}
@Override
public long extractTimestamp(Tuple3<Long, String, String> element, long l) {
Long timeStamp = element.f0;
currentMaxTimestamp=Math.max(timeStamp,currentMaxTimestamp);
return timeStamp;
}
}