Four Ways for Flink to Obtain a Data Source (Scala)


1. Reading data from a collection

Put the data into a Seq and pass it to fromCollection() to use it as a Flink data source:

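import org.apache.flink.streaming.api.scala._

object Collections {
  def main(args: Array[String]): Unit = {
    // Create the Flink streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // An in-memory Seq of tuples serves as the source
    val ds = env.fromCollection(Seq(
      (1, "zhangsan", "male", "20"),
      (2, "lisi", "female", "30")
    ))
    ds.print()
    env.execute()
  }
}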

You can also wrap each record in a case class and pass a Seq of those objects to fromCollection():

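import org.apache.flink.streaming.api.scala._

case class Worker(id: Int, name: String, gender: String, age: Int)

object Collections {
  def main(args: Array[String]): Unit = {
    // Create the Flink streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // A Seq of case-class instances; the type information is derived implicitly
    val ds = env.fromCollection(Seq(
      Worker(1, "zhangsan", "male", 20),
      Worker(2, "lisi", "female", 21),
      Worker(3, "wangwu", "male", 22),
      Worker(4, "zhaoliu", "female", 23)
    ))
    ds.print()
    env.execute()
  }
}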

Alternatively, call fromElements() and pass the elements to it directly.
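A minimal sketch of fromElements(), assuming the same Flink Scala API as the examples above. The object name ElementsDemo is hypothetical, and Worker is redeclared inside it only so the block compiles on its own. Because fromElements() takes varargs, an existing Seq can be expanded with `: _*`, or the elements can be written inline.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical demo object; Worker mirrors the case class used above
object ElementsDemo {
  case class Worker(id: Int, name: String, gender: String, age: Int)

  // The elements handed to fromElements(), kept in a val so they can be inspected
  val workers: Seq[Worker] = Seq(
    Worker(1, "zhangsan", "male", 20),
    Worker(2, "lisi", "female", 21)
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // fromElements takes varargs; ": _*" expands the Seq into individual arguments
    val ds = env.fromElements(workers: _*)
    // Equivalent inline form:
    // env.fromElements(Worker(1, "zhangsan", "male", 20), Worker(2, "lisi", "female", 21))
    ds.print()
    env.execute("fromElements demo")
  }
}
```

Unlike fromCollection(), no wrapping Seq is required when the values are written inline.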

2. Reading data from a file

readTextFile()

readTextFile() reads a file from the local file system or from HDFS into Flink as a data source, producing one String element per line.
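A sketch of readTextFile() that parses each line into a case class. The object name FileSourceDemo, the default path data/workers.txt and the comma-separated "id,name,gender,age" line format are all assumptions for illustration. Reading an hdfs:// URI additionally needs Hadoop classes on the classpath (for example through the flink-shaded-hadoop-2-uber dependency listed in the Kafka section).

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical demo: file path and "id,name,gender,age" line format are assumptions
object FileSourceDemo {
  case class Worker(id: Int, name: String, gender: String, age: Int)

  // Parse one comma-separated line; malformed lines yield None and are dropped
  def parseWorker(line: String): Option[Worker] =
    line.split(",").map(_.trim) match {
      case Array(id, name, gender, age) =>
        scala.util.Try(Worker(id.toInt, name, gender, age.toInt)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // A local path, or an HDFS URI such as "hdfs://<namenode>:<port>/path/to/file"
    val path = if (args.nonEmpty) args(0) else "data/workers.txt"
    // Each line of the file becomes one String element
    val lines: DataStream[String] = env.readTextFile(path)
    val workers = lines.flatMap(line => parseWorker(line).toList)
    workers.print()
    env.execute("readTextFile demo")
  }
}
```

Keeping the parsing in a plain function makes it testable without starting a Flink job; flatMap then silently skips lines that do not parse.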

3. Reading data from Kafka

To have Flink read data from Kafka, add the relevant dependencies:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-9.0</version>
</dependency>
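The flink-shaded-hadoop-2-uber artifact above supplies Hadoop classes (needed for hdfs:// paths); it does not contain the Kafka consumer. FlinkKafkaConsumer011 comes from the Kafka 0.11 connector. A sketch of that dependency, assuming Scala 2.11 and a Flink 1.x release that still ships the 0.11 connector (the 0.10/0.11 connectors were removed in Flink 1.12); `${flink.version}` is a placeholder property you would define yourself:

```xml
<!-- Kafka 0.11 connector providing FlinkKafkaConsumer011; "_2.11" is the Scala binary version (assumed) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
```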

Load the Kafka data with addSource():

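import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Enclosing object name chosen for illustration
object KafkaDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka connection settings
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "192.168.100.155:9092")
    prop.setProperty("group.id", "user1")

    // Consume topic "flink", decoding each record value as a String
    val ds = env.addSource(new FlinkKafkaConsumer011[String]("flink", new SimpleStringSchema(), prop))
    ds.print()
    env.execute()
  }
}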

By adjusting the consumer's offset (start position) settings, you can also read data that already exists in the Kafka topic.
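A sketch of two such settings, assuming the FlinkKafkaConsumer011 connector used above; the object name KafkaFromEarliest and the helper consumerProps are hypothetical. setStartFromEarliest() makes the consumer ignore committed group offsets and start every partition at its earliest offset, while the Kafka property auto.offset.reset only takes effect in the default group-offsets mode when the group has no committed offset yet.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Hypothetical demo object; broker, group id and topic reuse the values from the example above
object KafkaFromEarliest {
  // Consumer properties; auto.offset.reset is the fallback when no committed offset exists
  def consumerProps(servers: String, groupId: String): Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", servers)
    prop.setProperty("group.id", groupId)
    prop.setProperty("auto.offset.reset", "earliest")
    prop
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val consumer = new FlinkKafkaConsumer011[String](
      "flink", new SimpleStringSchema(), consumerProps("192.168.100.155:9092", "user1"))
    // Ignore committed group offsets and read each partition from its earliest offset
    consumer.setStartFromEarliest()
    // Alternatives: setStartFromLatest(), or setStartFromGroupOffsets() (the default)
    env.addSource(consumer).print()
    env.execute("kafka from earliest")
  }
}
```

These start-position settings apply only to a fresh start; when a job is restored from a checkpoint or savepoint, the offsets stored in that state are used instead.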

4. Custom data source

Taking Flink reading data from MySQL as an example: pass an object that implements SourceFunction to addSource(). The code is as follows:

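import org.apache.flink.streaming.api.scala._

object UDFSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Register the custom SourceFunction as the job's source
    val ds = env.addSource(new MysqlSource())
    ds.print()
    env.execute()
  }
}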

The SourceFunction implementation:

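import java.sql.DriverManager

import org.apache.flink.streaming.api.functions.source.SourceFunction

class MysqlSource extends SourceFunction[Stu] {
  // Written by cancel() from another thread, so @volatile makes the change visible to run()
  @volatile var isFlag = true
  val url = "jdbc:mysql://192.168.100.155:3306/exp"
  val driver = "com.mysql.jdbc.Driver"
  val username = "root"
  val password = "okok"

  override def run(sourceContext: SourceFunction.SourceContext[Stu]): Unit = {
    // Load the driver here: field initializers do not re-run when the task deserializes this object
    Class.forName(driver)
    // The Connection must be created inside the method, not held as a member variable:
    // it is not serializable, so a Connection field causes a serialization error
    val conn = DriverManager.getConnection(url, username, password)
    val ps = conn.prepareStatement("select * from s1_student")
    try {
      while (isFlag) {
        val rs = ps.executeQuery()
        while (rs.next()) {
          val no = rs.getString("sno")
          val name = rs.getString("sname")
          val birthday = rs.getString("sbirthday")
          val sex = rs.getString("ssex")
          val sclass = rs.getString("sclass")
          sourceContext.collect(new Stu(no, name, birthday, sex, sclass))
        }
        rs.close()
        // Re-query the whole table every 3 seconds
        Thread.sleep(3000)
      }
    } finally {
      // Release JDBC resources when the source stops
      ps.close()
      conn.close()
    }
  }

  override def cancel(): Unit = {
    isFlag = false
  }
}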
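A common alternative to opening the connection inside run() is a RichSourceFunction, whose open() and close() lifecycle methods run on the worker before and after run(). The sketch below assumes the same URL, credentials and table as MysqlSource and a JDBC 4 MySQL driver on the classpath (so no Class.forName call is needed). The class name MysqlRichSource is hypothetical, and it emits tab-separated Strings rather than Stu so the block compiles on its own.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Hypothetical variant of MysqlSource; emits each row as a tab-separated String
class MysqlRichSource extends RichSourceFunction[String] {
  @volatile var running = true
  // @transient: JDBC objects are not serializable, so they are excluded and created in open()
  @transient private var conn: Connection = _
  @transient private var ps: PreparedStatement = _

  // Called on the worker before run()
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://192.168.100.155:3306/exp", "root", "okok")
    ps = conn.prepareStatement("select sno, sname, sbirthday, ssex, sclass from s1_student")
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      val rs = ps.executeQuery()
      try {
        // JDBC column indices are 1-based; join the five selected columns with tabs
        while (rs.next()) ctx.collect((1 to 5).map(i => rs.getString(i)).mkString("\t"))
      } finally rs.close()
      Thread.sleep(3000) // poll the table again after 3 seconds
    }
  }

  override def cancel(): Unit = running = false

  // Called on the worker after the source finishes or is cancelled
  override def close(): Unit = {
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}
```

It is registered the same way: `env.addSource(new MysqlRichSource())`.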

The Stu class:

class Stu(no: String, name: String, birthday: String, sex: String, sclass: String) extends Serializable {
  // Fields are separated by tab characters
  override def toString: String =
    "no:" + no + "\tname:" + name + "\tbirthday:" + birthday + "\tsex:" + sex + "\tsclass:" + sclass
}

The printed output looks like this:

 

Feel free to share; please credit the source when reposting: 内存溢出 (outofmemory.cn)

Original article: http://outofmemory.cn/zaji/5690184.html
