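Flink's official documentation describes many ways to connect a data source, including common ones such as socket, fromElements, and fromCollection; see https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/. This post covers custom sources, which are attached through the addSource method.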
public class sourceMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Pass any SourceFunction implementation, e.g. the TestSourceFun defined later in this post
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new TestSourceFun());
        env.execute();
    }
}
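In the example above, we pass an implementation of the SourceFunction interface to addSource. Inside that class we define how the data is ingested, which keeps the code flexible. Besides the basic SourceFunction, there are three other custom source types: RichSourceFunction, ParallelSourceFunction, and RichParallelSourceFunction. In terms of hierarchy, ParallelSourceFunction extends SourceFunction, RichSourceFunction extends AbstractRichFunction and implements SourceFunction, and RichParallelSourceFunction extends AbstractRichFunction and implements ParallelSourceFunction. The sections below look at each in turn.
SourceFunction is an interface. To use it, we implement the interface and override its run and cancel methods. Without further ado, here is the demo.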
TestSourceFun.java
import com.lxf.model.Student;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class TestSourceFun implements SourceFunction<Student> {
    // volatile so that the change made by cancel() is visible to the thread inside run()
    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        String[] arrs = {"zhangsan", "lisi", "wangwu"};
        Random random = new Random();
        while (flag) {
            // Emit a batch of 10 random students, then pause for 5 seconds
            for (int i = 0; i < 10; i++) {
                Student student = new Student();
                student.setId(123);
                student.setName(arrs[random.nextInt(arrs.length)]);
                student.setAge(random.nextInt(100)); // age in [0, 100)
                sourceContext.collect(student);
            }
            Thread.sleep(5000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
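The run/cancel contract relies on one thread looping while another thread flips a flag. Here is a minimal sketch of that pattern in plain Java, with no Flink dependency. The class name `CancellableLoop` and all of its methods are hypothetical stand-ins for a source's run and cancel, not Flink API. The flag is declared volatile so that the write made by cancel is visible to the looping thread, which is also how the example in Flink's SourceFunction Javadoc declares its flag.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CancellableLoop {
    // volatile: the write in cancel() must be visible to the thread inside run()
    private volatile boolean running = true;
    private final AtomicInteger emitted = new AtomicInteger();

    // Stand-in for SourceFunction.run: loops until cancel() is called
    public void run() throws InterruptedException {
        while (running) {
            emitted.incrementAndGet(); // stand-in for sourceContext.collect(...)
            Thread.sleep(10);
        }
    }

    // Stand-in for SourceFunction.cancel: only flips the flag
    public void cancel() {
        running = false;
    }

    public int emittedCount() {
        return emitted.get();
    }

    // Runs the loop on a worker thread, cancels it after waitMillis,
    // and reports whether the worker emitted something and then stopped.
    public static boolean runThenCancel(long waitMillis) {
        CancellableLoop loop = new CancellableLoop();
        Thread worker = new Thread(() -> {
            try {
                loop.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(waitMillis);
            loop.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive() && loop.emittedCount() > 0;
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + runThenCancel(100));
    }
}
```

Without volatile, the looping thread is not guaranteed to observe the updated flag, so a cancelled source could keep running.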
Student.java
package com.lxf.model;

public class Student {
    private Integer id;
    private String name;
    private Integer age;

    public Student() {
    }

    public Student(Integer id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public Integer getId() { return id; }
    public String getName() { return name; }
    public Integer getAge() { return age; }

    public void setId(Integer id) { this.id = id; }
    public void setName(String name) { this.name = name; }
    public void setAge(Integer age) { this.age = age; }

    @Override
    public String toString() {
        return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}';
    }
}
sourceMain.java
import com.lxf.model.Student;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class sourceMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        test01(env);
        env.execute();
    }

    public static void test01(StreamExecutionEnvironment env) {
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new TestSourceFun());
        studentDataStreamSource.print();
        // Print the parallelism of the current SourceFunction
        System.out.println(studentDataStreamSource.getParallelism());
    }
}
Output: the result shows that this source has a parallelism of 1, yet four threads print the data, because my machine has 4 cores and print runs at the default parallelism. Subtask indices start at 0, but print adds 1 to the index, so the prefixes on the console start at 1.
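RichSourceFunction is more capable than SourceFunction: it adds lifecycle methods such as open, which is called once per task and is the place for initialization work. Here we write a custom source that reads from MySQL and prints the rows to the console. (The MySQL connection utility class is not included here; create the MySQL table yourself.)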
TestRichSourceFun.java
import com.lxf.model.Student;
import com.lxf.utils.MysqlUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class TestRichSourceFun extends RichSourceFunction<Student> {
    private Connection conn;
    private PreparedStatement prep;
    private volatile boolean running = true;

    // Called once per task before run(): open the connection and prepare the query
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = MysqlUtil.getConnection();
        prep = conn.prepareStatement("select * from student");
        System.out.println("----open----"); // marker
    }

    // Called once per task after run() returns: release the JDBC resources
    @Override
    public void close() throws Exception {
        MysqlUtil.close(conn, prep);
    }

    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        try (ResultSet rs = prep.executeQuery()) {
            while (running && rs.next()) {
                int id = rs.getInt("id");
                String name = rs.getString("name");
                int age = rs.getInt("age");
                sourceContext.collect(new Student(id, name, age));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
sourceMain.java
public class sourceMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        test02(env);
        env.execute();
    }

    public static void test02(StreamExecutionEnvironment env) {
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new TestRichSourceFun());
        studentDataStreamSource.print();
    }
}
Output: the result shows that this source has a parallelism of 1, so only one task runs.
The one thing ParallelSourceFunction adds over SourceFunction is that its parallelism can be set to a value greater than 1.
TestParallelSounrceFun.java
import com.lxf.model.Student;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;

public class TestParallelSounrceFun implements ParallelSourceFunction<Student> {
    // volatile so that the change made by cancel() is visible to the thread inside run()
    private volatile boolean flag = true;

    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        String[] arrs = {"zhangsan", "lisi", "wangwu"};
        Random random = new Random();
        while (flag) {
            // Emit a batch of 10 random students, then pause for 5 seconds
            for (int i = 0; i < 10; i++) {
                Student student = new Student();
                student.setId(123);
                student.setName(arrs[random.nextInt(arrs.length)]);
                student.setAge(random.nextInt(100)); // age in [0, 100)
                sourceContext.collect(student);
            }
            Thread.sleep(5000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
sourceMain.java
import com.lxf.model.Student;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class sourceMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        test03(env);
        env.execute();
    }

    public static void test03(StreamExecutionEnvironment env) {
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new TestParallelSounrceFun());
        studentDataStreamSource.setParallelism(4);
        studentDataStreamSource.print();
        // Print the parallelism of the current SourceFunction
        System.out.println(studentDataStreamSource.getParallelism());
    }
}
Output: the number of source tasks is now 4, so the source runs with a parallelism of 4.
The abstract class RichParallelSourceFunction implements the ParallelSourceFunction interface and also extends the abstract class AbstractRichFunction, so it supports both a parallelism greater than 1 and the lifecycle methods, which makes it the most capable of the four.
TestRichParallelSourceFun.java
import com.lxf.model.Student;
import com.lxf.utils.MysqlUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class TestRichParallelSourceFun extends RichParallelSourceFunction<Student> {
    private Connection conn;
    private PreparedStatement prep;
    private volatile boolean running = true;

    // Called once per parallel task before run()
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = MysqlUtil.getConnection();
        prep = conn.prepareStatement("select * from student");
        System.out.println("----open----"); // marker
    }

    // Called once per parallel task after run() returns
    @Override
    public void close() throws Exception {
        MysqlUtil.close(conn, prep);
    }

    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        try (ResultSet rs = prep.executeQuery()) {
            while (running && rs.next()) {
                int id = rs.getInt("id");
                String name = rs.getString("name");
                int age = rs.getInt("age");
                sourceContext.collect(new Student(id, name, age));
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
sourceMain.java
import com.lxf.model.Student;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class sourceMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        test04(env);
        env.execute();
    }

    public static void test04(StreamExecutionEnvironment env) {
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new TestRichParallelSourceFun());
        // 3 source tasks, whose output is printed by 4 threads
        studentDataStreamSource.setParallelism(3);
        studentDataStreamSource.print();
        // Print the parallelism of the current SourceFunction
        System.out.println(studentDataStreamSource.getParallelism());
    }
}
Output: open is printed by three tasks, and every row is output three times, because each of the three parallel instances runs the same query.
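Because every parallel instance executes the same `select * from student`, each row is emitted once per subtask. One common way to avoid that duplication is to let each subtask keep only the rows whose key maps to its own index. The sketch below shows this modulo rule in plain Java. `SubtaskPartitioner` and `ownedBy` are hypothetical names, and the assumption is that inside a RichParallelSourceFunction the two numbers would come from `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubtaskPartitioner {
    // Returns the ids that the given subtask should emit:
    // an id belongs to the subtask whose index equals id mod parallelism.
    public static List<Integer> ownedBy(List<Integer> ids, int subtaskIndex, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int id : ids) {
            // floorMod keeps the result in [0, parallelism) even for negative ids
            if (Math.floorMod(id, parallelism) == subtaskIndex) {
                owned.add(id);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        int parallelism = 3;
        // Every id is claimed by exactly one of the three subtasks
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " emits " + ownedBy(ids, subtask, parallelism));
        }
    }
}
```

In the MySQL source, the same filter could be applied to the id column before calling collect, or pushed into the SQL WHERE clause so that each subtask reads only its own share.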
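The results above show that parallelism can be set on an individual source. It can also be set globally, through the env object, as follows:
public class sourceMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // global default parallelism
        test01(env);
        env.execute();
    }
}
We then run test01 again, this time with the global parallelism set to 2.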