- 创建工程
- 直接用maven 创建工程
- flink 提供的创建工程的方式
- 添加依赖
- DataSet wordcount
- DataStream wordCount
- flinkSql wordcount
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0
借助 Maven 工程的三要素(GroupId、ArtifactId、Version),可以直接用上面的 Maven 命令创建工程。
Flink 提供的创建工程的方式
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0
其中 1.10.0 指的是flink 的版本号。
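添加依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
DataSet wordcount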
package org.myorg.quickstart; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import java.util.Locale; public class BatchJob { public static void main(String[] args) throws Exception { // set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSetDataStream wordCounttext = env.fromElements( "Flink Spark Storm", "Flink Flink Flink", "Spark Spark Spark", "Kafka Kafka Kafka", "Kafka Kafka Kafka" ); // 通过Flink 内置的转换函数进行计算 DataSet > counts = text.flatMap(new LineSplitter()) .groupBy(0) .sum(1); counts.printToErr(); } public static final class LineSplitter implements FlatMapFunction > { @Override public void flatMap(String value, Collector > out) throws Exception { //将文本分割 String[] tokens = value.toLowerCase().split("\W+"); for (String token : tokens) { if (token.length() > 0 ) { out.collect(new Tuple2 (token,1)); } } } } }
package org.myorg.quickstart; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class StreamingJob { public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //监听端口 DataStreamflinkSql wordcounttext = env.socketTextStream("", 9000, "n"); // 将接收的数据进行拆分,分组窗口计算聚合再输出 DataStream windowCounts = text.flatMap(new FlatMapFunction () { @Override public void flatMap(String value, Collector out) throws Exception { for (String word : value.split("\s")) { out.collect(new WordWithCount(word, 1L)); } } }).keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .reduce(new ReduceFunction () { @Override public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception { return new WordWithCount(value1.word, value1.count + value2.count); } }); windowCounts.print().setParallelism(1); env.execute("Flink Streaming Java API Skeleton"); } public static class WordWithCount { public String word; public Long count; public WordWithCount() { } public WordWithCount(String word, Long count) { this.word = word; this.count = count; } @Override public String toString() { return word + ":" + count; } } }
package org.myorg.quickstart; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import java.util.ArrayList; public class FlinkSql { public static void main(String[] args) throws Exception { ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(fbEnv); String words = "hello flink hello balckjoker"; String[] split = words.split("\W+"); ArrayListlist = new ArrayList (); for (String word : split) { WordWithCount wordWithCount = new WordWithCount(word,1L); list.add(wordWithCount); } DataSet inpput = fbEnv.fromCollection(list); Table table = batchTableEnvironment.fromDataSet(inpput); table.printSchema(); // 注册一个表 batchTableEnvironment.createTemporaryView("wordCount",table); Table table1 = batchTableEnvironment.sqlQuery("select word as word, sum(frequency) as frequency from wordCount group by word"); DataSet ds3 = batchTableEnvironment.toDataSet(table1,WordWithCount.class); ds3.printToErr(); } public static class WordWithCount { public String word; public Long frequency; public WordWithCount() { } public WordWithCount(String word, Long frequency) { this.word = word; this.frequency = frequency; } @Override public String toString() { return word + ":" + frequency; } } }
root
 |-- frequency: BIGINT
 |-- word: STRING
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Dev/repository/org/apache/flink/flink-core/1.12.0/flink-core-1.12.0.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
flink:1
balckjoker:1
hello:2
Process finished with exit code 0