The basic building blocks of a Flink program are streams and transformations (note that the DataSet used in Flink's DataSet API is also a stream internally).
1. Real-time requirement
Every 5 seconds, compute word counts over the most recent 10 seconds of data.

2. Development environment
1) The official documentation recommends IntelliJ IDEA, which integrates Java and Maven support out of the box and is convenient to use.
2) This example uses Flink 1.12.

3. Real-time (streaming) code

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // DataSource operation
        DataStreamSource<String> sourceStream = env.socketTextStream("192.168.153.10", 6666);
        // Implement the flatMap operator with an anonymous inner class
        final SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapStream =
                sourceStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        final String[] words = line.split(" ");
                        for (String word : words) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        // keyBy grouping operation
        final KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatMapStream.keyBy(value -> value.f0);
        // Every 5 seconds, compute the last 10 seconds of window data
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window =
                keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
        // sum aggregation operation
        final SingleOutputStreamOperator<Tuple2<String, Integer>> result = window.sum(1);
        // Print the results
        result.print();
        // Execute the program
        env.execute("StreamWordCount");
    }
}
```
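To see why a 10-second window sliding every 5 seconds places each event into exactly two overlapping windows, the assignment logic can be sketched in plain Java (no Flink runtime needed). This is an illustrative sketch mirroring the arithmetic of Flink's sliding window assigner with zero offset; the class and method names here (`SlidingWindowDemo`, `assignWindows`) are hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowDemo {
    // Compute the [start, start + size) windows an event timestamp falls into,
    // mirroring the arithmetic of a sliding window assigner (illustrative sketch).
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that contains this timestamp
        long lastStart = timestampMs - ((timestampMs + slideMs) % slideMs);
        // Walk backwards one slide at a time while the window still covers the event
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[]{start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10 s, slide = 5 s: every event belongs to size / slide = 2 windows
        for (long[] w : assignWindows(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

An event at t = 12 s lands in the windows [10 s, 20 s) and [5 s, 15 s), which is why each word is counted in two consecutive 10-second results.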
The streaming example needs to connect to a running cluster; for setup details, see the Flink cluster deployment guide.
4. Batch requirement
Count the occurrences of each word in the input text.

5. Batch code

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create the data source
        final DataSource<String> source =
                env.fromElements("Flink flink sqoop hadoop", "flume hadoop MapReduce flink");
        // Transformation operations
        final FlatMapOperator<String, Tuple2<String, Integer>> flatMap = source.flatMap(new FlatMapClass());
        final AggregateOperator<Tuple2<String, Integer>> result = flatMap.groupBy(0).sum(1);
        // Print the results
        result.print();
    }

    private static class FlatMapClass implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
            final String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```
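The flatMap-then-sum logic above can be sanity-checked without a Flink runtime using plain Java collections. This is an illustrative sketch, not part of the Flink program; the class name `WordCountCheck` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountCheck {
    // Plain-Java version of the flatMap + groupBy(0) + sum(1) pipeline:
    // split each line on spaces, then accumulate a count per word.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum); // same (word, 1) -> sum pattern
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                count("Flink flink sqoop hadoop", "flume hadoop MapReduce flink");
        counts.forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
        // Note: grouping is case-sensitive, so "Flink" and "flink" are separate keys,
        // exactly as in the Flink job above.
    }
}
```

For the two sample lines, "flink" and "hadoop" each count 2, while "Flink", "sqoop", "flume", and "MapReduce" count 1.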