- flink分为流式和批式,流式因为是来一个数据处理一个,所以会保存之前处理的中间结果。但是有可能我并不需要这些结果。所以我完全可以用批式来处理海量数据嘛
学习链接
条件- class必须是独立的(不能是内部类),public的
- 必须要有无参数的构造方法。可以再重载一个带参数的构造方法
- 里面必须要有字段,可以是private,但是必须要有public的getter和setter方法
package flink; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String filePath = "/home/10313588@zte.intra/Desktop/pycharmProject/Drain/logflash/base/rawdata/vnpm_0702/1120/cf-vnpm.log.2"; DataStreaminputStream = env.readTextFile(filePath); // 对数据及进行处理,按空格分词展开,转换成(word,1)的二元组Tuple2 进行词频统计 DataStream > resultStream = inputStream.flatMap(new MyFlatMapper()) .keyBy(0) // keyBy转换,按key重分区 .sum(1);//(在当前分区内)按照第二个位置来进行加和 resultStream.print();// 如果只是在这里就执行的话是没有结果的。在流式架构中,是先把任务定义好,然后去等数据,来一个处理一个所以这里也只是定义了任务print,但是还没有执行 //这里才是执行任务,启动任务。来一个数据处理一次 env.execute(); } public static class MyFlatMapper implements FlatMapFunction >{ @Override public void flatMap(String value, Collector > out) throws Exception { //分词 String[] words = value.split("\s+"); // 遍历数组,包装成tuple2 for(String word:words){ out.collect(new Tuple2<>(word,1)); } } } }
输出结果:
。。。
4> (rc-sts-dep-in-stp-sta-up-roll-suc-ms-rc-1-zws49,2)
4> ([114],9219)
4> (cluster(22.20.0.20:2601),10800)
4> (pod,12360)
4> ([114],9220)
4> (cluster(22.20.0.20:2601),10801)
4> (pod,12361)
4> (rc-sts-dep-in-stp-sta-up-roll-suc-ms-rc-1-zws49,3)
4> ([114],9221)
4> (cluster(22.20.0.20:2601),10802)
4> (pod,12362)
4> ([114],9222)
4> (rc-sts-dep-in-stp-sta-up-roll-suc-ms-rc-1-zws49,4)
4> (01:44:32.908,1)
4> (pod,12363)
4> (pod.,2638)
4> (db_pod:,1059)
4> (‘db_pod_network’:,1059)
4> (‘if_reg’:,1059)
4> (running):,1059)
4> (None,1738)
4> ([625],1059)
。。。
4> 代表的是并行子任务分区号、进程号(并行的进程号)
课程还讲了一个流式数据源测试,即真正的流式肯定不是从文件(有界数据)读的,而是源源不断的那种,所以大数据组件中,能实现这样的情况就是kafka消息传输队列。课程中演示的是用linux的nc来实现的流式。【后续可以学一下kafka】
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)