在很多情况下,尽管flink提供了丰富的转换算子API可供开发者对数据进行各自处理,比如 map(),filter()等,但在实际使用的时候仍然不能满足所有的场景,这时候,就需要开发人员基于常用的转换算子的基础上,做一些自定义函数的处理
1、来看一个常用的 *** 作
原始待读取的文件
核心代码
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class FilterFunc { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //从环境的集合中获取数据 String path = "E:\code-self\flink_study\src\main\resources\hello.txt"; DataStreamSourcedataStream = env.readTextFile(path); SingleOutputStreamOperator flinkFilter = dataStream.filter(new FilterFunction () { @Override public boolean filter(String s) throws Exception { return s.contains("hello"); } }); flinkFilter.print("flinkFilter").setParallelism(1); env.execute(); } }
这是前面的博文中我们谈到的一种比较常用的写法,即需要对输入数据流过滤时,只需要重新匿名内部类的某某function即可,比如这里重新里面的FilterFunction
本例提取出文本内容中包含 hello的数据,运行上面的程序,观察控制台输出结果
简化写法,在Java8之后,针对类似的API可以使用lambda表达式的写法简化编码,flink也是如此,于是上面的代码可以按照如下方式编写
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class LambdaFunction { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //从环境的集合中获取数据 String path = "E:\code-self\flink_study\src\main\resources\hello.txt"; DataStreamSourcedataStream = env.readTextFile(path); SingleOutputStreamOperator flinkFilter = dataStream.filter(line -> line.contains("hello")); flinkFilter.print("flinkFilter").setParallelism(1); env.execute(); } }
这样写看起来更简洁了,效果类似,
2、自定义filter
为什么要自定义呢?比如当我们对读取的数据需要做进一步的处理时,或者需要过滤的数据内容作为参数传入的时候,显然直接使用filter就不是很方便了,这种情况下,就需要自定义函数了,具体来说,只需要在外部定义一个类,并实现FilterFunction即可,如下面的案例
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class UDF_Filter { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //从环境的集合中获取数据 String path = "E:\code-self\flink_study\src\main\resources\hello.txt"; DataStreamSourcedataStream = env.readTextFile(path); SingleOutputStreamOperator flinkFilter = dataStream.filter(new FlinkFilter("hello")); flinkFilter.print("flinkFilter").setParallelism(1); env.execute(); } public static class FlinkFilter implements FilterFunction { private String keyWord; public FlinkFilter(String keyWord) { this.keyWord = keyWord; } @Override public boolean filter(String value) throws Exception { return value.contains(keyWord); } } }
在该方法中,我们需要过滤的hello字符串是作为参数传进去的,这样处理起来就更灵活了,最终效果类似
2、富函数(Rich Functions)
“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都 有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一 些生命周期方法,所以可以实现更复杂的功能RichMapFunction RichFilterFunction RichFlatMapFunction ...... Rich Function 有一个生命周期的概念,典型的生命周期方法有:
- open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter
被调用之前 open()会被调用。close()方法是生命周期中的最后一个调用的方法,做一些清理工作。getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函
数执行的并行度,任务的名字,以及 state 状态
下面来看一个具体的案例
import com.congge.source.SensorReading; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class RichFunction { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //从环境的集合中获取数据 String path = "E:\code-self\flink_study\src\main\resources\sensor.txt"; DataStreamSourceinputStream = env.readTextFile(path); SingleOutputStreamOperator dataStream = inputStream.map(new MapFunction () { @Override public SensorReading map(String value) throws Exception { String[] fields = value.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); } }); DataStream > map = dataStream.map(new MyMapFunction()); map.print("richFunc").setParallelism(1); env.execute(); } public static class MyMapFunction extends RichMapFunction > { @Override public Tuple2 map(SensorReading value) throws Exception { return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId()); } @Override public void open(Configuration parameters) throws Exception { System.out.println("my map open"); // 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接 } @Override public void close() throws Exception { System.out.println("my map close"); // 以下做一些清理工作,例如断开和 HDFS 的连接 } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)