Data file used — link: https://pan.baidu.com/s/1uCk-IF4wWVfUkuuTAKaD0w (extraction code: 2hmu)

Input bean
```java
package com.atguigu.networkflow_analysis.beans;

public class UserBehavior {
    public Long userId;
    public Long itemId;
    public Integer categoryId;
    public String behavior;
    public Long timestamp;

    public UserBehavior() {
    }

    public UserBehavior(Long userId, Long itemId, Integer categoryId, String behavior, Long timestamp) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public Long getItemId() { return itemId; }
    public void setItemId(Long itemId) { this.itemId = itemId; }
    public Integer getCategoryId() { return categoryId; }
    public void setCategoryId(Integer categoryId) { this.categoryId = categoryId; }
    public String getBehavior() { return behavior; }
    public void setBehavior(String behavior) { this.behavior = behavior; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "UserBehavior{" +
                "userId=" + userId +
                ", itemId=" + itemId +
                ", categoryId=" + categoryId +
                ", behavior='" + behavior + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
```
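Before wiring the bean into a job, it can help to sanity-check the CSV parsing that both jobs below rely on. A minimal sketch, assuming the UserBehavior.csv column order is userId,itemId,categoryId,behavior,timestamp-in-seconds; the sample row and the ParseDemo class are made up for illustration:

```java
package com.atguigu.networkflow_analysis.beans;

// Hypothetical smoke test for the line-parsing logic used in the jobs below.
public class ParseDemo {
    public static void main(String[] args) {
        String line = "543462,1715,1464116,pv,1511658000"; // made-up sample row
        String[] words = line.split(",");
        UserBehavior ub = new UserBehavior(
                Long.valueOf(words[0]), Long.valueOf(words[1]),
                Integer.valueOf(words[2]), words[3], Long.valueOf(words[4]));
        System.out.println(ub); // UserBehavior{userId=543462, itemId=1715, ...}
    }
}
```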
Output bean

```java
package com.atguigu.networkflow_analysis.beans;

public class PageViewCount {
    private String url;
    private Long windowEnd;
    private Long count;

    public PageViewCount() {
    }

    public PageViewCount(String url, Long windowEnd, Long count) {
        this.url = url;
        this.windowEnd = windowEnd;
        this.count = count;
    }

    public String getUrl() { return url; }
    public Long getWindowEnd() { return windowEnd; }
    public Long getCount() { return count; }
    public void setUrl(String url) { this.url = url; }
    public void setWindowEnd(Long windowEnd) { this.windowEnd = windowEnd; }
    public void setCount(Long count) { this.count = count; }

    @Override
    public String toString() {
        return "PageViewCount{" +
                "url='" + url + '\'' +
                ", windowEnd=" + windowEnd +
                ", count=" + count +
                '}';
    }
}
```

PageView (PV) statistics
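With parallelism 4, keying every "pv" event by a single constant would funnel all records to one subtask. The job below therefore salts each event with a random key in [1, 4], pre-aggregates per salted key in 1-hour tumbling event-time windows (PvCountAgg plus PvCountResult), then re-keys the partial PageViewCount results by windowEnd so TotalPvCount can add the four partial sums back together. The event-time timer registered at windowEnd fires once the watermark passes the window end, by which point every partial count for that window has arrived.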
```java
package com.atguigu.networkflow_analysis.Ahotpages;

import com.atguigu.networkflow_analysis.beans.PageViewCount;
import com.atguigu.networkflow_analysis.beans.UserBehavior;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.Random;

public class PageView {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        URL resource = PageView.class.getResource("/UserBehavior.csv"); // locate the CSV on the classpath
        DataStreamSource<String> inputStream = env.readTextFile(resource.getPath());

        DataStream<UserBehavior> dataStream = inputStream
                .map(line -> {
                    String[] words = line.split(",");
                    return new UserBehavior(new Long(words[0]), new Long(words[1]),
                            new Integer(words[2]), words[3], new Long(words[4]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    // timestamps are ascending; convert the event time from seconds to milliseconds
                    @Override
                    public long extractAscendingTimestamp(UserBehavior userBehavior) {
                        return userBehavior.getTimestamp() * 1000L;
                    }
                });

        // stage 1: salted pre-aggregation
        SingleOutputStreamOperator<PageViewCount> pvCountStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior())) // keep only "pv" events
                .map(new MapFunction<UserBehavior, Tuple2<Integer, Long>>() {
                    // assign a random key in [1, 4] so the data spreads evenly over 4 partitions
                    @Override
                    public Tuple2<Integer, Long> map(UserBehavior value) throws Exception {
                        Random random = new Random();
                        return new Tuple2<>(random.nextInt(4) + 1, 1L);
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow(Time.minutes(60)) // 1-hour tumbling window
                .aggregate(new PvCountAgg(), new PvCountResult());

        // stage 2: merge the partial counts that belong to the same window
        SingleOutputStreamOperator<PageViewCount> countPvResult = pvCountStream
                .keyBy(PageViewCount::getWindowEnd)
                .process(new TotalPvCount());

        // print and run
        countPvResult.print();
        env.execute("pv count");
    }

    // type params: input, accumulator (aggregation state), output
    public static class PvCountAgg implements AggregateFunction<Tuple2<Integer, Long>, Long, Long> {
        @Override
        public Long createAccumulator() { return 0L; }

        @Override
        public Long add(Tuple2<Integer, Long> value, Long accumulator) { return accumulator + 1; }

        @Override
        public Long getResult(Long accumulator) { return accumulator; }

        @Override
        public Long merge(Long a, Long b) { return a + b; }
    }

    // type params: input, output, key (the keyBy key), window
    public static class PvCountResult implements WindowFunction<Long, PageViewCount, Integer, TimeWindow> {
        @Override
        public void apply(Integer integer, TimeWindow window, Iterable<Long> iterable,
                          Collector<PageViewCount> collector) throws Exception {
            collector.collect(new PageViewCount(integer.toString(), window.getEnd(),
                    iterable.iterator().next()));
        }
    }

    // type params: key (the keyBy key, here windowEnd), input, output
    public static class TotalPvCount extends KeyedProcessFunction<Long, PageViewCount, PageViewCount> {
        ValueState<Long> totalCountState; // running total for the current window

        @Override
        public void open(Configuration parameters) throws Exception {
            totalCountState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("total_count", Long.class, 0L));
        }

        @Override
        public void processElement(PageViewCount pageViewCount, Context context,
                                   Collector<PageViewCount> collector) throws Exception {
            totalCountState.update(pageViewCount.getCount() + totalCountState.value()); // accumulate the partial count
            context.timerService().registerEventTimeTimer(pageViewCount.getWindowEnd()); // fire when the window closes
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<PageViewCount> out) throws Exception {
            Long totalCount = totalCountState.value();
            out.collect(new PageViewCount("pv_count", ctx.getCurrentKey(), totalCount));
            totalCountState.clear(); // clear the state
        }

        @Override
        public void close() throws Exception {
            totalCountState.clear();
        }
    }
}
```
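For contrast, here is a hedged sketch of the naive single-key version that the salting above avoids (not from the original post; it assumes the same dataStream plus imports for Tuple2, Time, and org.apache.flink.api.common.typeinfo.Types). Every record lands on one subtask, so it is correct but cannot scale out:

```java
// Naive PV count: one constant key, so one hot subtask does all the work.
SingleOutputStreamOperator<Tuple2<String, Long>> naivePvStream = dataStream
        .filter(data -> "pv".equals(data.getBehavior()))
        .map(data -> Tuple2.of("pv", 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG)) // the lambda erases tuple generics
        .keyBy(data -> data.f0)                          // single hot key
        .timeWindow(Time.minutes(60))
        .sum(1);                                         // one counter per hour
```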
UniqueVisitor (UV) statistics

This version keeps no elements inside the window: MyTrigger fires and purges on every record, and the deduplication state lives outside Flink in a Redis bitmap addressed by a hand-rolled Bloom-filter hash, with the per-window UV count stored in a Redis hash.

```java
package com.atguigu.networkflow_analysis.Ahotpages;

import com.atguigu.networkflow_analysis.beans.PageViewCount;
import com.atguigu.networkflow_analysis.beans.UserBehavior;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.net.URL;

public class UniqueVisitorVersion2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        URL resource = UniqueVisitorVersion2.class.getResource("/UserBehavior.csv"); // locate the CSV
        DataStreamSource<String> inputStream = env.readTextFile(resource.getPath());

        DataStream<UserBehavior> dataStream = inputStream
                .map(line -> {
                    String[] words = line.split(",");
                    return new UserBehavior(new Long(words[0]), new Long(words[1]),
                            new Integer(words[2]), words[3], new Long(words[4]));
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    // timestamps are ascending; convert seconds to milliseconds
                    @Override
                    public long extractAscendingTimestamp(UserBehavior userBehavior) {
                        return userBehavior.getTimestamp() * 1000L;
                    }
                });

        // processing
        SingleOutputStreamOperator<PageViewCount> uvCountStream = dataStream
                .filter(data -> "pv".equals(data.getBehavior()))
                .timeWindowAll(Time.minutes(60))
                .trigger(new MyTrigger())
                .process(new UvCountResultWithBloomFilter());

        // print and run
        uvCountStream.print();
        env.execute("uv count");
    }

    public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
        // fire once per element: trigger the window computation AND purge the window contents
        @Override
        public TriggerResult onElement(UserBehavior userBehavior, long l, TimeWindow window,
                                       TriggerContext triggerContext) throws Exception {
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE; // do nothing on processing time
        }

        @Override
        public TriggerResult onEventTime(long l, TimeWindow window, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE; // do nothing on event time
        }

        @Override
        public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
            // nothing to do; this hook is for clearing custom state
        }
    }

    public static class MyBloomFilter {
        private Integer cap; // bitmap size in bits; should be a power of two so (cap - 1) works as a bit mask

        public MyBloomFilter(Integer cap) {
            this.cap = cap;
        }

        // simple polynomial-rolling hash; the loop body was truncated in the original
        // post and is reconstructed here
        public Long hashCode(String value, Integer seed) {
            Long result = 1L;
            for (int i = 0; i < value.length(); i++) {
                result = result * seed + value.charAt(i);
            }
            return result & (cap - 1); // map the hash into [0, cap)
        }
    }

    public static class UvCountResultWithBloomFilter
            extends ProcessAllWindowFunction<UserBehavior, PageViewCount, TimeWindow> {
        private Jedis jedis;
        private MyBloomFilter myBloomFilter;

        @Override
        public void open(Configuration parameters) throws Exception {
            jedis = new Jedis("localhost", 6379);
            myBloomFilter = new MyBloomFilter(1 << 29); // 2^29 bits, a 64 MB bitmap
        }

        @Override
        public void process(Context context, Iterable<UserBehavior> iterable,
                            Collector<PageViewCount> collector) throws Exception {
            // the bitmap for this window lives in Redis, keyed by the window end
            Long windowEnd = context.window().getEnd();
            String bitMapKey = windowEnd.toString();

            String countHashName = "uv_count";
            String countKey = windowEnd.toString();

            // take the current userId (the trigger fires per element, so the window holds exactly one)
            Long userId = iterable.iterator().next().userId;
            // compute this userId's offset in the bitmap
            Long offset = myBloomFilter.hashCode(userId.toString(), 61);

            // use Redis GETBIT to check the bit at that offset
            Boolean isExist = jedis.getbit(bitMapKey, offset);
            if (!isExist) {
                // bit not set yet: mark the position as 1
                jedis.setbit(bitMapKey, offset, true);

                // update the count stored in Redis
                Long uvCount = 0L;
                String uvCountString = jedis.hget(countHashName, countKey); // current count
                if (uvCountString != null && !"".equals(uvCountString)) {
                    uvCount = Long.valueOf(uvCountString);
                }
                jedis.hset(countHashName, countKey, String.valueOf(uvCount + 1L)); // new count

                // emit the result
                collector.collect(new PageViewCount("uv", windowEnd, uvCount + 1L));
            }
        }

        @Override
        public void close() throws Exception {
            jedis.close();
        }
    }
}
```
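To see what the reconstructed hash actually produces, here is a hypothetical three-line check (the userId literal is made up; drop it into any main method alongside the class above):

```java
MyBloomFilter bf = new MyBloomFilter(1 << 29);
long offset = bf.hashCode("543462", 61); // same seed UvCountResultWithBloomFilter uses
System.out.println(offset);              // a bit offset in [0, 2^29), passed to Redis SETBIT
```

Note that using a single hash function makes this a one-hash Bloom filter: any collision silently treats a new user as already seen, so the UV figure is a slight undercount. Production Bloom filters typically combine several seeds to keep the false-positive rate down.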
Dependencies (pom.xml)

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>HotItemsAnalysis</module>
        <module>NetworkFlowAnalysis</module>
        <module>KafkaDemo</module>
        <module>JavaReview</module>
    </modules>

    <properties>
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_${scala.binary.version}</artifactId>
            <version>1.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```