问题:由于数据不连续,导致某个时间段最后一个窗口一直不会触发,一直等到新数据流入才会触发。
解决:通过重写WatermarkStrategy方法进行控制。
思路:每来一条新数据时会触发一次onEvent方法,如无参数控制,onPeriodicEmit会周期性触发。所以可以通过判断最后一条新数据进来的时间与系统时间做比较,一旦x秒数据数据没有进来则触发新的watermark逻辑;
public static class watermarkCust implements WatermarkStrategy{ private Tuple2 state = Tuple2.of(0L,true); @Override public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator () { private long maxWatermark; private long dexl = 3000L; @Override public void onEvent(Order waterSensor, long l, WatermarkOutput watermarkOutput) { maxWatermark = Math.max(maxWatermark,waterSensor.getAmount()); state.f0 = System.currentTimeMillis(); state.f1 = false; } @Override public void onPeriodicEmit(WatermarkOutput watermarkOutput) { if (maxWatermark - dexl <=0){ } else { if (System.currentTimeMillis() - state.f0 >= 20000L && state.f1 == false){ watermarkOutput.emitWatermark(new Watermark(maxWatermark - dexl + 10000L)); state.f1 = true; System.out.println("触发窗口"); } else { watermarkOutput.emitWatermark(new Watermark(maxWatermark - dexl)); } } } }; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)