DataStream API 简介 & Flink 出租车实验

DataStream API 简介 & Flink 出租车实验,第1张

DataStream API 简介 & Flink 出租车实验

该练习的重点是充分全面地了解 DataStream API,以便于编写流式应用入门。

什么能被转化成流?

Flink 的 Java 和 Scala DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

基本类型,即 String、Long、Integer、Boolean、Array
复合类型:Tuples、POJOs 和 Scala case classes
而且 Flink 会交给 Kryo 序列化其他类型。也可以将其他序列化器和 Flink 一起使用。特别是有良好支持的 Avro。

Java tuples 和 POJOs

Flink 的原生序列化器可以高效地 *** 作 tuples 和 POJOs

Tuples

对于 Java,Flink 自带有 Tuple0 到 Tuple25 类型。后面的数字表示可容纳的单个元祖元素的数据量。

Tuple1 tuple1 =  Tuple1.of( 1 );
Integer t1_f0 = tuple1.f0;

Tuple2 tuple2 =  Tuple2.of( 1, "Hello" );
Integer t2_f0 = tuple2.f0;
String t2_f1 = tuple2.f1;


Tuple3 tuple3 =  Tuple3.of( 1, "Hello", 1L);
Integer t3_f0 = tuple3.f0;
String t3_f1 = tuple3.f1;
Long t3_f2 = tuple3.f2;

POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

该类是公有且独立的(没有非静态内部类)
该类有公有的无参构造函数
类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。
示例:

public class Person {
    public String name;  
    public Integer age;  
    public Person() {};  
    public Person(String name, Integer age) {  
        . . .
    };  
}  

Person person = new Person("Fred Flintstone", 35);

Flink 的序列化器支持的 POJO 类型数据结构升级。

Scala tuples 和 case classes

如果你了解 Scala,那一定知道 tuple 和 case class。

转换成流 Stream 执行环境

每个 Flink 应用都需要有执行环境,在该示例中为 env。流式应用需要用到 StreamExecutionEnvironment。

DataStream API 将你的应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
注意,如果没有调用 execute(),应用就不会运行。


此分布式运行时取决于你的应用是否是可序列化的。它还要求所有依赖对集群中的每个节点均可用。

基本的 stream source
// 指定执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


// env.fromCollection(..)
List people = new ArrayList();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream flintstones = env.fromCollection(people);

//Tuples
Tuple2 person = Tuple2.of("Fred", 35);
// zero based index!
String name = person.f0;
Integer age = person.f1;

// env.fromCollection(..)
List people = new ArrayList<>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

// DataStream flintstones = env.fromCollection(people);

// env.fromElements
DataStream flintstones = env.fromElements(
	new Person("Fred", 35),
	new Person("Wilma", 35),
	new Person("Pebbles", 2));
 
// env.socketTextStream
DataStreamSource streamSource = env.socketTextStream("localhost", 9000, "n");

// env.readTextFile
DataStream lines = env.readTextFile("file:///path");

Flink 出租车实验 ◆ 行程清理

清理纽约市以外的行程。
项目地址:https://gitee.com/hoas/flink-training/tree/release-1.13/ride-cleansing

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5716298.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存