Flink的DataSource三部曲之一:直接API,java基础重点知识点

Flink的DataSource三部曲之一:直接API,java基础重点知识点,第1张

Flink的DataSource三部曲之一:直接API,java基础重点知识点

| :-- | :-- | :-- |

| 项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |

| git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |

| git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |

这个git项目中有多个文件夹,本章的应用在flinkdatasourcedemo文件夹下,如下图红框所示:

环境和版本

本次实战的环境和版本如下:

  1. JDK:1.8.0_211

  2. Flink:1.9.2

  3. Maven:3.6.0

  4. *** 作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)

  5. IDEA:2018.3.5 (Ultimate Edition)

创建工程
  1. 在控制台执行以下命令就会进入创建flink应用的交互模式,按提示输入gourpId和artifactId,就会创建一个flink应用(我输入的groupId是com.bolingcavalry,artifactId是flinkdatasourcedemo):

mvn

archetype:generate

-DarchetypeGroupId=org.apache.flink

-DarchetypeArtifactId=flink-quickstart-java

-DarchetypeVersion=1.9.2

  1. 现在maven工程已生成,用IDEA导入这个工程,如下图:

  1. 以maven的类型导入:

  1. 导入成功的样子:

  1. 项目创建成功,可以开始写代码实战了;
辅助类Splitter

实战中有个功能常用到:将字符串用空格分割,转成Tuple2类型的集合,这里将此算子做成一个公共类Splitter.java,代码如下:

package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;

import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction> {

@Override

public void flatMap(String s, Collector> collector) throws Exception {

if(StringUtils.isNullOrWhitespaceonly(s)) {

System.out.println(“invalid line”);

return;

}

for(String word : s.split(" ")) {

collector.collect(new Tuple2(word, 1));

}

}

}

准备完毕,可以开始实战了,先从最简单的Socket开始。

Socket DataSource

Socket DataSource的功能是监听指定IP的指定端口,读取网络数据;

  1. 在刚才新建的工程中创建一个类Socket.java:

package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;

public class Socket {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//监听本地9999端口,读取字符串

DataStream socketDataStream = env.socketTextStream(“localhost”, 9999);

//每五秒钟一次,将当前五秒内所有字符串以空格分割,然后统计单词数量,打印出来

socketDataStream

.flatMap(new Splitter())

.keyBy(0)

.timeWindow(Time.seconds(5))

.sum(1)

.print();

env.execute(“API DataSource demo : socket”);

}

}

从上述代码可见,StreamExecutionEnvironment.socketTextStream就可以创建Socket类型的DataSource,在控制台执行命令nc -lk 9999,即可进入交互模式,此时输出任何字符串再回车,都会将字符串传输到本机9999端口;

  1. 在IDEA上运行Socket类,启动成功后再回到刚才执行nc -lk 9999的控制台,输入一些字符串再回车,可见Socket的功能已经生效:

集合DataSource(generateSequence)
  1. 基于集合的DataSource,API如下图所示:

  1. 先试试最简单的generateSequence,创建指定范围内的数字型的DataSource:

package com.bolingcavalry.api;

import org.apache.flink.api.common.functions.FilterFunction;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GenerateSequence {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//并行度为1

env.setParallelism(1);

//通过generateSequence得到Long类型的DataSource

DataStream dataStream = env.generateSequence(1, 10);

//做一次过滤,只保留偶数,然后打印

dataStream.filter(new FilterFunction() {

@Override

public boolean filter(Long aLong) throws Exception {

return 0L==aLong.longValue()%2L;

}

}).print();

env.execute(“API DataSource demo : collection”);

}

}

  1. 运行时会打印偶数:

集合DataSource(fromElements+fromCollection)
  1. fromElements和fromCollection就在一个类中试了吧,创建FromCollection类,里面是这两个API的用法:

package com.bolingcavalry.api;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

import java.util.List;

public class FromCollection {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//并行度为1

env.setParallelism(1);

//创建一个List,里面有两个Tuple2元素

List> list = new ArrayList<>();

list.add(new Tuple2(“aaa”, 1));

list.add(new Tuple2(“bbb”, 1));

//通过List创建DataStream

DataStream> fromCollectionDataStream = env.fromCollection(list);

//通过多个Tuple2元素创建DataStream

DataStream> fromElementDataStream = env.fromElements(

new Tuple2(“ccc”, 1),

new Tuple2(“ddd”, 1),

new Tuple2(“aaa”, 1)

);

//通过union将两个DataStream合成一个

DataStream> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

//统计每个单词的数量

unionDataStream

.keyBy(0)

.sum(1)

.print();

env.execute(“API DataSource demo : collection”);

}

}

  1. 运行结果如下:

文件DataSource
  1. 下面的ReadTextFil

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

e类会读取绝对路径的文本文件,并对内容做单词统计:

package com.bolingcavalry.api;

import com.bolingcavalry.Splitter;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFile {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683190.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存