package com.biaodian.flink.taskregister;

import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import com.biaodian.flink.function.register.*;
import com.biaodian.flink.keyby.RegisterKeyBy;
import com.biaodian.flink.keyby.RegisterSiteKeyBy;
import com.biaodian.flink.tool.CountOrProcessingTimeTrigger;
import com.biaodian.flink.tool.PropertiesTool;
import com.biaodian.flink.tool.PropertiesUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Date;
import java.util.Properties;
import java.util.TimeZone;

/**
 * Flink streaming job that counts registered users from a Kafka topic.
 *
 * <p>Pipeline: Kafka source → JSON parse ({@link RegisterMapFunction}) →
 * keep only today's records ({@link RegisterFilterFunction}) → two keyed
 * daily tumbling windows (overall key and per-site key) aggregated with
 * bloom-filter-backed counters.
 */
public class FlinkRegisterUser {

    public static void main(String[] args) throws Exception {
        // Pin the JVM default zone so "today" and day boundaries follow Beijing time.
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        System.out.println("当前时间=" + new Date());

        // Checkpoint storage path: prefix + classpath root + job-specific suffix.
        String path = FlinkRegisterUser.class.getClassLoader().getResource("").getPath();
        path = Constant.CHECKPOINT_PREFIX + path + Constant.CHECKPOINT_REGISTER_SUFFIX;
        System.out.println("保存点路径=" + path);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(Constant.ENABLE_CHECKPOINTING);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setMinPauseBetweenCheckpoints(Constant.MIN_PAUSE_BETWEEN_CHECKPOINTS);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep externalized checkpoints on cancel so the job can be restored manually.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setCheckpointStorage(path);

        // Kafka consumer configuration.
        String topicName = PropertiesTool.getProperty("kafka.register.topic");
        Properties properties = PropertiesUtil.getProperties();
        properties.setProperty("group.id",
                PropertiesTool.getProperty("kafka.register.topic.group-id"));
        // FIX: the scraped copy had lost the generic parameter and the space before the
        // variable name ("FlinkKafkaConsumerconsumer"), which does not compile.
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), properties);

        // Raw JSON records from Kafka.
        DataStream<String> source = env.addSource(consumer).uid("kafka");
        // Parse each JSON message into a RegisterDto.
        DataStream<RegisterDto> map = source.map(new RegisterMapFunction()).uid("map");
        // Keep only records whose statistics date is today.
        DataStream<RegisterDto> sourceData = map.filter(new RegisterFilterFunction()).uid("filter");

        // Daily tumbling window with a -8h offset so the window boundary is midnight
        // Beijing time (UTC+8); fires every second or every 1000 elements.
        // Keyed by date/platform/pvType/siteId/eventType (see RegisterKeyBy).
        // NOTE(review): window output element type is not visible from here — confirm
        // and replace the wildcard with the concrete type.
        DataStream<?> register = sourceData.keyBy(new RegisterKeyBy())
                .window(TumblingProcessingTimeWindows.of(Time.days(Constant.INTEGER_ONE), Time.hours(-8)))
                .trigger(CountOrProcessingTimeTrigger.of(Time.seconds(Constant.INTEGER_ONE), 1000L))
                .aggregate(new RegisterAggregateFunction(), new RegisterWindowFunction())
                .uid("register");

        // Same daily aggregation, keyed per site (see RegisterSiteKeyBy).
        DataStream<?> registerSite = sourceData.keyBy(new RegisterSiteKeyBy())
                .window(TumblingProcessingTimeWindows.of(Time.days(Constant.INTEGER_ONE), Time.hours(-8)))
                .trigger(CountOrProcessingTimeTrigger.of(Time.seconds(Constant.INTEGER_ONE), 1000L))
                .aggregate(new RegisterSiteAggregateFunction(), new RegisterSiteWindowFunction())
                .uid("registerSite");

        env.execute("registerUserNum");
    }
}
package com.biaodian.flink.function.register; import cn.hutool.core.date.DateTime; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.biaodian.flink.constant.Constant; import com.biaodian.flink.constant.PlatFormEnum; import com.biaodian.flink.constant.PvTypeEnum; import com.biaodian.flink.dto.RegisterDto; import org.apache.flink.api.common.functions.MapFunction; public class RegisterMapFunction implements MapFunction{ JSonObject jsonObject = null; JSonObject data = null; RegisterDto dto = null; String create_at = null; @Override public RegisterDto map(String value) throws Exception { dto = new RegisterDto(); jsonObject = JSON.parseObject(value.toString()); data = jsonObject.getJSonObject("data"); create_at = new DateTime(data.getDate("created_at").getTime()).toDateStr(); dto.setSourcePlatform(jsonObject.getString("sourcePlatform")); dto.setPlatform(PlatFormEnum.namevalue(data.getString("platform"))); dto.setPvType(PvTypeEnum.namevalue(data.getString("platform"))); dto.setSiteId(data.getString("application_merchant_id")); dto.setEventType(data.getString("event_type")); dto.setUid(data.getString("user_id")); dto.setStatisticsDateNum(Integer.parseInt(create_at.replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING))); dto.setNum(Constant.INTEGER_ONE); return dto; } }
package com.biaodian.flink.function.register;

import cn.hutool.core.date.DateUtil;
import com.biaodian.flink.constant.Constant;
import com.biaodian.flink.dto.RegisterDto;
import org.apache.flink.api.common.functions.FilterFunction;

/**
 * Keeps only records whose statistics date equals today's date
 * (compared as a compact integer, e.g. 20230102).
 */
public class RegisterFilterFunction implements FilterFunction<RegisterDto> {

    @Override
    public boolean filter(RegisterDto value) throws Exception {
        // FIX: local instead of a mutable instance field; generics restored on the
        // FilterFunction interface (stripped by the scrape).
        Integer today = Integer.parseInt(
                DateUtil.date().toDateStr().replaceAll(Constant.UNDER_LINE, Constant.EMPTY_STRING));
        // equals() handles a null statisticsDateNum safely (returns false).
        return today.equals(value.getStatisticsDateNum());
    }
}
package com.biaodian.flink.keyby; import com.biaodian.flink.dto.RegisterDto; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple6; public class RegisterKeyBy implements KeySelector> { @Override public Tuple6 getKey(RegisterDto value) throws Exception { return new Tuple6<>(value.getStatisticsDateNum(), value.getSourcePlatform(), value.getPlatform(), value.getPvType(), value.getSiteId(), value.getEventType()); } }
package com.biaodian.flink.tool; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; public class CountOrProcessingTimeTriggerextends Trigger
package com.biaodian.flink.function.register; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import com.biaodian.flink.constant.Constant; import com.biaodian.flink.dto.RegisterDto; import com.biaodian.flink.tool.RedisUtil; import org.apache.flink.api.common.functions.AggregateFunction; import org.redisson.api.RBloomFilter; import java.util.concurrent.TimeUnit; public class RegisterAggregateFunction implements AggregateFunction{ private RBloomFilter
package com.biaodian.flink.function.register; import com.biaodian.flink.dto.RegisterDto; import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class RegisterWindowFunction implements WindowFunction, TimeWindow> { @Override public void apply(Tuple6 tuple6, TimeWindow window, Iterable input, Collector out) throws Exception { } }
package com.biaodian.flink.function.register; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import com.biaodian.flink.constant.Constant; import com.biaodian.flink.dto.RegisterDto; import com.biaodian.flink.tool.RedisUtil; import org.apache.flink.api.common.functions.AggregateFunction; import org.redisson.api.RBloomFilter; import java.util.concurrent.TimeUnit; public class RegisterSiteAggregateFunction implements AggregateFunction{ private RBloomFilter
package com.biaodian.flink.function.register; import com.biaodian.flink.dto.RegisterDto; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; public class RegisterSiteWindowFunction implements WindowFunction, TimeWindow> { @Override public void apply(Tuple4 tuple4, TimeWindow window, Iterable input, Collector out) throws Exception { } }
package com.biaodian.flink.tool;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Loads {@code application.properties} from the classpath, then overlays the
 * profile-specific file ({@code application-dev.properties} when
 * {@code spring.profiles.active=dev}, otherwise {@code application-prod.properties}).
 *
 * <p>Thread-safe for reads: all loading happens once in the static initializer.
 */
public class PropertiesTool {

    private static final Properties properties = new Properties();

    static {
        loadResource("application.properties");
        // Profile overlay: later load() calls overwrite duplicate keys.
        String profile = "dev".equals(properties.getProperty("spring.profiles.active"))
                ? "application-dev.properties"
                : "application-prod.properties";
        loadResource(profile);
    }

    /**
     * Loads one classpath resource into the shared Properties.
     *
     * FIX: the original leaked both InputStreams (never closed) and threw an
     * unguarded NPE from Properties.load(null) when a resource was missing;
     * try-with-resources + a null guard handle both.
     */
    private static void loadResource(String resource) {
        try (InputStream in = PropertiesTool.class.getClassLoader().getResourceAsStream(resource)) {
            if (in != null) {
                properties.load(in);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private PropertiesTool() {
        // utility class — no instances
    }

    /**
     * @param key property name
     * @return the property value, or {@code null} when the key is absent
     */
    public static String getProperty(String key) {
        return properties.getProperty(key);
    }
}
// NOTE: the two lines that followed here were blog-scraper residue, not source code:
// a share/attribution notice ("Welcome to share; please credit the source: 内存溢出")
// and an empty comment-list header ("Comments (0)").