flinkScala


package com. gu.networkflow_analysis

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// Case class for input log events
case class ApacheLogEvent( ip: String, userId: String, eventTime: Long, method: String, url: String)

// Case class for the windowed aggregation result
case class UrlViewCount( url: String, windowEnd: Long, count: Long )

object NetworkFlow {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// val dataStream = env.readTextFile("D:\Projects\BigData\UserBehaviorAnalysis\NetworkFlowAnalysis\src\main\resources\apache.log")
val dataStream = env.socketTextStream("localhost", 7777)
  .map( data => {
    val dataArray = data.split(" ")
    // define the format used to parse the log timestamp
    val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
    val timestamp = simpleDateFormat.parse(dataArray(3).trim).getTime
    ApacheLogEvent( dataArray(0).trim, dataArray(1).trim, timestamp, dataArray(5).trim, dataArray(6).trim )
  } )
  .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {  // allow 1 s of out-of-orderness
    override def extractTimestamp(element: ApacheLogEvent): Long = element.eventTime
  } )
  .keyBy(_.url)
  .timeWindow(Time.minutes(10), Time.seconds(5))
  .allowedLateness(Time.seconds(60))
  .aggregate( new CountAgg(), new WindowResult() )

val processedStream = dataStream
  .keyBy(_.windowEnd)
  .process( new TopNHotUrls(5) )

dataStream.print("aggregate")
processedStream.print("process")

env.execute("network flow job")

}
}

// Custom pre-aggregation function
class CountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long]{
override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

override def createAccumulator(): Long = 0L

override def getResult(accumulator: Long): Long = accumulator

override def merge(a: Long, b: Long): Long = a + b
}

// Custom window function
class WindowResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
out.collect( UrlViewCount( key, window.getEnd, input.iterator.next() ) )
}
}

// Custom KeyedProcessFunction that sorts the counts and emits the Top-N URLs
class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long, UrlViewCount, String]{
lazy val urlState: MapState[String, Long] = getRuntimeContext.getMapState( new MapStateDescriptor[String, Long]("url-state", classOf[String], classOf[Long] ) )

override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {
urlState.put(value.url, value.count)
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// pull all (url, count) entries out of state
val allUrlViews: ListBuffer[(String, Long)] = new ListBuffer[(String, Long)]()
val iter = urlState.entries().iterator()
while(iter.hasNext){
val entry = iter.next()
allUrlViews += (( entry.getKey, entry.getValue ))
}

// urlState.clear()

val sortedUrlViews = allUrlViews.sortWith(_._2 > _._2).take(topSize)

// format the result for output
val result: StringBuilder = new StringBuilder()
result.append("Time: ").append( new Timestamp( timestamp - 1 ) ).append("\n")
for( i <- sortedUrlViews.indices ){
  val currentUrlView = sortedUrlViews(i)
  result.append("NO").append(i + 1).append(":")
    .append(" URL=").append(currentUrlView._1)
    .append(" views=").append(currentUrlView._2).append("\n")
}
result.append("=============================")
Thread.sleep(1000)
out.collect(result.toString())

}
}
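For reference, the map() above expects space-delimited access-log lines whose fourth field is a timestamp in dd/MM/yyyy:HH:mm:ss format; a quick standalone check with a hypothetical sample line (the field values are made up):

val sampleLine = "83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/kibana-search.png"
val fields = sampleLine.split(" ")
val format = new java.text.SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
// ip = fields(0), userId = fields(1), timestamp = fields(3), method = fields(5), url = fields(6)
val sampleEvent = ApacheLogEvent(fields(0).trim, fields(1).trim, format.parse(fields(3).trim).getTime, fields(5).trim, fields(6).trim)
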
package com. gu.networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Case class for the input data
case class UserBehavior( userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long )

object PageView {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// read the data source via a relative path
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    UserBehavior( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)
  .filter( _.behavior == "pv" )    // only count "pv" (page view) events
  .map( data => ("pv", 1) )
  .keyBy(_._1)
  .timeWindow(Time.hours(1))
  .sum(1)

dataStream.print("pv count")

env.execute("page view jpb")

}
}
package com. gu.networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


case class UvCount( windowEnd: Long, uvCount: Long )

object UniqueVisitor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// read the data source via a relative path
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    UserBehavior( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)
  .filter( _.behavior == "pv" )    // only count "pv" (page view) events
  .timeWindowAll( Time.hours(1) )
  .apply( new UvCountByWindow() )

dataStream.print()
env.execute("uv job")

}
}

class UvCountByWindow() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow]{
override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
// use a Scala Set to collect all userIds in the window and deduplicate them
var idSet = Set[Long]()
// add every userId in the current window to the set, then emit the set size
for( userBehavior <- input ){
idSet += userBehavior.userId
}
out.collect( UvCount( window.getEnd, idSet.size ) )
}
}
package com. gu.networkflow_analysis

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis


object UvWithBloom {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// read the data source via a relative path
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter(_.behavior == "pv")    // only count "pv" (page view) events
.map(data => ("dummyKey", data.userId))
.keyBy(_._1)
.timeWindow(Time.hours(1))
.trigger(new MyTrigger())
.process(new UvCountWithBloom())

dataStream.print()

env.execute("uv with bloom job")
}
}

// Custom window trigger
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}

override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// fire the window for every incoming element and purge all window state
TriggerResult.FIRE_AND_PURGE
}
}

// A simple Bloom-filter-style bitmap helper
class Bloom(size: Long) extends Serializable {
// total size of the bitmap; defaults to 1 << 27 bits (16 MB)
private val cap = if (size > 0) size else 1 << 27

// hash function
def hash(value: String, seed: Int): Long = {
var result = 0L
for( i <- 0 until value.length ){
result = result * seed + value.charAt(i)
}
result & ( cap - 1 )
}
}
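The bit mask in hash() only keeps the result inside the bitmap because cap is a power of two (1 << 27 here, 1 << 29 below); a minimal standalone sketch of that property, using an arbitrary sample value and seed:

val bloom = new Bloom(1 << 29)
val offset = bloom.hash("543462", 61)
// masking with cap - 1 keeps only the low bits, so the offset always lands in [0, cap)
assert(offset >= 0 && offset < (1L << 29))
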

class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{
// Redis connection and bloom filter helper
lazy val jedis = new Jedis("localhost", 6379)
lazy val bloom = new Bloom(1<<29)

override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
// storage layout: one Redis bitmap per window, keyed by the windowEnd timestamp
val storeKey = context.window.getEnd.toString
var count = 0L
// the uv count of each window is also kept in a Redis hash named "count" as (windowEnd -> uvCount), so read the current value first
if( jedis.hget("count", storeKey) != null ){
count = jedis.hget("count", storeKey).toLong
}
// use the bloom filter to decide whether the current user has been seen before
val userId = elements.last._2.toString
val offset = bloom.hash(userId, 61)
// check whether the corresponding bit is already set in the Redis bitmap
val isExist = jedis.getbit(storeKey, offset)
if(!isExist){
// if not seen yet: set the bit and increment the count
jedis.setbit(storeKey, offset, true)
jedis.hset("count", storeKey, (count + 1).toString)
out.collect( UvCount(storeKey.toLong, count + 1) )
} else {
out.collect( UvCount(storeKey.toLong, count) )
}
}
}
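To sanity-check what UvCountWithBloom writes to Redis, the bitmap and the count hash can be read back directly with Jedis; a minimal sketch, assuming the same local Redis instance (the windowEnd key and offset below are placeholders):

val jedis = new Jedis("localhost", 6379)
val windowEnd = "1511661600000"   // placeholder windowEnd key
// per-window UV count stored in the "count" hash by the process function above
println(jedis.hget("count", windowEnd))
// whether a particular bloom offset is set in that window's bitmap
println(jedis.getbit(windowEnd, 12345L))
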

package com. gu.marketanalysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


// Case class for input ad-click events
case class AdClickEvent( userId: Long, adId: Long, province: String, city: String, timestamp: Long )
// Case class for the per-province count result
case class CountByProvince( windowEnd: String, province: String, count: Long )
// Case class for blacklist warnings (side output)
case class BlackListWarning( userId: Long, adId: Long, msg: String )

object AdStatisticsByGeo {
// side-output tag for blacklist warnings
val blackListOutputTag: OutputTag[BlackListWarning] = new OutputTag[BlackListWarning]("blacklist")

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// read the data and convert it to AdClickEvent
val resource = getClass.getResource("/AdClickLog.csv")
val adEventStream = env.readTextFile(resource.getPath)
  .map( data => {
    val dataArray = data.split(",")
    AdClickEvent( dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim, dataArray(3).trim, dataArray(4).trim.toLong )
  } )
  .assignAscendingTimestamps(_.timestamp * 1000L)

// custom process function that filters out users who click excessively (click fraud)
val filterBlackListStream = adEventStream
  .keyBy( data => (data.userId, data.adId) )
  .process( new FilterBlackListUser(100) )

// key by province, then window and aggregate
val adCountStream = filterBlackListStream
  .keyBy(_.province)
  .timeWindow( Time.hours(1), Time.seconds(5) )
  .aggregate( new AdCountAgg(), new AdCountResult() )

adCountStream.print("count")
filterBlackListStream.getSideOutput(blackListOutputTag).print("blacklist")

env.execute("ad statistics job")

}

class FilterBlackListUser(maxCount: Int) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]{
// state holding the current user's click count on the current ad
lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count-state", classOf[Long]))
// state flagging whether a blacklist warning has already been sent
lazy val isSentBlackList: ValueState[Boolean] = getRuntimeContext.getState( new ValueStateDescriptor[Boolean]("issent-state", classOf[Boolean]) )
// state holding the timestamp of the registered reset timer
lazy val resetTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("resettime-state", classOf[Long]) )

override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {
  // read the current count state
  val curCount = countState.value()

  // on the first click, register a timer that fires at the next 00:00 (UTC day boundary)
  if( curCount == 0 ){
    val ts = ( ctx.timerService().currentProcessingTime()/(1000*60*60*24) + 1) * (1000*60*60*24)
    resetTimer.update(ts)
    ctx.timerService().registerProcessingTimeTimer(ts)
  }

  // if the count has reached the limit, put the user on the blacklist
  if( curCount >= maxCount ){
    // only send the blacklist warning once
    if( !isSentBlackList.value() ){
      isSentBlackList.update(true)
      // emit the warning to the side output
      ctx.output( blackListOutputTag, BlackListWarning(value.userId, value.adId, "Click over " + maxCount + " times today.") )
    }
    return
  }
  // increment the count and forward the event to the main stream
  countState.update( curCount + 1 )
  out.collect( value )
}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
  // when the reset timer fires, clear all state
  if( timestamp == resetTimer.value() ){
    isSentBlackList.clear()
    countState.clear()
    resetTimer.clear()
  }
}

}
}
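The timer timestamp in processElement is plain epoch arithmetic: dividing by 86,400,000 ms truncates to the current day and adding one day gives the next 00:00 boundary (in UTC, since no time zone is involved). A standalone check of that arithmetic with an arbitrary example timestamp:

val dayMs = 1000L * 60 * 60 * 24
val now = 1574600000000L              // arbitrary example instant on 2019-11-24 (UTC)
val ts = (now / dayMs + 1) * dayMs    // same formula as in FilterBlackListUser
println(ts)                           // 1574640000000 -> 2019-11-25 00:00:00 UTC
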

// Custom pre-aggregation function
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long]{
override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1

override def createAccumulator(): Long = 0L

override def getResult(accumulator: Long): Long = accumulator

override def merge(a: Long, b: Long): Long = a + b
}

// Custom window function
class AdCountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
out.collect( CountByProvince( new Timestamp(window.getEnd).toString, key, input.iterator.next() ) )
}
}
package com. gu.marketanalysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


object AppMarketing {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.addSource( new SimulatedEventSource() )
.assignAscendingTimestamps(_.timestamp)
.filter( _.behavior != "UNINSTALL" )
.map( data => {
( "dummyKey", 1L )
} )
.keyBy(_._1)    // key on a constant dummy key so all events are aggregated together
.timeWindow( Time.hours(1), Time.seconds(10) )
.aggregate( new CountAgg(), new MarketingCountTotal() )

dataStream.print()
env.execute("app marketing job")
}
}

class CountAgg() extends AggregateFunction[(String, Long), Long, Long]{
override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

override def createAccumulator(): Long = 0L

override def getResult(accumulator: Long): Long = accumulator

override def merge(a: Long, b: Long): Long = a + b
}

class MarketingCountTotal() extends WindowFunction[Long, MarketingViewCount, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketingViewCount]): Unit = {
val startTs = new Timestamp(window.getStart).toString
val endTs = new Timestamp(window.getEnd).toString
val count = input.iterator.next()
out.collect( MarketingViewCount(startTs, endTs, "app marketing", "total", count) )
}
}
package com. gu.marketanalysis

import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// Case class for the input data
case class MarketingUserBehavior( userId: String, behavior: String, channel: String, timestamp: Long )
// Case class for the output result
case class MarketingViewCount( windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long )

object AppMarketingByChannel {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.addSource( new SimulatedEventSource() )
  .assignAscendingTimestamps(_.timestamp)
  .filter( _.behavior != "UNINSTALL" )
  .map( data => {
    ( (data.channel, data.behavior), 1L )
  } )
  .keyBy(_._1)     // key by (channel, behavior)
  .timeWindow( Time.hours(1), Time.seconds(10) )
  .process( new MarketingCountByChannel() )

dataStream.print()
env.execute("app marketing by channel job")

}
}

// Custom source that simulates user-behavior events
class SimulatedEventSource() extends RichSourceFunction[MarketingUserBehavior]{
// flag controlling whether the source keeps running
var running = true
// candidate behavior types
val behaviorTypes: Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
// candidate channels
val channelSets: Seq[String] = Seq("wechat", "weibo", "appstore", "huaweistore")
// random number generator
val rand: Random = new Random()

override def cancel(): Unit = running = false

override def run(ctx: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {
// upper bound on the number of generated records
val maxElements = Long.MaxValue
var count = 0L

// randomly generate events until cancelled
while( running && count < maxElements ){
  val id = UUID.randomUUID().toString
  val behavior = behaviorTypes(rand.nextInt(behaviorTypes.size))
  val channel = channelSets(rand.nextInt(channelSets.size))
  val ts = System.currentTimeMillis()

  ctx.collect( MarketingUserBehavior( id, behavior, channel, ts ) )

  count += 1
  TimeUnit.MILLISECONDS.sleep(10L)
}

}
}

// Custom window process function
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow]{
override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)], out: Collector[MarketingViewCount]): Unit = {
val startTs = new Timestamp(context.window.getStart).toString
val endTs = new Timestamp(context.window.getEnd).toString
val channel = key._1
val behavior = key._2
val count = elements.size
out.collect( MarketingViewCount(startTs, endTs, channel, behavior, count) )
}
}

package com. gu.apitest.sinktest

import java.util

import com. gu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


object EsSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// source
val inputStream = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt")

// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)

val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))

// build an ElasticsearchSink builder
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
httpHosts,
new ElasticsearchSinkFunction[SensorReading] {
override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
println("saving data: " + element)
// wrap the data in a Map (or a JSON object)
val json = new util.HashMap[String, String]()
json.put("sensor_id", element.id)
json.put("temperature", element.temperature.toString)
json.put("ts", element.timestamp.toString)

  // create the index request that will carry the data
  val indexRequest = Requests.indexRequest()
    .index("sensor")
    .`type`("readingdata")
    .source(json)

  // hand the request to the indexer to write the data
  indexer.add(indexRequest)
  println("data saved.")
}

}
)

// sink
dataStream.addSink( esSinkBuilder.build() )

env.execute("es sink test")
}
}
package com. gu.apitest.sinktest

import java.sql.{Connection, DriverManager, PreparedStatement}

import com. gu.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._


object JdbcSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// source
val inputStream = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt")

// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)

// sink
dataStream.addSink( new MyJdbcSink() )

env.execute("jdbc sink test")
}
}

class MyJdbcSink() extends RichSinkFunction[SensorReading]{
// JDBC connection and prepared statements
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _

// open: create the connection and prepare the statements
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?,?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
}

// invoke: execute the SQL for each record
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
// try the update first
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
// if the update did not touch any row, insert instead
if( updateStmt.getUpdateCount == 0 ){
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}

// close: clean up resources
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
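MyJdbcSink assumes a MySQL table whose columns match the INSERT/UPDATE statements above; a minimal sketch that creates such a table through the same connection settings (the column types are assumptions inferred from SensorReading):

import java.sql.DriverManager

object CreateTemperaturesTable {
  def main(args: Array[String]): Unit = {
    // same connection settings as MyJdbcSink.open()
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    val stmt = conn.createStatement()
    // column names come from the SQL in MyJdbcSink; the types are assumed
    stmt.execute("CREATE TABLE IF NOT EXISTS temperatures (sensor VARCHAR(32), temp DOUBLE)")
    stmt.close()
    conn.close()
  }
}
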
package com. gu.apitest.sinktest

import com. gu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}


object RedisSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// source
val inputStream = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt")

// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)

val conf = new FlinkJedisPoolConfig.Builder()
.setHost("localhost")
.setPort(6379)
.build()

// sink
dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )

env.execute("redis sink test")
}
}

class MyRedisMapper() extends RedisMapper[SensorReading]{

// define the Redis command used to store the data
override def getCommandDescription: RedisCommandDescription = {
// store sensor id and temperature as a hash: HSET key field value
new RedisCommandDescription( RedisCommand.HSET, "sensor_temperature" )
}

// value written to Redis
override def getValueFromData(t: SensorReading): String = t.temperature.toString

// key written to Redis
override def getKeyFromData(t: SensorReading): String = t.id
}
package com. gu.apitest.sinktest

import java.util.Properties

import com. gu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}


object KafkaSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// source
// val inputStream = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt")
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val inputStream = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

// transformations

val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString // convert to String for easy serialization
}
)

// sink
dataStream.addSink( new FlinkKafkaProducer011[String]( "sinkTest", new SimpleStringSchema(), properties) )
dataStream.print()

env.execute("kafka sink test")
}
}
package com. gu.apitest

import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector


object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.enableCheckpointing(60000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(100000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))

// env.setStateBackend( new RocksDBStateBackend("") )

val stream = env.socketTextStream("localhost", 7777)

val dataStream = stream.map(data => {
  val dataArray = data.split(",")
  SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
  .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {
  override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
} )

val processedStream = dataStream.keyBy(_.id)
  .process( new TempIncreAlert() )

val processedStream2 = dataStream.keyBy(_.id)

// .process( new TempChangeAlert(10.0) )
.flatMap( new TempChangeAlert(10.0) )

val processedStream3 = dataStream.keyBy(_.id)
  .flatMapWithState[(String, Double, Double), Double]{
  // if there is no state yet (no element seen for this key), just store the current temperature
  case ( input: SensorReading, None ) => ( List.empty, Some(input.temperature) )
  // otherwise compare with the last temperature and emit an alert if the difference exceeds the threshold
  case ( input: SensorReading, lastTemp: Some[Double] ) =>
    val diff = ( input.temperature - lastTemp.get ).abs
    if( diff > 10.0 ){
      ( List((input.id, lastTemp.get, input.temperature)), Some(input.temperature) )
    } else
      ( List.empty, Some(input.temperature) )
}

dataStream.print("input data")
processedStream3.print("processed data")

env.execute("process function test")

}
}

class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String]{

// state holding the previous temperature
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )
// state holding the timestamp of the registered timer
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("currentTimer", classOf[Long]) )

override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// read the previous temperature
val preTemp = lastTemp.value()
// update the stored temperature
lastTemp.update( value.temperature )

val curTimerTs = currentTimer.value()


if( value.temperature < preTemp || preTemp == 0.0 ){
  // if the temperature dropped, or this is the first reading, delete the timer and clear state
  ctx.timerService().deleteProcessingTimeTimer( curTimerTs )
  currentTimer.clear()
} else if ( value.temperature > preTemp && curTimerTs == 0 ){
  // temperature rose and no timer is registered yet: register one
  val timerTs = ctx.timerService().currentProcessingTime() + 5000L
  ctx.timerService().registerProcessingTimeTimer( timerTs )
  currentTimer.update( timerTs )
}

}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
// emit the alert
out.collect( ctx.getCurrentKey + " temperature has been rising continuously" )
currentTimer.clear()
}
}

class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{

private var lastTempState: ValueState[Double] = _

override def open(parameters: Configuration): Unit = {
// initialize the state handle in open()
lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
}

override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
// read the last temperature
val lastTemp = lastTempState.value()
// compare with the current temperature; emit an alert if the difference exceeds the threshold
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}

}

class TempChangeAlert2(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)]{
// state holding the last temperature
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) )

override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
// read the last temperature
val lastTemp = lastTempState.value()
// compare with the current temperature; emit an alert if the difference exceeds the threshold
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}
}
package com. gu.apitest

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector


object SideOutputTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream = env.socketTextStream("localhost", 7777)

val dataStream = stream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
} )

val processedStream = dataStream
.process( new FreezingAlert() )

// dataStream.print("input data")
processedStream.print("processed data")
processedStream.getSideOutput( new OutputTag[String]("freezing alert") ).print("alert data")

env.execute("side output test")

}
}

// Freezing alert: if the temperature is below 32 F, emit a warning to a side output
class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading]{

// lazy val alertOutput: OutputTag[String] = new OutputTag[String]( "freezing alert" )

override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if( value.temperature < 32.0 ){
ctx.output( new OutputTag[String]( "freezing alert" ), "freezing alert for " + value.id )
}
out.collect( value )
}
}
package com. gu.apitest

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import scala.util.Random

// Case class for sensor readings
case class SensorReading( id: String, timestamp: Long, temperature: Double )

object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// 1. read data from a collection
val stream1 = env.fromCollection(List(
  SensorReading("sensor_1", 1547718199, 35.80018327300259),
  SensorReading("sensor_6", 1547718201, 15.402984393403084),
  SensorReading("sensor_7", 1547718202, 6.720945201171228),
  SensorReading("sensor_10", 1547718205, 38.101067604893444)
))

// env.fromElements("flink", 1, 32, 3213, 0.324).print("test")

// 2. read data from a file
val stream2 = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt")

// 3. 从kafka中读取数据
// 创建kafka相关的配置
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")

val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

// 4. custom source
val stream4 = env.addSource(new SensorSource())

// sink输出
stream4.print("stream4")

env.execute("source api test")

}
}

class SensorSource() extends SourceFunction[SensorReading]{
// flag indicating whether the source is still running
var running: Boolean = true
override def cancel(): Unit = running = false

override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// 创建一个随机数发生器
val rand = new Random()

// 随机初始换生成10个传感器的温度数据,之后在它基础随机波动生成流数据
var curTemp = 1.to(10).map(
  i => ( "sensor_" + i, 60 + rand.nextGaussian() * 20 )
)

// 无限循环生成流数据,除非被cancel
while(running){
  // 更新温度值
  curTemp = curTemp.map(
    t => (t._1, t._2 + rand.nextGaussian())
  )
  // 获取当前的时间戳
  val curTime = System.currentTimeMillis()
  // 包装成SensorReading,输出
  curTemp.foreach(
    t => ctx.collect( SensorReading(t._1, curTime, t._2) )
  )
  // 间隔100ms
  Thread.sleep(100)
}

}
}
package com. gu.apitest

import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._


object TransformTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// 读入数据
val inputStream = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt")

// transformations

val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)

// 1. aggregation operations
val stream1 = dataStream
.keyBy("id")
// .sum("temperature")
.reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10) )

// 2. split the stream by whether the temperature is above 30 degrees
val splitStream = dataStream
.split( sensorData => {
if( sensorData.temperature > 30 ) Seq("high") else Seq("low")
} )

val highTempStream = splitStream.select("high")
val lowTempStream = splitStream.select("low")
val allTempStream = splitStream.select("high", "low")

// 3. connect the two streams
val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) )
val connectedStreams = warningStream.connect(lowTempStream)

val coMapStream = connectedStreams.map(
warningData => ( warningData._1, warningData._2, "high temperature warning" ),
lowData => ( lowData.id, "healthy" )
)

val unionStream = highTempStream.union(lowTempStream)

// function classes
dataStream.filter( new MyFilter() ).print()

// output
// dataStream.print()
// highTempStream.print("high")
// lowTempStream.print("low")
// allTempStream.print("all")
// unionStream.print("union")

env.execute("transform test job")
}
}

class MyFilter() extends FilterFunction[SensorReading]{
override def filter(value: SensorReading): Boolean = {
value.id.startsWith("sensor_1")
}
}

class MyMapper() extends RichMapFunction[SensorReading, String]{
override def map(value: SensorReading): String = {
"flink"
}

override def open(parameters: Configuration): Unit = super.open(parameters)
}
package com. gu.apitest

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector


object WindowTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 设置事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(500)

// read the input
// val inputStream = env.readTextFile("D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt")

val inputStream = env.socketTextStream("localhost", 7777)

val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
// .assignAscendingTimestamps(_.timestamp * 1000L)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
// .assignTimestampsAndWatermarks( new MyAssigner() )
.map(data => (data.id, data.temperature))
.keyBy(_._1)
// .process( new MyProcess() )
.timeWindow(Time.seconds(10), Time.seconds(3))
.reduce((result, data) => (data._1, result._2.min(data._2))) // minimum temperature within the 10-second window

dataStream.print()

env.execute("window api test")
}
}

class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{
// fixed delay (bound) of 3 seconds
val bound: Long = 3 * 1000L
// largest timestamp seen so far
var maxTs: Long = Long.MinValue

override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}

override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
maxTs = maxTs.max(element.timestamp * 1000L)
element.timestamp * 1000L
}
}
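A quick standalone illustration of how MyAssigner trails the largest seen timestamp by its 3-second bound (it reuses the SensorReading case class defined earlier in this file; the sample values are arbitrary):

val assigner = new MyAssigner()
assigner.extractTimestamp(SensorReading("sensor_1", 1547718199, 35.8), 0L)   // maxTs becomes 1547718199000
assigner.extractTimestamp(SensorReading("sensor_1", 1547718195, 36.1), 0L)   // late element, maxTs unchanged
println(assigner.getCurrentWatermark.getTimestamp)   // 1547718199000 - 3000 = 1547718196000
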

class MyAssigner2() extends AssignerWithPunctuatedWatermarks[SensorReading]{
val bound: Long = 1000L

override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
if( lastElement.id == "sensor_1" ){
new Watermark(extractedTimestamp - bound)
}else{
null
}
}

override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
element.timestamp * 1000L
}
}
package com. gu.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._


object StreamWordCount {
def main(args: Array[String]): Unit = {

val params = ParameterTool.fromArgs(args)
val host: String = params.get("host")
val port: Int = params.getInt("port")

// create a streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.setParallelism(1)
// env.disableOperatorChaining()

// receive the socket text stream
val textDataStream = env.socketTextStream(host, port)

// split each line into words, then count the words
val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
.filter(_.nonEmpty).startNewChain()
.map( (_, 1) )
.keyBy(0)
.sum(1)

// print the result
wordCountDataStream.print().setParallelism(1)

// run the job
env.execute("stream word count job")
}
}
package com. gu.wc

import org.apache.flink.api.scala._

// Batch-processing version of word count
object WordCount {
def main(args: Array[String]): Unit = {
// 创建一个批处理的执行环境
val env = ExecutionEnvironment.getExecutionEnvironment

// 从文件中读取数据
val inputPath = "D:\Projects\BigData\FlinkTutorial\src\main\resources\hello.txt"
val inputDataSet = env.readTextFile(inputPath)

// 分词之后做count
val wordCountDataSet = inputDataSet.flatMap(_.split(" "))
  .map( (_, 1) )
  .groupBy(0)
  .sum(1)

// 打印输出
wordCountDataSet.print()

}
}
package streaming.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;


public class SinkForRedisDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("192.168.167.254", 8888, "\n");
//lpush l_words word
// wrap each String into a Tuple2
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<>("b", value);
}
});
// Redis configuration
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.167.254").setPort(6379).build();

 // create the RedisSink
 RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
 l_wordsData.addSink(redisSink);
 env.execute("StreamingDemoToRedis");

}

public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
// extract the Redis key from the incoming record
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
// extract the Redis value from the incoming record
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}

 @Override
 public RedisCommandDescription getCommandDescription() {
     return new RedisCommandDescription(RedisCommand.LPUSH);
 }

}
}


<modelVersion>4.0.0</modelVersion>

<groupId>com. .flink</groupId>
<artifactId>Pro</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>

<modules>
    <module>ETL</module>
    <module>Report</module>
</modules>

<properties>
    <flink.version>1.9.0</flink.version>
    <scala.version>2.11.8</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.3</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>




    
        
            org.apache.maven.plugins
            maven-compiler-plugin
            3.1
            
                1.8
                1.8
                
                    /src/test
public class DataClean {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// we read from Kafka, so set the parallelism to the number of partitions of the topic
env.setParallelism(3);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// note: we should really configure the state backend here, so that
// checkpoint data is stored in RocksDB

 // step 1: read data from Kafka (the source is a Kafka consumer)
 // topic naming matters: ideally the name alone tells you what data it carries
 // xxxx_xxx_xxx_xxx
 String topic="allData";
 Properties consumerProperties = new Properties();
 consumerProperties.put("bootstrap.servers","192.168.167.254:9092");
 consumerProperties.put("group.id","allTopic_consumer");

 
 FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic,
         new SimpleStringSchema(),
         consumerProperties);
 //{"dt":"2019-11-24 19:54:23","countryCode":"PK","data":[{"type":"s4","score":0.8,"level":"C"},{"type":"s5","score":0.2,"level":"C"}]}
 DataStreamSource<String> allData = env.addSource(consumer);
 // broadcast the country-to-area mapping read from Redis
 DataStream<HashMap<String, String>> mapData = env.addSource(new RedisSource()).broadcast();
 SingleOutputStreamOperator<String> etlData = allData.connect(mapData).flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {
     HashMap<String, String> allMap = new HashMap<>();

      // flatMap1 handles the data coming from Kafka
      @Override
      public void flatMap1(String line, Collector<String> out) throws Exception {
          JSONObject jsonObject = JSONObject.parseObject(line);
          String dt = jsonObject.getString("dt");
          String countryCode = jsonObject.getString("countryCode");
          // look up the area (region) name by countryCode
          String area = allMap.get(countryCode);
          JSONArray data = jsonObject.getJSONArray("data");
          for (int i = 0; i < data.size(); i++) {
              JSONObject dataObject = data.getJSONObject(i);
              System.out.println("area: " + area);
              dataObject.put("dt", dt);
              dataObject.put("area", area);
              // downstream operators receive one JSON record per element
              out.collect(dataObject.toJSONString());
         }


     }

      // flatMap2 handles the mapping data coming from Redis
      @Override
      public void flatMap2(HashMap<String, String> map,
                           Collector<String> collector) throws Exception {
         System.out.println(map.toString());
         allMap = map;

     }
 });

 //ETL -> load kafka


 etlData.print().setParallelism(1);

 

// String outputTopic="allDataClean";
// Properties producerProperties = new Properties();
// producerProperties.put("bootstrap.servers","192.168.167.254:9092");
// FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(outputTopic,
// new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
// producerProperties);
//
// // create a Kafka producer as the sink
// etlData.addSink(producer);

    env.execute("DataClean");


}

}
package com. .producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;


public class kafkaProducer {

public static void main(String[] args) throws Exception{
Properties prop = new Properties();
// Kafka broker address
prop.put("bootstrap.servers", "192.168.167.254:9092");
// key/value serializers
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
// topic name
String topic = "allData";

 // create the producer
 KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

 //{"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}


 while(true){
     String message = "{\"dt\":\"" + getCurrentTime() + "\",\"countryCode\":\"" + getCountryCode()
             + "\",\"data\":[{\"type\":\"" + getRandomType() + "\",\"score\":" + getRandomScore() + ",\"level\":\"" + getRandomLevel()
             + "\"},{\"type\":\"" + getRandomType() + "\",\"score\":" + getRandomScore() + ",\"level\":\"" + getRandomLevel() + "\"}]}";
     System.out.println(message);
     // produce the record to Kafka
     producer.send(new ProducerRecord<>(topic, message));
     Thread.sleep(2000);
 }
 // close the producer
 //producer.close();

}

public static String getCurrentTime(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date());
}

public static String getCountryCode(){
String[] types = {"US","TW","HK","PK","KW","SA","IN"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}

public static String getRandomType(){
String[] types = {"s1","s2","s3","s4","s5"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}

public static double getRandomScore(){
double[] types = {0.3,0.2,0.1,0.5,0.8};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}

public static String getRandomLevel(){
String[] types = {"A","A+","B","C","D"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}

}
package com. .source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.Map;


public class RedisSource implements SourceFunction<HashMap<String, String>> {

private Logger logger = LoggerFactory.getLogger(RedisSource.class);

private Jedis jedis;
private boolean isRunning=true;

@Override
public void run(SourceContext<HashMap<String, String>> cxt) throws Exception {
    this.jedis = new Jedis("192.168.167.254",6379);
    HashMap<String, String> map = new HashMap<>();
    while(isRunning){
      try{
          map.clear();
          Map<String, String> areas = jedis.hgetAll("areas");
          for(Map.Entry<String, String> entry : areas.entrySet()){
              String area = entry.getKey();
              String value = entry.getValue();
              String[] fields = value.split(",");
              for(String country:fields){
                  map.put(country,area);
              }

          }
          if(map.size() > 0 ){
              cxt.collect(map);
          }
          Thread.sleep(60000);
      }catch (JedisConnectionException e){
          logger.error("Redis connection error", e.getCause());
          this.jedis = new Jedis("192.168.167.254",6379);
      }catch (Exception e){
          logger.error("source error", e.getCause());
      }

    }

}

@Override
public void cancel() {
    isRunning=false;
    if(jedis != null){
        jedis.close();
    }

}

}



<parent>
    <artifactId>Pro</artifactId>
    <groupId>com. .flink</groupId>
    <version>1.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>Report</artifactId>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>


package com. .core;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com. .function.MySumFuction;
import com. .watermark.MyWaterMark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;


public class DataReport {

public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//env.setStateBackend(new RocksDBStateBackend(""));
//设置time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
String topic = "auditLog";
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers","192.168.167.254:9092");
consumerProperties.put("group.id","auditLog_consumer");
// read the data from Kafka
//{"dt":"2019-11-24 21:19:47","type":"child_unshelf","username":"shenhe1","area":"AREA_ID"}
FlinkKafkaConsumer011<String> consumer =
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), consumerProperties);
DataStreamSource<String> data = env.addSource(consumer);
Logger logger= LoggerFactory.getLogger(DataReport.class);
// transform the data
SingleOutputStreamOperator<Tuple3<Long, String, String>> preData = data.map(new MapFunction<String, Tuple3<Long, String, String>>() {

@Override
public Tuple3<Long, String, String> map(String line) throws Exception {
JSONObject jsonObject = JSON.parseObject(line);
String dt = jsonObject.getString("dt");
String type = jsonObject.getString("type");
String area = jsonObject.getString("area");
long time = 0;
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
time = sdf.parse(dt).getTime();
} catch (ParseException e) {
logger.error("failed to parse time, dt: " + dt, e.getCause());
}
return Tuple3.of(time, type, area);
}
});

 
 SingleOutputStreamOperator<Tuple3<Long, String, String>> filterData = preData.filter(tuple3 -> tuple3.f0 != 0);


 
 OutputTag<Tuple3<Long, String, String>> outputTag =
         new OutputTag<Tuple3<Long, String, String>>("late-date"){};
 

 SingleOutputStreamOperator<Tuple4<String, String, String, Long>> resultData = filterData.assignTimestampsAndWatermarks(new MyWaterMark())
         .keyBy(1, 2)
         .window(TumblingEventTimeWindows.of(Time.seconds(30)))
         .sideOutputLateData(outputTag)
         .apply(new MySumFuction());


 

 SingleOutputStreamOperator<String> sideOutput =
         //java8 lambda
         resultData.getSideOutput(outputTag).map(line -> line.toString());

// String outputTopic="lateData";
// Properties producerProperties = new Properties();
// producerProperties.put("bootstrap.servers","192.168.167.254:9092");
// FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(outputTopic,
// new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
// producerProperties);
// sideOutput.addSink(producer);

    

    resultData.print();


    env.execute("DataReport");

}

}
package com. .function;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;


public class MySumFuction implements WindowFunction<Tuple3<Long, String, String>,
Tuple4<String, String, String, Long>, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow,
Iterable<Tuple3<Long, String, String>> input,
Collector<Tuple4<String, String, String, Long>> out) {
// read the grouping fields
String type = tuple.getField(0).toString();
String area = tuple.getField(1).toString();

    java.util.Iterator<Tuple3<Long, String, String>> iterator = input.iterator();
    long count=0;
    while(iterator.hasNext()){
        iterator.next();
        count++;
    }
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String time = sdf.format(new Date(timeWindow.getEnd()));


    Tuple4<String, String, String, Long> result =
            new Tuple4<>(time, type, area, count);
    out.collect(result);
}

}
package com. .source;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;


public class ProducerDataReport {

public static void main(String[] args) throws Exception{
Properties prop = new Properties();
// Kafka broker address
prop.put("bootstrap.servers", "192.168.167.254:9092");
// key/value serializers
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
// topic name
String topic = "auditLog";

// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
//{"dt":"2018-01-01 10:11:22","type":"shelf","username":"shenhe1","area":"AREA_US"}
// produce messages
while(true){
String message = "{\"dt\":\"" + getCurrentTime() + "\",\"type\":\"" + getRandomType()
        + "\",\"username\":\"" + getRandomUsername() + "\",\"area\":\"" + getRandomArea() + "\"}";
System.out.println(message);
producer.send(new ProducerRecord<>(topic, message));
Thread.sleep(500);
}
// close the producer
//producer.close();
}
public static String getCurrentTime(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date());
}
public static String getRandomArea(){
String[] types = {"AREA_US","AREA_CT","AREA_AR","AREA_IN","AREA_ID"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getRandomType(){
String[] types = {"shelf","unshelf","black","chlid_shelf","child_unshelf"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
public static String getRandomUsername(){
String[] types = {"shenhe1","shenhe2","shenhe3","shenhe4","shenhe5"};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}

}
package com. .watermark;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;


public class MyWaterMark
implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, String>> {
long currentMaxTimestamp = 0L;
final long maxOutOfOrderness = 20000L; // maximum allowed out-of-orderness
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(Tuple3<Long, String, String> element, long l) {
Long timeStamp = element.f0;
currentMaxTimestamp=Math.max(timeStamp,currentMaxTimestamp);
return timeStamp;
}
}
