FlinkCEP 介绍及入门案例 (基于flink 1.13.x的源码)

FlinkCEP 介绍及入门案例 (基于flink 1.13.x的源码),第1张

FlinkCEP 介绍及入门案例 (基于flink 1.13.x的源码) 1. 什么是FlinkCEP? 1.1. CEP

CEP【复合事件处理】 通过规则,从不同的事件流中找出相关的事件组合(发生事件),并对发现的做进一步处理(处理发生事件)。

CEP:先捕获各种细微的事件(基础事件或简单事件),然后通过分析整理【事件模式】,找出更有意义的事件(复合事件),最后决定采取什么行动。其中事件的分析整理以找出更有意义的事件,是CEP的核心,也是最困难的地方。 有关CEP概念的理解,请参考轻松理解CEP技术

CEP是一种基于动态环境中事件流的分析技术,事件在这里通常是指采集到的各种数据如交易记录,并且连续不间断。通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件模式,最终分析得到更复杂的复合事件。

1.2 FlinkCEP

FlinkCEP(Complex event processing for Flink) 是基于Flink实现的复杂事件处理库.
在无界流或有界流中检测出事件模式【event pattern】,从而挖掘出数据的价值。

1.3. 特征
  • 目标:从有序的简单事件流中发现复合事件【即定义的事件模式】
  • 输入:一个或多个由简单事件构成的事件流【必须指定watermark,用于时序关系分析】
  • 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
  • 输出:满足规则的复杂事件
1.4. Flink CEP应用场景

CEP的应用场景有很多,如股票曲线预测、网络入侵、物流订单追踪、电商订单、IOT场景等。
大体上分为如下三类:

  • 风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规 *** 作的嫌疑。如当相同的yhk在10分钟内,从两个不同的地方发生刷卡现象,就会触发报警机制,以便于监测xyk盗刷等现象
  • 策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
  • 运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。
  • 趋势分析
  • 欺诈检测等
2. FlinkCEP入门案例 2.1. 导入依赖

    org.apache.flink
    flink-cep_2.11
    1.13.3

2.2. 开发流程
  1. 读取事件流并转换为DataStream
  2. 必须指定水印(watermark)
  3. 定义事件模式(event pattern)
  4. 在指定事件流上应用事件模式
  5. 匹配或选择符合条件的事件,并产生告警

部分代码片段参见如下:

//读取事件流
DataStreamSource source = env.readTextFile("/data/input/events.txt");
DataStreamSource source = env.socketTextStream("bigdata01", 10088);

SingleOutputStreamOperator flatMapStream = source.flatMap((FlatMapFunction) (v, out) -> {
            out.collect(new Event(v.split(",")));
        }).returns(Types.POJO(Event.class));
2.3. 入门代码示例
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;

public class MyCEPTest {
    public static void main(String args[]) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        DataStream dataStream =
                env.fromElements(
                    new EventMsg(1L, LocalDateTime.parse("2020-04-15 08:05:01", dateTimeFormatter), "A", "INFO"),
                    new EventMsg(2L, LocalDateTime.parse("2020-04-15 08:06:11", dateTimeFormatter), "A", "error"),
                    new EventMsg(3L, LocalDateTime.parse("2020-04-15 08:07:21", dateTimeFormatter), "A", "critical"),
                    new EventMsg(4L, LocalDateTime.parse("2020-04-15 08:08:21", dateTimeFormatter), "A", "INFO"),
                    new EventMsg(5L, LocalDateTime.parse("2020-04-15 08:09:21", dateTimeFormatter), "B", "INFO"),
                    new EventMsg(6L, LocalDateTime.parse("2020-04-15 08:11:51", dateTimeFormatter), "B", "error"),
                    new EventMsg(7L, LocalDateTime.parse("2020-04-15 08:12:20", dateTimeFormatter), "B", "critical"),
                    new EventMsg(8L, LocalDateTime.parse("2020-04-15 08:15:22", dateTimeFormatter), "B", "INFO"),
                    new EventMsg(9L, LocalDateTime.parse("2020-04-15 08:17:34", dateTimeFormatter), "B", "error"));

        SingleOutputStreamOperator watermarks = dataStream.assignTimestampsAndWatermarks(
                // 最大乱序程度
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(
                                (SerializableTimestampAssigner) (element, recordTimestamp) -> toEpochMilli(element.getEventTime()))
        );
        Pattern pattern = Pattern.begin("start")
                .next("middle").where(new SimpleCondition() {
                    @Override
                    public boolean filter(EventMsg value) throws Exception {
                        return value.getEventType().equals("error");
                    }
                }).followedBy("end").where(new SimpleCondition() {
                    @Override
                    public boolean filter(EventMsg value) throws Exception {
                        return value.getEventType().equals("critical");
                    }
                }).within(Time.seconds(180));

        PatternStream patternStream = CEP.pattern(watermarks, pattern);

        DataStream alerts = patternStream.select(new PatternSelectFunction() {
            @Override
            public String select(Map> msgs) throws Exception {
                StringBuffer sb = new StringBuffer();
                msgs.forEach((k,v)->{
                  sb.append(k+",");
                  sb.append(v.toString()+"n");
                });
                return sb.toString();
            }
        });

        alerts.print();
        env.execute("Flink CEP Test");
    }

    public static final ZoneOffset zoneOffset8 = ZoneOffset.of("+8");

    public static long toEpochMilli(LocalDateTime dt) {
        return dt.toInstant(zoneOffset8).toEpochMilli();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class EventMsg {
        public long eventId;
        public LocalDateTime eventTime;
        public String eventName;
        public String eventType;

        @Override
        public String toString(){
            return String.format("%s-%s-%s-%s",eventId,eventName,eventType,eventTime);
        }
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5669998.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存