flink自定义函数

flink自定义函数,第1张

flink自定义函数 前言

在很多情况下,尽管flink提供了丰富的转换算子API可供开发者对数据进行各自处理,比如 map(),filter()等,但在实际使用的时候仍然不能满足所有的场景,这时候,就需要开发人员基于常用的转换算子的基础上,做一些自定义函数的处理

1、来看一个常用的 *** 作

原始待读取的文件

核心代码

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterFunc {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从环境的集合中获取数据
        String path = "E:\code-self\flink_study\src\main\resources\hello.txt";
        DataStreamSource dataStream = env.readTextFile(path);
        SingleOutputStreamOperator flinkFilter = dataStream.filter(new FilterFunction() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.contains("hello");
            }
        });
        flinkFilter.print("flinkFilter").setParallelism(1);
        env.execute();
    }

}

这是前面的博文中我们谈到的一种比较常用的写法,即需要对输入数据流过滤时,只需要重新匿名内部类的某某function即可,比如这里重新里面的FilterFunction

本例提取出文本内容中包含 hello的数据,运行上面的程序,观察控制台输出结果

 

简化写法,在Java8之后,针对类似的API可以使用lambda表达式的写法简化编码,flink也是如此,于是上面的代码可以按照如下方式编写

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LambdaFunction {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从环境的集合中获取数据
        String path = "E:\code-self\flink_study\src\main\resources\hello.txt";
        DataStreamSource dataStream = env.readTextFile(path);
        SingleOutputStreamOperator flinkFilter = dataStream.filter(line -> line.contains("hello"));
        flinkFilter.print("flinkFilter").setParallelism(1);
        env.execute();
    }

}

这样写看起来更简洁了,效果类似,

 

 2、自定义filter

为什么要自定义呢?比如当我们对读取的数据需要做进一步的处理时,或者需要过滤的数据内容作为参数传入的时候,显然直接使用filter就不是很方便了,这种情况下,就需要自定义函数了,具体来说,只需要在外部定义一个类,并实现FilterFunction即可,如下面的案例

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UDF_Filter {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从环境的集合中获取数据
        String path = "E:\code-self\flink_study\src\main\resources\hello.txt";
        DataStreamSource dataStream = env.readTextFile(path);
        SingleOutputStreamOperator flinkFilter = dataStream.filter(new FlinkFilter("hello"));
        flinkFilter.print("flinkFilter").setParallelism(1);
        env.execute();
    }

    public static class FlinkFilter implements FilterFunction {

        private String keyWord;

        public FlinkFilter(String keyWord) {
            this.keyWord = keyWord;
        }

        @Override
        public boolean filter(String value) throws Exception {
            return value.contains(keyWord);
        }
    }

}

在该方法中,我们需要过滤的hello字符串是作为参数传进去的,这样处理起来就更灵活了,最终效果类似

 2、富函数(Rich Functions)

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都 有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一 些生命周期方法,所以可以实现更复杂的功能

RichMapFunction RichFilterFunction RichFlatMapFunction ...... Rich Function 有一个生命周期的概念,典型的生命周期方法有:

     open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter
    被调用之前 open()会被调用。close()方法是生命周期中的最后一个调用的方法,做一些清理工作。getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函
    数执行的并行度,任务的名字,以及 state 状态

下面来看一个具体的案例

import com.congge.source.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunction {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //从环境的集合中获取数据
        String path = "E:\code-self\flink_study\src\main\resources\sensor.txt";
        DataStreamSource inputStream = env.readTextFile(path);
        SingleOutputStreamOperator dataStream = inputStream.map(new MapFunction() {
            @Override
            public SensorReading map(String value) throws Exception {
                String[] fields = value.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            }
        });

        DataStream> map = dataStream.map(new MyMapFunction());
        map.print("richFunc").setParallelism(1);
        env.execute();
    }

    public static class MyMapFunction extends RichMapFunction> {

        @Override
        public Tuple2 map(SensorReading value) throws Exception {
            return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(),
                    value.getId());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("my map open");
            // 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接
        }

        @Override
        public void close() throws Exception {
            System.out.println("my map close");
            // 以下做一些清理工作,例如断开和 HDFS 的连接
        }
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705349.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存