Flink local WordCount on Windows: batch, streaming, and SQL examples


Table of Contents
  • Creating the project
      • Create the project directly with Maven
      • Flink's provided quickstart script
      • Add the dependencies
  • DataSet wordcount
  • DataStream wordCount
  • flinkSql wordcount

Creating the project

Create the project directly with Maven

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.10.0

With the three Maven coordinates (GroupId, ArtifactId, Version), the project can be generated straight from the Maven command line.

Flink's provided quickstart script
curl https://flink.apache.org/q/quickstart.sh  | bash -s 1.10.0

Here, 1.10.0 is the Flink version number.

Add the dependencies

		<dependencies>
			<dependency>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-java</artifactId>
				<version>1.10.0</version>
			</dependency>
			<dependency>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-streaming-java_2.11</artifactId>
				<version>1.10.0</version>
			</dependency>
		</dependencies>
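Note that the flinkSql example further down relies on the Table API, which the two dependencies above do not cover. A sketch of the extra coordinates (artifact names and Scala suffixes vary between Flink versions, so treat these as assumptions to check against the version you actually use):

```xml
<!-- Table API bridge for the Java DataSet/DataStream integration -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-api-java-bridge_2.11</artifactId>
	<version>1.10.0</version>
</dependency>
<!-- planner that executes the SQL queries -->
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-table-planner_2.11</artifactId>
	<version>1.10.0</version>
</dependency>
```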
DataSet wordcount

package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;



public class BatchJob {

	public static void main(String[] args) throws Exception {
		// set up the batch execution environment
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<String> text = env.fromElements(
				"Flink Spark Storm",
				"Flink Flink Flink",
				"Spark Spark Spark",
				"Kafka Kafka Kafka",
				"Kafka Kafka Kafka"
		);

		// compute word counts with Flink's built-in transformations
		DataSet<Tuple2<String, Integer>> counts = text.flatMap(new LineSplitter())
				.groupBy(0)
				.sum(1);
		counts.printToErr();
	}

	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

		@Override
		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
			// split the line into tokens
			String[] tokens = value.toLowerCase().split("\\W+");
			for (String token : tokens) {
				if (token.length() > 0) {
					out.collect(new Tuple2<String, Integer>(token, 1));
				}
			}
		}
	}

}
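Stripped of the Flink machinery, the pipeline above is just: lowercase, split on non-word characters, drop empty tokens, sum per word. The same logic in plain Java (`WordCountSketch` and `count` are made-up names for illustration, not part of any Flink API):

```java
import java.util.HashMap;
import java.util.Map;

class WordCountSketch {
    // Same tokenization as LineSplitter, then the per-word sum
    // that groupBy(0).sum(1) produces in the Flink job.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

Running it on the five sample lines yields the same totals the job prints, e.g. kafka appears 6 times across the two "Kafka Kafka Kafka" lines.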

DataStream wordCount

package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // listen on a local socket port
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

        // split the incoming lines, key by word, aggregate over a sliding window, then emit
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                }).keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount value1, WordWithCount value2) throws Exception {
                        return new WordWithCount(value1.word, value1.count + value2.count);
                    }
                });
        windowCounts.print().setParallelism(1);
        env.execute("Flink Streaming Java API Skeleton");
    }

    public static class WordWithCount {
        public String word;
        public Long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, Long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ":" + count;
        }
    }

}
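To see output, start something listening on the port before launching the job, e.g. netcat (`nc -l -p 9000`; on Windows, `ncat` from the Nmap package works), and type words into it. As for the window: `timeWindow(Time.seconds(5), Time.seconds(1))` is a sliding window, so each element is counted in size/slide = 5 overlapping windows. A rough sketch of how the window start timestamps for one element could be derived (modeled on Flink's slide alignment, but a hypothetical helper, not a Flink call):

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // For an element at time ts, return the start timestamps of every
    // sliding window (size, slide) that contains it, aligned to the slide.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For example, an element at t = 7s with a 5s window sliding every 1s falls into the five windows starting at 7, 6, 5, 4, and 3, which is why a word typed once shows up in several consecutive window results.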
flinkSql wordcount
package org.myorg.quickstart;


import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

import java.util.ArrayList;


public class FlinkSql {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(fbEnv);
        String words = "hello flink hello balckjoker";

        String[] split = words.split("\\W+");
        ArrayList<WordWithCount> list = new ArrayList<>();

        for (String word : split) {
            WordWithCount  wordWithCount = new WordWithCount(word,1L);
            list.add(wordWithCount);
        }
        DataSet<WordWithCount> input = fbEnv.fromCollection(list);

        Table table = batchTableEnvironment.fromDataSet(input);
        table.printSchema();

        // register the DataSet as a table
        batchTableEnvironment.createTemporaryView("wordCount",table);
        Table table1 = batchTableEnvironment.sqlQuery("select word as word, sum(frequency) as frequency from wordCount group by word");

        DataSet<WordWithCount> ds3 = batchTableEnvironment.toDataSet(table1, WordWithCount.class);
        ds3.printToErr();
                
    }

    public static class WordWithCount {
        public String word;
        public Long frequency;

        public WordWithCount() {
        }

        public WordWithCount(String word, Long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return word + ":" + frequency;
        }
    }
}
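The SQL query here is nothing more than a grouped sum, and since every `WordWithCount` starts with frequency 1, summing frequencies equals counting occurrences. The same aggregation sketched with plain Java streams (`aggregate` is a made-up helper for illustration, not part of the Table API):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

class SqlWordCountSketch {
    // Equivalent of: select word, sum(frequency) from wordCount group by word,
    // given that each word initially carries frequency 1.
    static Map<String, Long> aggregate(String sentence) {
        return Arrays.stream(sentence.split("\\W+"))
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));
    }
}
```

On the sample sentence "hello flink hello balckjoker" this produces hello=2, flink=1, balckjoker=1, matching the job's printed result below.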

Output

root
 |-- frequency: BIGINT
 |-- word: STRING

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Dev/repository/org/apache/flink/flink-core/1.12.0/flink-core-1.12.0.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
flink:1
balckjoker:1
hello:2

Process finished with exit code 0

Feel free to share; when republishing, please credit the source: 内存溢出

Original article: http://outofmemory.cn/zaji/5575078.html
