At work I have been using windows to compute periodic statistics. They cover most scenarios, but I ran into a problem: a window can hold data yet never trigger its computation. Taking a tumbling event-time window (with processing time used as the event time) as the example, the window fires only when both of these conditions hold:
① the watermark has reached the end of the window: watermark >= window_end_time;
② the window contains data.
Suppose the window does contain data, so condition ② is satisfied. Condition ① is the one that needs close attention: the watermark only advances when later data arrives. If no further data comes in, the watermark never updates, so it never reaches window_end_time, the window never fires, and the data already buffered in the window is never computed, as the sketch below illustrates.
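To make the stall concrete, here is a minimal sketch, not the production job: the class name, the hard-coded elements, and the forMonotonousTimestamps strategy are all assumptions for illustration. Note that a bounded fromElements source emits a final watermark at end-of-input, so to actually observe the stall you would substitute an unbounded source such as Kafka that simply goes quiet after the last record:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkStallSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Hypothetical input: (word, eventTimeMillis). Both records fall into the
        // tumbling window [0s, 5s); nothing ever carries a timestamp >= 5s.
        DataStream<Tuple2<String, Long>> source = env.fromElements(
                Tuple2.of("hello", 1_000L),
                Tuple2.of("hello", 3_000L));

        source
                // The watermark is derived from element timestamps, so it only
                // advances when newer elements arrive. On a quiet unbounded stream
                // it stays below window_end_time and the window never fires.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTs) -> element.f1))
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();

        env.execute("watermark-stall-sketch");
    }
}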
This is where timers have the advantage. As long as the function registers a timer correctly, say for the current time + 5s, the timer fires 5 seconds later whether or not any more data arrives. Implementing your logic in the corresponding callback (onTimer) therefore guarantees the buffered data gets computed, and the problem of part of a window's data never being processed goes away.
Timer test code:

package com.ywwl.ywdc.live.main;

import com.google.common.base.Strings;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class MyTimerTest {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        Config config = ConfigFactory.load(MyTimerTest.class.getClassLoader());

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Disable global operator chaining
        env.disableOperatorChaining();
        env.setParallelism(1);

        String brokers = config.getString("kafka.brokers");
        String topic = "simple_test";
        String groupId = config.getString("kafka.groupId");
        String checkPointPath = config.getString("check.point.path.prefix") + topic;

        StateBackend backend = new EmbeddedRocksDBStateBackend(true);
        env.setStateBackend(backend);

        CheckpointConfig conf = env.getCheckpointConfig();
        // Retain checkpoints when the job is cancelled or fails
        conf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        conf.setCheckpointInterval(10000);          // milliseconds
        conf.setCheckpointTimeout(10 * 60 * 1000);  // milliseconds
        conf.setMinPauseBetweenCheckpoints(10 * 1000);
        conf.setCheckpointStorage(checkPointPath);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", 1000);
        props.put("session.timeout.ms", 90000);
        props.put("request.timeout.ms", 120000);
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 100);

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> source = env.addSource(consumer);

        // Split each line on spaces and emit (word, 1) pairs
        DataStream<Tuple2<String, Long>> word2One = source.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    if (!Strings.isNullOrEmpty(word)) {
                        out.collect(new Tuple2<>(word, 1L));
                    }
                }
            }
        });

        KeyedStream<Tuple2<String, Long>, String> keyedWordDataStream =
                word2One.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                });

        DataStream<String> resultDataStream = keyedWordDataStream.process(new MyProcess());
        resultDataStream.print();
        env.execute("Test");
    }

    public static class MyProcess extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

        ValueState<Long> timerState;
        MapState<String, Long> wordCountState;
        MapState<String, String> testMapState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ValueStateDescriptor<Long> timerStateDesc =
                    new ValueStateDescriptor<>("timerStateDesc", Long.class);
            timerState = getRuntimeContext().getState(timerStateDesc);
            MapStateDescriptor<String, Long> wordCountStateDesc =
                    new MapStateDescriptor<>("wordCountStateDesc", String.class, Long.class);
            wordCountState = getRuntimeContext().getMapState(wordCountStateDesc);
            MapStateDescriptor<String, String> testMapStateDesc =
                    new MapStateDescriptor<>("testMapStateDesc", String.class, String.class);
            testMapState = getRuntimeContext().getMapState(testMapStateDesc);
        }

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
            // Before they are initialized, all three state reads below return null
            Long timerStateValue = timerState.value();
            Long aLong = wordCountState.get(value.f0);
            String tttt = testMapState.get("tttt");

            // If no timer is registered for this key yet, register one 5s from now.
            // currentProcessingTime() is the safe base for a processing-time timer;
            // ctx.timestamp() can be null when records carry no event-time timestamp.
            if (timerState.value() == null) {
                timerState.update(ctx.timerService().currentProcessingTime() + 5 * 1000L);
                ctx.timerService().registerProcessingTimeTimer(timerState.value());
            }

            String word = value.f0;
            Long count = wordCountState.contains(word) ? wordCountState.get(word) : 0L;
            count++;
            wordCountState.put(word, count);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Timer fired: emit the counts accumulated for the current key.
            // wordCountState is keyed state, so here it holds exactly one entry
            // (the key's own word), but a loop is the robust form of iteration.
            Iterator<Map.Entry<String, Long>> iterator = wordCountState.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> next = iterator.next();
                out.collect(next.getKey() + " : " + next.getValue());
            }
            // Clear state so the next element starts a fresh 5s round
            wordCountState.clear();
            timerState.clear();
        }
    }
}
The logic: read strings from Kafka, split them on spaces, and count each word's frequency over 5-second intervals.
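As a concrete run, assuming the Kafka settings in the config file resolve: send the single line "hello world hello" to the simple_test topic and then nothing else. processElement registers one processing-time timer per key, so roughly 5 seconds later the job prints "hello : 2" and "world : 1" even though no further records arrived. In the watermark-driven window version, that same data would have sat in the window unemitted.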
The above is for reference only; my own understanding may be slightly off, and corrections are welcome!