在Flink中,当我们需要访问算子的Processing Time、Watermark,或者需要使用定时器时,可以实现ProcessFunction函数。
目前该函数主要有:KeyedProcessFunction、ProcessFunction、CoProcessFunction等,核心功能主要如下:
(1)可以使用状态计算,能够在算子中访问Keyed State;(2)可以设置定时器;(3)支持侧输出(Side Output),可以将一部分数据发送到另外一个数据流中,而且输出的两个数据流数据类型可以不一样。
如下自定义实现一个KeyedProcessFunction:
public class MyKeyedProcessFunctionJava extends KeyedProcessFunction{ private ValueState currentTime; private ValueState lastPrice ; private static long intervalMs = 500 ; OutputTag highLevel ; OutputTag middleLevel; OutputTag lowLevel; public MyKeyedProcessFunctionJava(OutputTag highLevel, OutputTag middleLevel, OutputTag lowLevel){ this.lowLevel = lowLevel; this.middleLevel= middleLevel; this.highLevel = highLevel; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); currentTime = getRuntimeContext().getState(new ValueStateDescriptor ("currentTime",Long.class)); lastPrice = getRuntimeContext().getState(new ValueStateDescriptor ("lastPrice",Double.class)); } @Override public void processElement(StockPrice stockPrice, Context context, Collector collector) throws Exception { double price = 0; if(lastPrice.value() != null) { price = lastPrice.value() ; } long currentTimeStamp = 0; if(currentTime.value() != null){ currentTimeStamp = currentTime.value(); } if(price < stockPrice.getPrice()){ context.timerService().deleteEventTimeTimer(currentTimeStamp); }else{ long time = context.timestamp()+intervalMs; context.timerService().registerEventTimeTimer(time); currentTime.update(time); } lastPrice.update(stockPrice.getPrice()); if(price>10000){ context.output(highLevel,stockPrice); }else if(price<= 10000 && price > 3000){ context.output(middleLevel,stockPrice); }else{ context.output(lowLevel,stockPrice); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { SimpleDateFormat format = new SimpleDateFormat("yyy-MM-dd HH:mm:ss"); String timeStr = format.format(timestamp); String warnings = String.format("warnings: time=%s ,key=%s increased",timeStr,ctx.getCurrentKey()); out.collect(warnings); } public static void main(String[] args) { String topic = "test001"; Properties kafkaProps = new Properties(); StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); FlinkKafkaConsumer consumer = new FlinkKafkaConsumer (topic, new MyKafkaDeserializationSchema(), kafkaProps); OutputTag highLevel = new OutputTag ("highLevel"); OutputTag middleLevel = new OutputTag ("middleLevel"); OutputTag lowLevel = new OutputTag ("lowLevel"); SingleOutputStreamOperator warning = env.addSource(consumer) .keyBy(stockPrice -> stockPrice.getId()) .process(new MyKeyedProcessFunctionJava(highLevel,middleLevel,lowLevel)); DataStream highLevelStream = warning.getSideOutput(highLevel); DataStream middleLevelStream = warning.getSideOutput(middleLevel); DataStream lowLevelStream = warning.getSideOutput(lowLevel); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)