Flink CEP with a Code Demo: Complex Event Processing in Flink


Flink CEP
  • References
  • Basic concepts
  • pom dependencies
  • code
    • Data source
    • LoginEvent entity class
    • Business logic code
    • Run results

References

Official Flink CEP documentation

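Basic concepts
FlinkCEP is the complex event processing (CEP) library implemented on top of Flink. It lets you detect event patterns in an unbounded stream of events, so that you can get hold of the part of your data that matters.

The official documentation page covers the API available in Flink CEP. It starts with the Pattern API, which lets you specify the patterns you want to detect in a stream, and then shows how to detect matching event sequences and act on them. After that it covers the assumptions Flink makes when handling late events in event time, and how to migrate a job from an older Flink version to Flink 1.3 or later.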
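To make "detecting a pattern in an event stream" concrete before the Flink code, here is a minimal plain-Java sketch of the rule this article's demo checks: two failed logins in a row for the same user, less than 3 seconds apart. It does not use Flink at all. The class `ConsecutiveFailureSketch`, the reduced `Login` type, and the `detect` method are hypothetical names made up for illustration, and the sketch assumes events arrive in timestamp order and that the window bound is exclusive. Flink CEP itself evaluates patterns with a state machine and handles out-of-order events, which this sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsecutiveFailureSketch {

    /** Hypothetical stand-in for the article's LoginEvent, reduced to three fields. */
    public static final class Login {
        final int userId;
        final String status;
        final long timeMs;

        public Login(int userId, String status, long timeMs) {
            this.userId = userId;
            this.status = status;
            this.timeMs = timeMs;
        }
    }

    /**
     * Reports a match whenever a user's failed login is directly followed, in that
     * user's own event sequence, by another failed login less than windowMs later.
     * Assumes the input list is already sorted by timestamp.
     */
    public static List<String> detect(List<Login> events, long windowMs) {
        // Last event seen per user; this plays the role of keying the stream by userId.
        Map<Integer, Login> previousByUser = new HashMap<>();
        List<String> matches = new ArrayList<>();
        for (Login current : events) {
            Login previous = previousByUser.get(current.userId);
            boolean bothFailed = previous != null
                    && "failed".equals(previous.status)
                    && "failed".equals(current.status);
            if (bothFailed && current.timeMs - previous.timeMs < windowMs) {
                matches.add("user " + current.userId + ": " + previous.timeMs + " -> " + current.timeMs);
            }
            previousByUser.put(current.userId, current);
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Login> events = new ArrayList<>();
        events.add(new Login(1, "failed", 0L));
        events.add(new Login(2, "failed", 500L));
        events.add(new Login(1, "failed", 1000L));   // pairs with user 1's event at t=0
        events.add(new Login(2, "success", 1500L));  // breaks user 2's run of failures
        events.add(new Login(2, "failed", 2000L));
        events.add(new Login(1, "failed", 9000L));   // 8000 ms after the previous failure: outside the window
        for (String match : detect(events, 3000L)) {
            System.out.println(match);               // prints "user 1: 0 -> 1000"
        }
    }
}
```

Three failures in a row yield two overlapping matches in this sketch (first with second, second with third), because every event is remembered as the possible start of the next pair.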
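pom dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.14.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.14.0</version>
</dependency>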

code

Data source
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;


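public class LoginSource extends RichSourceFunction<LoginEvent> {

    List<String> loginStatusList;
    List<Integer> userIdList;
    List<String> userNameList;
    Random random;
    // volatile: cancel() is called from a different thread than run()
    volatile boolean isRunning;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Both entries are "failed", so every generated login fails and the pattern fires often.
        // Switch to the commented-out line below for a mix of successes and failures.
        loginStatusList = Arrays.asList("failed", "failed");
//        loginStatusList = Arrays.asList("success", "failed");
        userIdList = Arrays.asList(1, 2, 3, 4, 5);
        userNameList = Arrays.asList("zhangsan", "lisi", "wangwu", "maliu", "yanqi");
        random = new Random();
        isRunning = true;
    }

    @Override
    public void run(SourceContext<LoginEvent> ctx) throws Exception {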
        while (isRunning) {
            Long currentTimeStamp = System.currentTimeMillis();
            final int index = random.nextInt(5);
            int id = userIdList.get(index);
            String userName = userNameList.get(index);
            final int statusIndex = random.nextInt(2);
            final String status = loginStatusList.get(statusIndex);
            ctx.collect(new LoginEvent(id, userName, status, currentTimeStamp));
            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        // Only flip the flag: run() may still be mid-iteration on another thread,
        // so swapping in empty lists here could make its get(index) calls fail.
        isRunning = false;
    }
}


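LoginEvent entity class
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

// Lombok generates the getters, setters, constructors, equals/hashCode and toString.
@Data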
@ToString
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class LoginEvent {

    private int userId;
    private String userName;
    private String loginStatus;
    private Long loginTime;

}

Business logic code
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;
import java.util.Map;



public class FlinkCEPLoginEvent {
    public static void main(String[] args) throws Exception{
        // Obtain the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000L);
        // Deprecated since Flink 1.12, where event time became the default; kept from the original
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Add the data source
        final DataStreamSource<LoginEvent> loginEventDataStreamSource = env.addSource(new LoginSource());

        // Use loginTime as the event time and tolerate events up to 10 seconds out of order
        final SingleOutputStreamOperator<LoginEvent> loginEventSingleOutputStreamOperator = loginEventDataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy
                .<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, timestamp) -> event.getLoginTime()));

        // Key by user ID so that the pattern is matched separately for each user
        final KeyedStream<LoginEvent, Integer> loginEventIntegerKeyedStream = loginEventSingleOutputStreamOperator.keyBy(new KeySelector<LoginEvent, Integer>() {
            @Override
            public Integer getKey(LoginEvent value) throws Exception {
                return value.getUserId();
            }
        });

        // Pattern: a failed login directly followed (next = strict contiguity) by another failed login, within 3 seconds
        final Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("start").where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent value) throws Exception {
                return "failed".equals(value.getLoginStatus());
            }
        }).next("middle").where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent value) throws Exception {
                return "failed".equals(value.getLoginStatus());
            }
        }).within(Time.seconds(3));

        final PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventIntegerKeyedStream, pattern);
        loginEventIntegerKeyedStream.print("original: ");
        final SingleOutputStreamOperator<String> afterPDs = patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
            @Override
            public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
                System.out.println(map.toString());
                // Each pattern name maps to the list of events matched under that name
                final LoginEvent start = map.get("start").get(0);
                final LoginEvent middle = map.get("middle").get(0);

                collector.collect(String.format("{%s} login failed, 1st timeStamp: %s, 2nd: %s.", start.getUserName(), start.getLoginTime(), middle.getLoginTime()));
            }
        });
//        loginEventDataStreamSource.print();
        afterPDs.print("afterPDs: ");
        System.out.println(env.getExecutionPlan());
        env.execute("Test flink CEP");

    }
}
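One thing worth knowing when reading the output of the job above: the `afterPDs` lines lag behind the `original` lines. In event time, CEP buffers incoming events and only processes those whose timestamps the watermark has already passed, and the watermark strategy here holds the watermark 10 seconds behind the largest timestamp seen. The plain-Java sketch below reproduces just that arithmetic (watermark = largest timestamp - allowed out-of-orderness - 1 ms). `BoundedWatermarkSketch` and its methods are hypothetical names for illustration, not Flink classes, and the sketch leaves out Flink's periodic emission and idleness handling.

```java
import java.time.Duration;

public class BoundedWatermarkSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    public BoundedWatermarkSketch(Duration maxOutOfOrderness) {
        this.maxOutOfOrdernessMs = maxOutOfOrderness.toMillis();
        // Start low enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Records an event timestamp; only a new maximum moves the watermark forward. */
    public void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** Events with a timestamp at or below this value are considered complete. */
    public long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(Duration.ofSeconds(10));
        wm.onEvent(20_000L);
        System.out.println(wm.currentWatermark()); // 9999
        wm.onEvent(15_000L);                       // out-of-order event: the maximum stays at 20000
        System.out.println(wm.currentWatermark()); // 9999
        wm.onEvent(31_000L);
        System.out.println(wm.currentWatermark()); // 20999
    }
}
```

With the source emitting one event per second, a pair of failed logins is therefore only reported roughly 10 seconds after the second event was generated. Lowering `Duration.ofSeconds(10)` in the job shortens that delay at the cost of tolerating less disorder.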


Run results

Sharing is welcome; when reposting, please credit the source: 内存溢出

Original article: https://outofmemory.cn/zaji/5653531.html
