A Flink streaming application processes streams of events represented as data objects, so Flink must be able to handle these objects internally: they need to be serialized and deserialized in order to be sent over the network, or to be written to and read from state backends, checkpoints, and savepoints. To do this efficiently, Flink needs to know exactly which data types the application works with. Flink uses the concept of type information (TypeInformation) to represent data types, and it generates dedicated serializers, deserializers, and comparators for each type. Flink also has a type extraction system that analyzes the input and return types of functions in order to obtain the type information, and from it the serializers and deserializers, automatically. In some cases, however, such as lambda functions or generic types, the type information has to be provided explicitly for the application to work at all or to improve its performance.
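For example, when a lambda returns a generic type such as Tuple2, the erased type can be supplied explicitly with a returns() hint. The following is only a minimal sketch; the class name and sample data are made up, while Types and returns() are standard DataStream API calls:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TypeHintExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env.fromElements("flink", "spark");

        // Because of Java type erasure, Flink cannot infer Tuple2<String, Integer>
        // from the lambda alone, so the type information is provided explicitly.
        DataStream<Tuple2<String, Integer>> wordLengths = words
                .map(word -> new Tuple2<>(word, word.length()))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        wordLengths.print();
        env.execute();
    }
}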
Flink supports all common data types in Java and Scala. The most widely used types are the following.
1.1 Basic data types
Flink supports all Java and Scala basic data types: Int, Double, Long, String, …
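For example (a minimal sketch; the values are made up and an existing StreamExecutionEnvironment env is assumed):

// A DataStream over a basic type such as Integer
DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);
numberStream.print();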
1.2 Java and Scala tuples
Unlike Scala, Java has no built-in tuple type; Flink's own packages provide tuple types, Tuple0 through Tuple25, out of the box.
DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
        new Tuple2<>("Adam", 17),
        new Tuple2<>("Sarah", 23)
);
personStream.filter(p -> p.f1 > 18);
1.3 Scala case classes
case class Person(name: String, age: Int)

val persons: DataStream[Person] = env.fromElements(
  Person("张三", 12),
  Person("李四", 23)
)
1.4 Java simple objects (Beans); a no-argument constructor must be provided
All member variables must be public (or private with getters and setters).
public class Person {
    public String name;
    public int age;

    public Person() {}

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

DataStream<Person> persons = env.fromElements(
        new Person("Alex", 42),
        new Person("Wendy", 23)
);
1.5 Others (Arrays, Lists, Maps, Enums, etc.)
Flink also supports some special-purpose types in Java and Scala, such as Java's ArrayList, HashMap, Enum, and so on.
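A minimal sketch (the enum, the sample values, and the variable names are made up; an existing env and the usual java.util imports are assumed):

// An enum and a collection type used directly as stream element types
enum SensorStatus { ONLINE, OFFLINE }

DataStream<SensorStatus> statusStream = env.fromElements(SensorStatus.ONLINE, SensorStatus.OFFLINE);

DataStream<ArrayList<Integer>> listStream = env.fromElements(
        new ArrayList<>(Arrays.asList(1, 2, 3)),
        new ArrayList<>(Arrays.asList(4, 5))
);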
2. Implementing UDF functions
2.1 Function classes
Flink exposes interfaces for all UDF functions (implemented either as interfaces or as abstract classes), for example MapFunction, FilterFunction, ProcessFunction, and so on.
The following example implements the FilterFunction interface:
DataStream<String> flinkTweets = tweets.filter(new FlinkFilter());

public static class FlinkFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) throws Exception {
        return value.contains("flink");
    }
}
The function can also be implemented as an anonymous class:
DataStream<String> flinkTweets = tweets.filter(
    new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.contains("flink");
        }
    }
);
The string "flink" we filter on can also be passed in as a constructor parameter:
DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter(new KeyWordFilter("flink"));

public static class KeyWordFilter implements FilterFunction<String> {
    private String keyWord;

    KeyWordFilter(String keyWord) {
        this.keyWord = keyWord;
    }

    @Override
    public boolean filter(String value) throws Exception {
        return value.contains(this.keyWord);
    }
}
2.2 Lambda functions
DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter(tweet -> tweet.contains("flink"));
2.3 Rich functions
"Rich functions" are function-class interfaces provided by the DataStream API; every Flink function class has a Rich version.
They differ from regular functions in that they can access the runtime context and provide lifecycle methods, which makes more complex functionality possible.
RichMapFunction
RichFlatMapFunction
RichFilterFunction
........
Rich Functions have a notion of lifecycle. Typical lifecycle methods are:
open() is the initialization method of a rich function; it is called before the operator's working method (such as map or filter) starts processing data.
close() is the last method called in the lifecycle and is used for cleanup work.
getRuntimeContext() gives access to information from the function's RuntimeContext, such as the parallelism the function runs with, the task name, and state.
public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> {

    @Override
    public Tuple2<Integer, String> map(SensorReading value) throws Exception {
        return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("my map open");
        // Initialization work can go here, e.g. opening a connection to HDFS
    }

    @Override
    public void close() throws Exception {
        System.out.println("my map close");
        // Cleanup work can go here, e.g. closing the HDFS connection
    }
}
Test code:
package apitest.transform;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());

        resultStream.print();

        env.execute();
    }

    // A plain Function cannot access the runtime context; it can only process the current record
    // and cannot interact with anything beyond it.
    public static class MyMapper0 implements MapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    // Custom rich function class (RichMapFunction is an abstract class)
    public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            // A RichFunction can access state
            // getRuntimeContext().getState();
            return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialization work, typically defining state or opening a database connection
            System.out.println("open");
        }

        @Override
        public void close() throws Exception {
            // Cleanup, typically closing connections and clearing state
            System.out.println("close");
        }
    }
}
3. Data repartitioning operations
For repartitioning, the DataStream class contains many classes and methods with "Partitioner" in their names; among them, partitionCustom(...) is used for custom repartitioning (a sketch follows the example below).
package apitest.transform;

import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest6_Partition {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism = 4
        env.setParallelism(4);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // With multiple parallel subtasks, SingleOutputStreamOperator defaults to rebalance (round-robin distribution)
        dataStream.print("input");

        // 1. shuffle (unlike batch processing, records are not collected into a batch first;
        //    each record is sent to a random partition as it arrives)
        DataStream<String> shuffleStream = inputStream.shuffle();
        shuffleStream.print("shuffle");

        // 2. keyBy (hash the key, then take it modulo the number of partitions)
        dataStream.keyBy(SensorReading::getId).print("keyBy");

        // 3. global (send everything to the first partition; only useful in a few special cases)
        dataStream.global().print("global");

        env.execute();
    }
}
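The example above does not exercise partitionCustom(...). A minimal sketch of a custom partitioner is shown below; the partitioner class is made up for illustration and assumes the same SensorReading dataStream as above:

import org.apache.flink.api.common.functions.Partitioner;

// A made-up partitioner: route each record to a partition based on the hash of its sensor id
public static class MyPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }
}

// Usage: the second argument is a KeySelector that extracts the key handed to the partitioner
dataStream
        .partitionCustom(new MyPartitioner(), SensorReading::getId)
        .print("custom");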
4. Sink
Flink has no equivalent of Spark's foreach method that lets users run arbitrary per-record output logic directly; all output to external systems has to go through a Sink. The final output step of a job is typically done in a way similar to the following:
stream.addSink(new MySink(xxxx))
Flink officially provides sinks for a number of frameworks; beyond those, users need to implement their own sinks.
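As a rough sketch of what such a custom sink looks like (the class is made up; the JDBC example later in this section shows a complete RichSinkFunction), it is enough to implement SinkFunction and its invoke() method:

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// A made-up sink that simply writes each record to stdout
public static class MyPrintSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("sink received: " + value);
    }
}

// Usage
stream.addSink(new MyPrintSink());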
Kafka
pom dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Flink_Tutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
Java code:
package apitest.sink;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism to 1
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from Kafka
        DataStream<String> inputStream = env.addSource(
                new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(), properties));

        // Convert the data read from Kafka into SensorReading, then back into a String
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
        });

        // Write the data to Kafka
        dataStream.addSink(
                new FlinkKafkaProducer<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

        env.execute();
    }
}
Redis
Here Redis is used as the output target of the sink.
pom dependencies
This is a rather old dependency, dating from 2017.
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Java code:
package apitest.sink;

import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Define the Jedis connection configuration (here it connects to Redis running in Docker)
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .setPassword("123456")
                .setDatabase(0)
                .build();

        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Define the Redis command used to store the data: save into a hash, i.e. hset sensor_temp id temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
Elasticsearch
pom dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
    <version>1.12.1</version>
</dependency>
Java code:
package apitest.sink;

import apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class SinkTest3_Es {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Define the Elasticsearch connection configuration
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("localhost", 9200));

        dataStream.addSink(
                new ElasticsearchSink.Builder<SensorReading>(httpHosts, new MyEsSinkFunction()).build());

        env.execute();
    }

    // Custom Elasticsearch write logic
    public static class MyEsSinkFunction implements ElasticsearchSinkFunction<SensorReading> {
        @Override
        public void process(SensorReading element, RuntimeContext ctx, RequestIndexer indexer) {
            // Define the document source to write
            HashMap<String, String> dataSource = new HashMap<>();
            dataSource.put("id", element.getId());
            dataSource.put("temp", element.getTemperature().toString());
            dataSource.put("ts", element.getTimestamp().toString());

            // Create the request, i.e. the write command sent to Elasticsearch
            // (in ES 7 the type is always _doc and can no longer be specified)
            IndexRequest indexRequest = Requests.indexRequest()
                    .index("sensor")
                    .source(dataSource);

            // Send the request via the indexer
            indexer.add(indexRequest);
        }
    }
}
JDBC custom sink
pom dependencies
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>
Create the database:

CREATE DATABASE `flink_test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
Create the table:
CREATE TABLE `sensor_temp` (
  `id` varchar(32) NOT NULL,
  `temp` double NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Java code:
package apitest.sink;

import apitest.beans.SensorReading;
import apitest.source.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkTest4_Jdbc {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set parallelism = 1
        env.setParallelism(1);

        // Read data from a file
        // DataStream<String> inputStream = env.readTextFile("/tmp/Flink_Tutorial/src/main/resources/sensor.txt");
        //
        // // Convert to SensorReading
        // DataStream<SensorReading> dataStream = inputStream.map(line -> {
        //     String[] fields = line.split(",");
        //     return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        // });

        // Use the previously written SourceFunction that generates randomly fluctuating temperatures
        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // Custom SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // Declare the connection and the prepared statements
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/flink_test?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&useSSL=false",
                    "root", "example");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // For every record, use the connection to execute the SQL
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Run the update first; if nothing was updated, insert instead
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}