Use the fromCollection() method to wrap the data in a Seq and use it as a Flink data source.
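import org.apache.flink.streaming.api.scala._

object Collections {
  def main(args: Array[String]): Unit = {
    // Set up the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Each tuple in the Seq becomes one record in the stream
    val ds = env.fromCollection(Seq(
      (1, "zhangsan", "male", "20"),
      (2, "lisi", "female", "30")
    ))
    ds.print()
    env.execute()
  }
}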
You can also use a case class to wrap each record as an object, then pass the sequence of objects to fromCollection() as the Flink data source.
import org.apache.flink.streaming.api.scala._

case class Worker(id: Int, name: String, gender: String, age: Int)

object Collections {
  def main(args: Array[String]): Unit = {
    // Set up the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Each Worker instance becomes one record in the stream
    val ds = env.fromCollection(Seq(
      Worker(1, "张三", "男", 20),
      Worker(2, "李四", "女", 21),
      Worker(3, "王五", "男", 22),
      Worker(4, "赵六", "女", 23)
    ))
    ds.print()
    env.execute()
  }
}
You can also pass the records in directly by calling the fromElements() method.
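As a minimal sketch of the fromElements() variant, the example below reuses the tuples from the first fromCollection() example. The object name Elements is illustrative and not from the original article.

```scala
import org.apache.flink.streaming.api.scala._

// Illustrative object name, not from the original article
object Elements {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // fromElements takes the records as varargs, so no Seq wrapper is needed
    val ds: DataStream[(Int, String, String, String)] = env.fromElements(
      (1, "zhangsan", "male", "20"),
      (2, "lisi", "female", "30")
    )
    ds.print()
    env.execute()
  }
}
```

All elements passed to fromElements() must share one type, since the stream's element type is inferred from them.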
2. Reading data from a file with readTextFile()
The readTextFile() method reads a file from the local file system or from HDFS into Flink as a data source.
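A minimal sketch of reading a text file might look like the following. The object name TextFile and the default path are assumptions for illustration; replace the path with a real local or HDFS location.

```scala
import org.apache.flink.streaming.api.scala._

// Illustrative object name, not from the original article
object TextFile {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical default path; pass a real one as the first argument.
    // Local files can use file:///..., HDFS files use hdfs://<namenode>:<port>/...
    val path = if (args.nonEmpty) args(0) else "file:///tmp/workers.txt"
    // Each line of the file becomes one String record
    val lines: DataStream[String] = env.readTextFile(path)
    lines.print()
    env.execute()
  }
}
```

Reading an hdfs:// path additionally requires the Hadoop classes to be available on the classpath.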
3. Reading data from Kafka
To read Kafka data from Flink, you first need to add the relevant dependencies.
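<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-9.0</version>
</dependency>
The FlinkKafkaConsumer011 class used in the Kafka code comes from the flink-connector-kafka-0.11 connector, which must also be on the classpath in a version matching your Flink and Scala versions.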
Load the data from Kafka with the addSource() method.
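import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// The enclosing object name is illustrative; the original shows only main()
object KafkaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Kafka connection settings
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "192.168.100.155:9092")
    prop.setProperty("group.id", "user1")
    // Consume the "flink" topic, decoding each message as a String
    val ds = env.addSource(new FlinkKafkaConsumer011[String]("flink", new SimpleStringSchema(), prop))
    ds.print()
    env.execute()
  }
}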
By adjusting Kafka's offset-related settings, you can also read data that is already in the Kafka queue.
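One way to do this, sketched below under the same broker, group, and topic settings as above, is to call setStartFromEarliest() on the consumer before passing it to addSource(). The object name KafkaFromEarliest and the helper buildConsumer() are illustrative names, not part of the original article.

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Illustrative object and helper names, not from the original article
object KafkaFromEarliest {
  def buildConsumer(): FlinkKafkaConsumer011[String] = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "192.168.100.155:9092")
    prop.setProperty("group.id", "user1")
    val consumer = new FlinkKafkaConsumer011[String]("flink", new SimpleStringSchema(), prop)
    // Start from the earliest available offset of every partition,
    // ignoring any offsets already committed for the group.
    // Alternatives: setStartFromLatest(), setStartFromGroupOffsets() (the default)
    consumer.setStartFromEarliest()
    consumer
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.addSource(buildConsumer())
    ds.print()
    env.execute()
  }
}
```

With the default group-offsets mode, the Kafka property auto.offset.reset decides where to start only when the group has no committed offset for a partition.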
4. Custom data sources
Taking reading MySQL data from Flink as an example: call addSource() and pass in an object that implements SourceFunction. The code is as follows:
import org.apache.flink.streaming.api.scala._

object UDFSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Plug in the custom MySQL source
    val ds = env.addSource(new MysqlSource())
    ds.print()
    env.execute()
  }
}
The SourceFunction implementation is as follows:
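import java.sql.DriverManager
import org.apache.flink.streaming.api.functions.source.SourceFunction

class MysqlSource extends SourceFunction[Stu] {
  // cancel() is called from another thread, so the flag is volatile
  @volatile var isFlag = true
  // MySQL connection settings
  val url = "jdbc:mysql://192.168.100.155:3306/exp"
  val driver = "com.mysql.jdbc.Driver"
  val username = "root"
  val password = "okok"

  override def run(sourceContext: SourceFunction.SourceContext[Stu]): Unit = {
    // Load the driver on the worker that actually runs the source
    Class.forName(driver)
    // The connection and statement must be created inside this method, not held
    // as member fields; otherwise Flink reports a serialization error
    val conn = DriverManager.getConnection(url, username, password)
    try {
      val ps = conn.prepareStatement("select * from s1_student")
      while (isFlag) {
        val rs = ps.executeQuery()
        while (rs.next()) {
          val no = rs.getString("sno")
          val name = rs.getString("sname")
          val birthday = rs.getString("sbirthday")
          val sex = rs.getString("ssex")
          val sclass = rs.getString("sclass")
          sourceContext.collect(new Stu(no, name, birthday, sex, sclass))
        }
        rs.close()
        // Re-run the query every 3 seconds
        Thread.sleep(3000)
      }
    } finally {
      conn.close()
    }
  }

  override def cancel(): Unit = {
    isFlag = false
  }
}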
The Stu class:
class Stu(no: String, name: String, birthday: String, sex: String, sclass: String) extends Serializable {
  // Tab-separated rendering of all fields, used by print()
  override def toString: String = {
    "no:" + no + "\tname:" + name + "\tbirthday:" + birthday + "\tsex:" + sex + "\tsclass:" + sclass
  }
}
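When the job runs, each row of s1_student is printed through Stu's toString, and the full set of rows repeats about every three seconds because the source re-runs the query in a loop.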
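The serialization constraint noted in MysqlSource can also be handled with a RichSourceFunction, which offers open() and close() lifecycle hooks. The following is a sketch of that alternative, not the article's own approach; the class name RichMysqlSource is illustrative, and it reuses the Stu class and connection settings shown above.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Illustrative class name; an alternative sketch, not the article's implementation
class RichMysqlSource extends RichSourceFunction[Stu] {
  @volatile private var running = true
  // @transient fields are skipped when the function is serialized;
  // they are created in open() on the worker instead
  @transient private var conn: Connection = _
  @transient private var ps: PreparedStatement = _

  // Called once on the worker before run(); assumes a Flink version
  // where open(Configuration) is the lifecycle hook
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.100.155:3306/exp", "root", "okok")
    ps = conn.prepareStatement("select * from s1_student")
  }

  override def run(ctx: SourceFunction.SourceContext[Stu]): Unit = {
    while (running) {
      val rs = ps.executeQuery()
      try {
        while (rs.next()) {
          ctx.collect(new Stu(
            rs.getString("sno"), rs.getString("sname"), rs.getString("sbirthday"),
            rs.getString("ssex"), rs.getString("sclass")))
        }
      } finally rs.close()
      Thread.sleep(3000)
    }
  }

  override def cancel(): Unit = running = false

  // Called when the source shuts down; releases the JDBC resources
  override def close(): Unit = {
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}
```

It is used the same way as before: env.addSource(new RichMysqlSource()).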