“ 使用Java编写Flink程序,统计从注册到下单过程中的不同uv数。”
通过本篇文章做到深入掌握如下知识点,欢迎探讨,共同学习和进步。
1、checkpoing应用
2、kafka数据源连接器构造,包含重要的水位线设置
3、windowAll窗口函数的应用
4、双流interval join应用
01
—
需求介绍
某一保险公司提供了一个h5注册页面供终端客户注册账号,用户注册后可选择通过h5页面下载APP,然后登录APP,最后在APP中购买保险产品。用户任何一步都有可能流失。在某一活动日,公司领导需实时观察一些用户行为指标,主要有注册用户数、app活跃用户数、注册且下载app用户数,注册且在app下单用户数。
注册用户数:实时1分钟统计当天的累计注册用户数
app活跃用户数:实时1分钟统计当天的app活跃用户数,和实时1分钟统计每小时的app活跃用户数
注册且下载app用户数:实时1分钟统计注册且下载app的用户数
注册且在app下单用户数:实时1分钟统计注册且在app下单的用户数
02
—
开发设计
1、用户注册数据接入kafka ods_user_rt topic中,涉及到的字段主要有user_id和register_time
2、用户行为数据接入kafka ods_traffic_rt topic中,涉及到的字段主要有user_id、event_type(行为事件类型,比如下载app和下单)、event_time(事件发生时间)
3、flink使用ods_user_rt和ods_traffic_rt两条流源进行实时计算,将计算结果保存到如下mysql结果表中,可直接使用提供的sinkapi Jdbc.sink()。
CREATE TABLE IF NOT EXISTS `ads_user_stats_rt`( `stats_time` VARCHAr(20) NOT NULL comment '统计时间', `register_cnt` INT comment '注册用户数', `register_download_cnt` INT comment '注册下载用户数', `register_download_order_cnt` INT comment '注册下单用户数', `active_user_uv_per_hour` INT comment 'app内小时累计活跃用户数', `active_user_uv_per_day` INT comment 'app内天累计活跃用户数', PRIMARY KEY ( `stats_time` ))ENGINE=InnoDB DEFAULT CHARSET=utf8;
4、注册用户数计算思路:单条流可完成计算,按天分窗,每个窗口从零点开始,比如2021-11-21 00:00:00,可每分钟提前触发结果,这里主要采用windowAll窗口函数。app内天累计活跃用户数,和app内小时累计活跃用户数计算方式类似。
5、注册下载数计算思路:需使用两条流计算,采用interval join算子,interval join仅支持inner join效果。interval join在这里应用的原理是行为流数据不晚于用户流8小时且不早于用户流1小时。相当于早到的用户流数据会进行状态缓冲,待限定时间范围内的相同用户行为流数据到来后完成join并输出。注册下单数计算思路同理。
6、因各个指标是1分钟触发一次,所以checkpoint执行间隙会大于1分钟,初步设计为5分钟执行一次checkpoint。
03
—
代码实现
// 实时程序代码 public class UserBehaviorStats { private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(300000); // 设置检查点执行间隔为5分 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 设置时间类型 // kafka配置属性 Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "demo"); // 构造用户数据源连接器 FlinkKafkaConsumer011userRegisterConsumer =new FlinkKafkaConsumer011 ( "ods_user_rt", //设置用户源数据主题 new JSonKeyValueDeserializationSchema(false), props); userRegisterConsumer.assignTimestampsAndWatermarks(new UserMessagePeriodicWatermarks()); userRegisterConsumer.setStartFromLatest(); // 构造流量数据源连接器 FlinkKafkaConsumer011 userBehaviorConsumer =new FlinkKafkaConsumer011 ( "ods_traffic_rt", //设置流量源数据主题 new JSonKeyValueDeserializationSchema(false), props); userBehaviorConsumer.assignTimestampsAndWatermarks(new TrafficMessagePeriodicWatermarks()); userBehaviorConsumer.setStartFromLatest(); // 构造用户注册和行为流 DataStreamSource userRegisterDataStreamSource = env.addSource(userRegisterConsumer); DataStreamSource userBehaviorDataStreamSource = env.addSource(userBehaviorConsumer); // 天累计用户注册数 userRegisterDataStreamSource.map(x -> x.get("value").get("user_id").asText()) .windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) .process(getProcessAllWindowFunction()) .addSink(getMysqlSink("INSERT INTO ads_user_stats_rt(stats_time,register_cnt) VALUES(?,?) ON DUPLICATE KEY UPDATE register_cnt = ?")); // 天累计活跃用户数 userBehaviorDataStreamSource.map(x -> x.get("value").get("user_id").asText()) .windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) .process(getProcessAllWindowFunction()) .addSink(getMysqlSink("INSERT INTO ads_user_stats_rt(stats_time,active_user_uv_per_day) VALUES(?,?) ON DUPLICATE KEY UPDATE active_user_uv_per_day = ?")); // 按小时累计活跃注册数 userBehaviorDataStreamSource.map(x -> x.get("value").get("user_id").asText()) .windowAll(TumblingEventTimeWindows.of(Time.hours(1))) .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) .process(getProcessAllWindowFunction()) .addSink(getMysqlSink("INSERT INTO ads_user_stats_rt(stats_time,active_user_uv_per_hour) VALUES(?,?) ON DUPLICATE KEY UPDATE active_user_uv_per_hour = ?")); // 天累计用户注册且下载用户数 userRegisterDataStreamSource.map(x -> x.get("value").get("user_id").asText()) .keyBy(x -> x) .intervalJoin(userBehaviorDataStreamSource.keyBy(x -> x.get("value").get("user_id").asText())) .between(Time.hours(-1), Time.hours(8)) .process(new ProcessJoinFunction () { @Override public void processElement(String s, ObjectNode s2, Context context, Collector collector) throws Exception { if ("download".equals(s2.get("value").get("event_type").asText())) { collector.collect(s2.get("value").get("user_id").asText()); } } }).windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) .process(getProcessAllWindowFunction()) .addSink(getMysqlSink("INSERT INTO ads_user_stats_rt(stats_time,register_download_cnt) VALUES(?,?) ON DUPLICATE KEY UPDATE register_download_cnt = ?")); // 天累计用户注册且下单用户数 userRegisterDataStreamSource.map(x -> x.get("value").get("user_id").asText()) .keyBy(x -> x) .intervalJoin(userBehaviorDataStreamSource.keyBy(x -> x.get("value").get("user_id").asText())) .between(Time.hours(-1), Time.hours(8)) .process(new ProcessJoinFunction () { @Override public void processElement(String s, ObjectNode s2, Context context, Collector collector) throws Exception { if ("clickOrder".equals(s2.get("value").get("event_type").asText())) { collector.collect(s2.get("value").get("user_id").asText()); } } }).windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1))) .process(getProcessAllWindowFunction()) .addSink(getMysqlSink("INSERT INTO ads_user_stats_rt(stats_time,register_download_order_cnt) VALUES(?,?) ON DUPLICATE KEY UPDATE register_download_order_cnt = ?")); env.execute("flink kafka user behavior sample"); } private static ProcessAllWindowFunction getProcessAllWindowFunction() { return new ProcessAllWindowFunction () { @Override public void process(Context context, Iterable iterable, Collector collector) throws Exception { HashSet hashSet = new HashSet<>(); for(String str : iterable) { hashSet.add(str); } AdsUserStatsRt adsUserStatsRt = new AdsUserStatsRt(); adsUserStatsRt.setStatsTime(simpleDateFormat.format(new Date(context.window().getStart()))); adsUserStatsRt.setFieldValue(hashSet.size()); collector.collect(adsUserStatsRt); } }; } private static SinkFunction getMysqlSink(String sql) { return JdbcSink.sink(sql, new JdbcStatementBuilder() { @Override public void accept(PreparedStatement preparedStatement, AdsUserStatsRt adsUserStatsRt) throws SQLException { preparedStatement.setString(1, adsUserStatsRt.getStatsTime()); preparedStatement.setInt(2, adsUserStatsRt.getFieldValue()); preparedStatement.setInt(3, adsUserStatsRt.getFieldValue()); } }, JdbcExecutionOptions.builder() .withBatchSize(1000) .withBatchIntervalMs(200) .withMaxRetries(5) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/traffic?useSSL=false") .withDriverName("com.mysql.jdbc.Driver") .withUsername("root") .withPassword("123456") .build()); } // 用户数据源时间戳分配器 public class UserMessagePeriodicWatermarks implements AssignerWithPeriodicWatermarks { private long lastTs = Long.MIN_VALUE; @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(lastTs); } @Override public long extractTimestamp(ObjectNode jsonNodes, long l) { lastTs = jsonNodes.get("value").get("register_time").asLong(); return lastTs; } } // 用户行为数据源时间戳分配器 public class TrafficMessagePeriodicWatermarks implements AssignerWithPeriodicWatermarks { private long lastTs = Long.MIN_VALUE; @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(lastTs); } @Override public long extractTimestamp(ObjectNode jsonNodes, long l) { lastTs = jsonNodes.get("value").get("event_time").asLong(); return lastTs; } } // 用于mysql sink的结果pojo public class AdsUserStatsRt { private String statsTime; private String fieldName; private Integer fieldValue; public String getStatsTime() { return statsTime; } public void setStatsTime(String stats_time) { this.statsTime = stats_time; } public String getFieldName() { return fieldName; } public void setFieldName(String fieldName) { this.fieldName = fieldName; } public Integer getFieldValue() { return fieldValue; } public void setFieldValue(Integer fieldValue) { this.fieldValue = fieldValue; } } public class UserBehaviorProducerTest { private static volatile int flag1 = 0, flag2 = 0; private static volatile List users = new ArrayList (); public static void main(String[] args) throws Exception { // 构造kafka producer Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 5000); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer
04
—
遇到的坑
1、因数据源连接器的水位线直接取于消息中的时间字段值,当用户注册消息发送到1000个用户后,不再发送消息,而行为消息还在不断发送,那么实时统计的与用户流相关的指标都会计算错误,不会再输出一个更新值。如用户注册数、用户注册下载数、用户注册下单数。主要原因在于用户注册流水位线处于停止状态。为了解决这个问题,当发送1000个用户注册消息后,需要继续发送一个假的用户注册消息数据来不断更新水位值。
// 用户注册消息线程 Thread userRegisterThread = new Thread(new Runnable() { @Override public void run() { int totalUserCount = 5000; // 设置注册用户数阀值, String user_id = ""; for (int i = 0; i < totalUserCount; i++) { JSonObject jsonObject = new JSonObject(); if (i < 1000) { user_id = UUID.randomUUID().toString(); } else { user_id = "fake_id"; // 满1000后不再生成新的用户,用固定用户消息来保持flink里用户流的水位线在不断推进 } users.add(user_id);
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)