Flink CEP开发流程介绍

Flink CEP开发流程介绍,第1张

Flink CEP开发流程介绍 FlinkCEP 1.CEP

CEP全称 Complex event processing 复杂事件处理
FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库
擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用
官网链接:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/cep/

2.应用场景

检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件
允许业务定义要从输入流中提取的复杂模式序列

3.使用流程
  1. 定义pattern
  2. pattern应用到数据流,得到模式流
  3. 从模式流 获取结果
DataStream input = ...
​
Pattern pattern = Pattern.begin("start").where(
        new SimpleCondition() {
            @Override
            public boolean filter(Event event) {
                return event.getId() == 42;
            }
        }
    ).next("middle").subtype(SubEvent.class).where(
        new SimpleCondition() {
            @Override
            public boolean filter(SubEvent subEvent) {
                return subEvent.getVolume() >= 10.0;
            }
        }
    ).followedBy("end").where(
         new SimpleCondition() {
            @Override
            public boolean filter(Event event) {
                return event.getName().equals("end");
            }
         }
    );
​
PatternStream patternStream = CEP.pattern(input, pattern);
​
DataStream result = patternStream.process(
    new PatternProcessFunction() {
        @Override
        public void processMatch(
                Map> pattern,
                Context ctx,
                Collector out) throws Exception {
            out.collect(createalertFrom(pattern));
        }
    });

CEP并不包含在flink中,使用前需要自己导入

 
            org.apache.flink
            flink-cep_${scala.version}
            ${flink.version}

4.模式(Pattern):定义处理事件的规则 三种模式PatternAPI
  • 个体模式(Individual Patterns)
    组成复杂规则的每一个单独的模式定义
  • 组合模式(Combining Patterns)
    很多个体模式组合起来,形成组合模式
  • 模式组(Groups of Patterns)
    将一个组合模式作为条件嵌套在个体模式里
近邻模式
  • 严格近邻

    期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next()

  • 宽松近邻

允许中间出现不匹配的事件,API是.followedBy()

  • 非确定性宽松近邻

    可以忽略已经匹配的条件,API是followedByAny()

  • 指定时间约束

    指定模式在多长时间内匹配有效,API是within

  • 如果您不希望事件类型直接跟随另一个,notNext()

  • 如果您不希望事件类型介于其他两种事件类型之间,notFollowedBy()

模式分类
  • 单次模式
    接收一次一个事件
  • 循环模式
    接收一个或多个事件

其他参数
times:指定固定的循环执行次数
greedy:贪婪模式,尽可能多触发
oneOrMore:指定触发一次或多次
timesOrMore:指定触发固定以上的次数
optional:要么不触发要么触发指定的次数

5.代码示例: 需求:

同个账号,在5秒内连续登录失败2次,则认为存在而已登录问题
数据格式 jack,2021-12-23 10:00:01,-2

public static void main(String[] args) throws Exception {
​
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
​
        env.setParallelism(1);
​
​
        DataStream ds = env.socketTextStream("127.0.0.1", 8888);
​
​
        DataStream> flatMapDS = ds.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String value, Collector> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
            }
        });
​
​
SingleOutputStreamOperator> watermakerDS = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy
​
                //延迟策略去掉了延迟时间,时间是单调递增,event中的时间戳充当了水印
                .>forMonotonousTimestamps()
                //生成一个延迟3s的固定水印
                //.>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                        (event, timestamp) -> {
                            //指定POJO的事件时间列
                            return TimeUtil.strToDate(event.f1).getTime();
                        }
                ));
​
​
​
        KeyedStream, String> keyedStream = watermakerDS.keyBy(new KeySelector, String>() {
            @Override
            public String getKey(Tuple3 value) throws Exception {
                return value.f0;
            }
        });
​
        //定义模式
        Pattern, Tuple3> pattern = Pattern
                .>begin("firstTimeLogin")
​
                .where(new SimpleCondition>() {
                    @Override
                    public boolean filter(Tuple3 value) throws Exception {
​
                        // -2 是登录失败错误码
                        return value.f2 == -2;
                    }
                })//.times(2).within(Time.seconds(10));//不是严格近邻
                .next("secondTimeLogin")
                .where(new SimpleCondition>() {
                    @Override
                    public boolean filter(Tuple3 value) throws Exception {
                        return value.f2 == -2;
                    }
                }).within(Time.seconds(5));
​
​
        //匹配检查
        PatternStream> patternStream = CEP.pattern(keyedStream, pattern);
​
        SingleOutputStreamOperator> select = patternStream.select(new PatternSelectFunction, Tuple3>() {
            @Override
            public Tuple3 select(Map>> map) throws Exception {
​
                Tuple3 firstLoginFail =  map.get("firstTimeLogin").get(0);
​
                Tuple3 secondLoginFail =  map.get("secondTimeLogin").get(0);
​
                return Tuple3.of(firstLoginFail.f0,firstLoginFail.f1,secondLoginFail.f1);
            }
        });
​
        select.print("匹配结果");
​
        env.execute("CEP job");
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683599.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存