CEP全称 Complex event processing 复杂事件处理
FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库
擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用
官网链接:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/cep/
检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件
允许业务定义要从输入流中提取的复杂模式序列
- 定义pattern
- pattern应用到数据流,得到模式流
- 从模式流 获取结果
DataStreaminput = ... Pattern pattern = Pattern. begin("start").where( new SimpleCondition () { @Override public boolean filter(Event event) { return event.getId() == 42; } } ).next("middle").subtype(SubEvent.class).where( new SimpleCondition () { @Override public boolean filter(SubEvent subEvent) { return subEvent.getVolume() >= 10.0; } } ).followedBy("end").where( new SimpleCondition () { @Override public boolean filter(Event event) { return event.getName().equals("end"); } } ); PatternStream patternStream = CEP.pattern(input, pattern); DataStream result = patternStream.process( new PatternProcessFunction () { @Override public void processMatch( Map > pattern, Context ctx, Collector out) throws Exception { out.collect(createalertFrom(pattern)); } });
CEP并不包含在flink中,使用前需要自己导入
4.模式(Pattern):定义处理事件的规则 三种模式PatternAPIorg.apache.flink flink-cep_${scala.version}${flink.version}
- 个体模式(Individual Patterns)
组成复杂规则的每一个单独的模式定义 - 组合模式(Combining Patterns)
很多个体模式组合起来,形成组合模式 - 模式组(Groups of Patterns)
将一个组合模式作为条件嵌套在个体模式里
-
严格近邻
期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next()
-
宽松近邻
允许中间出现不匹配的事件,API是.followedBy()
-
非确定性宽松近邻
可以忽略已经匹配的条件,API是followedByAny()
-
指定时间约束
指定模式在多长时间内匹配有效,API是within
-
如果您不希望事件类型直接跟随另一个,notNext()
-
如果您不希望事件类型介于其他两种事件类型之间,notFollowedBy()
- 单次模式
接收一次一个事件 - 循环模式
接收一个或多个事件
其他参数
times:指定固定的循环执行次数
greedy:贪婪模式,尽可能多触发
oneOrMore:指定触发一次或多次
timesOrMore:指定触发固定以上的次数
optional:要么不触发要么触发指定的次数
同个账号,在5秒内连续登录失败2次,则认为存在而已登录问题
数据格式 jack,2021-12-23 10:00:01,-2
public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamds = env.socketTextStream("127.0.0.1", 8888); DataStream > flatMapDS = ds.flatMap(new FlatMapFunction >() { @Override public void flatMap(String value, Collector > out) throws Exception { String[] arr = value.split(","); out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2]))); } }); SingleOutputStreamOperator > watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy //延迟策略去掉了延迟时间,时间是单调递增,event中的时间戳充当了水印 . >forMonotonousTimestamps() //生成一个延迟3s的固定水印 //. >forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner( (event, timestamp) -> { //指定POJO的事件时间列 return TimeUtil.strToDate(event.f1).getTime(); } )); KeyedStream , String> keyedStream = watermakerDS.keyBy(new KeySelector , String>() { @Override public String getKey(Tuple3 value) throws Exception { return value.f0; } }); //定义模式 Pattern , Tuple3 > pattern = Pattern . >begin("firstTimeLogin") .where(new SimpleCondition >() { @Override public boolean filter(Tuple3 value) throws Exception { // -2 是登录失败错误码 return value.f2 == -2; } })//.times(2).within(Time.seconds(10));//不是严格近邻 .next("secondTimeLogin") .where(new SimpleCondition >() { @Override public boolean filter(Tuple3 value) throws Exception { return value.f2 == -2; } }).within(Time.seconds(5)); //匹配检查 PatternStream > patternStream = CEP.pattern(keyedStream, pattern); SingleOutputStreamOperator > select = patternStream.select(new PatternSelectFunction , Tuple3 >() { @Override public Tuple3 select(Map >> map) throws Exception { Tuple3 firstLoginFail = map.get("firstTimeLogin").get(0); Tuple3 secondLoginFail = map.get("secondTimeLogin").get(0); return Tuple3.of(firstLoginFail.f0,firstLoginFail.f1,secondLoginFail.f1); } }); select.print("匹配结果"); env.execute("CEP job"); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)