- 参考地址
- 基本概念
- pom 依赖
- code
- 数据源
- LoginEvent 实体类
- 业务逻辑代码
- 运行效果图
flink cep 官网链接
基本概念
FlinkCEP是在Flink上层实现的复杂事件处理库。 它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。 本页讲述了Flink CEP中可用的API,我们首先讲述模式API,它可以让你指定想在数据流中检测的模式,然后讲述如何检测匹配的事件序列并进行处理。 再然后我们讲述Flink在按照事件时间处理迟到事件时的假设, 以及如何从旧版本的Flink向1.3之后的版本迁移作业。
pom 依赖
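<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.14.0</version>
</dependency>
code
数据源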
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; public class LoginSource extends RichSourceFunctionLoginEvent 实体类{ List loginStatusList; List userIdList; List userNameList; Random random; Boolean isRunning; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); loginStatusList = Arrays.asList("failed", "failed"); // loginStatusList = Arrays.asList("success", "failed"); userIdList = Arrays.asList(1, 2, 3, 4, 5); userNameList = Arrays.asList("zhangsan", "lisi", "wangwu", "maliu", "yanqi"); random = new Random(); isRunning = true; } @Override public void run(SourceContext ctx) throws Exception { while (isRunning) { Long currentTimeStamp = System.currentTimeMillis(); final int index = random.nextInt(5); int id = userIdList.get(index); String userName = userNameList.get(index); final int statusIndex = random.nextInt(2); final String status = loginStatusList.get(statusIndex); ctx.collect(new LoginEvent(id, userName, status, currentTimeStamp)); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { isRunning = false; loginStatusList = new ArrayList<>(); userIdList = new ArrayList<>(); userNameList = new ArrayList<>(); } }
/**
 * Plain data holder describing a single login attempt.
 *
 * <p>Accessors, constructors, {@code toString}, {@code equals} and
 * {@code hashCode} are generated by the Lombok annotations below.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@EqualsAndHashCode
public class LoginEvent {

    /** Numeric id of the user who attempted to log in. */
    private int userId;

    /** Name of the user who attempted to log in. */
    private String userName;

    /** Outcome of the attempt, e.g. {@code "failed"}. */
    private String loginStatus;

    /** Time of the attempt; the demo source fills it with epoch milliseconds. */
    private Long loginTime;
}
业务逻辑代码
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.functions.PatternProcessFunction; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; import java.time.Duration; import java.util.List; import java.util.Map; public class FlinkCEPLoginEvent { public static void main(String[] args) throws Exception{ //获取运行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(3000L); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //添加数据源 final DataStreamSource运行效果图loginEventDataStreamSource = env.addSource(new LoginSource()); final SingleOutputStreamOperator loginEventSingleOutputStreamOperator = loginEventDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy . forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner((event, timestamp) -> event.getLoginTime())); final KeyedStream loginEventIntegerKeyedStream = loginEventSingleOutputStreamOperator.keyBy(new KeySelector () { @Override public Integer getKey(LoginEvent value) throws Exception { return value.getUserId(); } }); final Pattern pattern = Pattern. 
begin("start").where(new SimpleCondition () { @Override public boolean filter(LoginEvent value) throws Exception { return value.getLoginStatus().equals("failed"); } }).next("middle").where(new SimpleCondition () { @Override public boolean filter(LoginEvent value) throws Exception { return value.getLoginStatus().equals("failed"); } }).within(Time.seconds(3)); final PatternStream patternStream = CEP.pattern(loginEventIntegerKeyedStream, pattern); loginEventIntegerKeyedStream.print("original: "); final SingleOutputStreamOperator afterPDs = patternStream.process(new PatternProcessFunction () { @Override public void processMatch(Map > map, Context context, Collector collector) throws Exception { System.out.println(map.toString()); final LoginEvent start = map.get("start").get(0); final LoginEvent middle = map.get("middle").get(0); collector.collect(String.format("{%s} login failed, 1st timeStamp: %s, 2nd: %s.", start.getUserName(), start.getLoginTime(), middle.getLoginTime())); } }); // loginEventDataStreamSource.print(); afterPDs.print("afterPDs: "); System.out.println(env.getExecutionPlan()); env.execute("Test flink CEP"); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)