CEP【复合事件处理】 通过规则,从不同的事件流中找出相关的事件组合(发生事件),并对发现的做进一步处理(处理发生事件)。
CEP:先捕获各种细微的事件(基础事件或简单事件),然后通过分析整理【事件模式】,找出更有意义的事件(复合事件),最后决定采取什么行动。其中事件的分析整理以找出更有意义的事件,是CEP的核心,也是最困难的地方。 有关CEP概念的理解,请参考轻松理解CEP技术
CEP是一种基于动态环境中事件流的分析技术,事件在这里通常是指采集到的各种数据如交易记录,并且连续不间断。通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件模式,最终分析得到更复杂的复合事件。
1.2 FlinkCEPFlinkCEP(Complex event processing for Flink) 是基于Flink实现的复杂事件处理库.
在无界流或有界流中检测出事件模式【event pattern】,从而挖掘出数据的价值。
- 目标:从有序的简单事件流中发现复合事件【即定义的事件模式】
- 输入:一个或多个由简单事件构成的事件流【必须指定watermark,用于时序关系分析】
- 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
- 输出:满足规则的复杂事件
CEP的应用场景有很多,如股票曲线预测、网络入侵、物流订单追踪、电商订单、IOT场景等。
大体上分为如下三类:
- 风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规 *** 作的嫌疑。如当相同的yhk在10分钟内,从两个不同的地方发生刷卡现象,就会触发报警机制,以便于监测xyk盗刷等现象
- 策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
- 运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。
- 趋势分析
- 欺诈检测等
2.2. 开发流程org.apache.flink flink-cep_2.111.13.3
- 读取事件流并转换为DataStream
- 必须指定水印(watermark)
- 定义事件模式(event pattern)
- 在指定事件流上应用事件模式
- 匹配或选择符合条件的事件,并产生告警
部分代码片段参见如下:
//读取事件流 DataStreamSource2.3. 入门代码示例source = env.readTextFile("/data/input/events.txt"); DataStreamSource source = env.socketTextStream("bigdata01", 10088); SingleOutputStreamOperator flatMapStream = source.flatMap((FlatMapFunction ) (v, out) -> { out.collect(new Event(v.split(","))); }).returns(Types.POJO(Event.class));
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.List; import java.util.Map; public class MyCEPTest { public static void main(String args[]) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); DataStreamdataStream = env.fromElements( new EventMsg(1L, LocalDateTime.parse("2020-04-15 08:05:01", dateTimeFormatter), "A", "INFO"), new EventMsg(2L, LocalDateTime.parse("2020-04-15 08:06:11", dateTimeFormatter), "A", "error"), new EventMsg(3L, LocalDateTime.parse("2020-04-15 08:07:21", dateTimeFormatter), "A", "critical"), new EventMsg(4L, LocalDateTime.parse("2020-04-15 08:08:21", dateTimeFormatter), "A", "INFO"), new EventMsg(5L, LocalDateTime.parse("2020-04-15 08:09:21", dateTimeFormatter), "B", "INFO"), new EventMsg(6L, LocalDateTime.parse("2020-04-15 08:11:51", dateTimeFormatter), "B", "error"), new EventMsg(7L, LocalDateTime.parse("2020-04-15 08:12:20", dateTimeFormatter), "B", "critical"), new EventMsg(8L, LocalDateTime.parse("2020-04-15 08:15:22", dateTimeFormatter), "B", "INFO"), new EventMsg(9L, LocalDateTime.parse("2020-04-15 08:17:34", dateTimeFormatter), "B", "error")); SingleOutputStreamOperator watermarks = dataStream.assignTimestampsAndWatermarks( // 最大乱序程度 WatermarkStrategy. forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner( (SerializableTimestampAssigner ) (element, recordTimestamp) -> toEpochMilli(element.getEventTime())) ); Pattern pattern = Pattern. begin("start") .next("middle").where(new SimpleCondition () { @Override public boolean filter(EventMsg value) throws Exception { return value.getEventType().equals("error"); } }).followedBy("end").where(new SimpleCondition () { @Override public boolean filter(EventMsg value) throws Exception { return value.getEventType().equals("critical"); } }).within(Time.seconds(180)); PatternStream patternStream = CEP.pattern(watermarks, pattern); DataStream alerts = patternStream.select(new PatternSelectFunction () { @Override public String select(Map > msgs) throws Exception { StringBuffer sb = new StringBuffer(); msgs.forEach((k,v)->{ sb.append(k+","); sb.append(v.toString()+"n"); }); return sb.toString(); } }); alerts.print(); env.execute("Flink CEP Test"); } public static final ZoneOffset zoneOffset8 = ZoneOffset.of("+8"); public static long toEpochMilli(LocalDateTime dt) { return dt.toInstant(zoneOffset8).toEpochMilli(); } @Data @AllArgsConstructor @NoArgsConstructor public static class EventMsg { public long eventId; public LocalDateTime eventTime; public String eventName; public String eventType; @Override public String toString(){ return String.format("%s-%s-%s-%s",eventId,eventName,eventType,eventTime); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)