Flink Study Notes: 03 - Flink DataStream Sources

Contents
    • 1. Reading data from a collection or elements
    • 2. Reading a data stream from a file
    • 3. Reading a data stream from Kafka
    • 4. Custom Sources

A Flink program consists of three main parts: a Source, Transformations, and a Sink. The sections below summarize the common Source API operations.
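As a quick orientation, here is a minimal sketch of that three-part structure (the socket text source on localhost:7777 is just an illustrative assumption):

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object SkeletonApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("localhost", 7777)  // Source
      .map(_.toUpperCase)                    // Transform
      .print()                               // Sink
    env.execute("skeleton")
  }
}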

1. Reading data from a collection or elements
package com.hjt.yxh.hw.apitest


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}


case class SensorReading(id:String, timestamp: Long, temperature: Double)

object SourceTest {

  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Read data from a collection
    val dataList = List(
      SensorReading("sensor_1",1547718199,35.8),
      SensorReading("sensor_6",1547718201,15.4),
      SensorReading("sensor_7",1547718199,6.7),
      SensorReading("sensor_10",1547718199,38.1),
    )

    val stream1 = env.fromCollection(dataList)
    stream1.print()
    
    // Read data directly from elements; fromElements infers the common
    // supertype of its arguments, so this stream's element type is Any
    val stream2 = env.fromElements(1.0,35,"hello")
    stream2.print()
    
    env.execute("source test")

  }

}
2. Reading a data stream from a file
package com.hjt.yxh.hw.apitest


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}


case class SensorReading(id:String, timestamp: Long, temperature: Double)

object SourceTest {

  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from a file (use forward slashes or escaped backslashes in the path literal)
    val inputPath = "D:/LearnWorkSpace/FlinkDemo/src/main/resources/Data/sensor.txt"
    val txtDs = env.readTextFile(inputPath)
    txtDs.print()
    
    env.execute("source test")

  }

}
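In practice the raw text lines usually need to be parsed into the case class. A minimal sketch of that step (placed inside main, right after readTextFile), assuming each line of sensor.txt is comma-separated like sensor_1,1547718199,35.8:

    // Parse each line (assumed format: id,timestamp,temperature) into a SensorReading
    val sensorDs = txtDs.map(line => {
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    })
    sensorDs.print()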

3. Reading a data stream from Kafka
  • Add the Kafka connector dependency to pom.xml (the _2.12 suffix must match your Scala binary version, and the version should match your Flink release):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
  • The code to connect to Kafka is as follows:
package com.hjt.yxh.hw.apitest


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object SourceTest {

  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","192.168.0.52:9092,192.168.0.109:9092,192.168.0.115:9092")
    properties.setProperty("auto.offset.reset","latest")  // start from the latest offset when no committed offset exists
    properties.setProperty("group.id","consumer-group")
    
    // Create the Kafka consumer for the "sensor" topic
    val source = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)

    // Run a word count over the messages read from Kafka
    val stream3 = env.addSource(source)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    
    // Print the word-count results computed from the Kafka topic
    stream3.print().setParallelism(1)
    
    env.execute("source test")
  }
}
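If the topic carries sensor records rather than plain words, the same consumer can be mapped straight into a case class. A sketch, assuming comma-separated messages like sensor_1,1547718199,35.8 and the SensorReading case class from the earlier examples:

    // Map each Kafka message (assumed format: id,timestamp,temperature) into a SensorReading
    val sensorStream = env
      .addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
      .map(line => {
        val fields = line.split(",")
        SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
      })
    sensorStream.print()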
  • Then, on the Kafka side, you can use the console producer to send test messages to the topic:
[root@k8s-node3 bin]# ./kafka-console-producer.sh --broker-list 192.168.0.52:9092,192.168.0.109:9092,192.168.0.115:9092 --topic sensor
>hello world test send message
>hello world test send message

Note: if the consumer keeps failing to connect to a broker, it may be because Flink receives the broker addresses from Kafka as hostname + port, e.g. k8s-node3:9092. In that case, map the hostname to its IP address in the Windows hosts file, as shown below.
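For example, assuming the broker advertises itself as k8s-node3 and its IP is 192.168.0.52, add a line like this to C:\Windows\System32\drivers\etc\hosts:

192.168.0.52    k8s-node3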

4. Custom Sources

The point of a custom source is that, beyond the commonly used data sources Flink ships with, we can plug in our own special-purpose data sources as input streams for a Flink job.

package com.hjt.yxh.hw.apitest


import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}


case class SensorReading(id:String, timestamp: Long, temperature: Double)

object SourceTest {

  def main(args: Array[String]): Unit = {

    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Use the custom source
    val stream4 = env.addSource(new MySensorSource)
    stream4.print()
    env.execute("source test")

  }

}

// Custom source class: emits one SensorReading per second until cancelled
class MySensorSource extends SourceFunction[SensorReading]{

  // Cancellation flag; @volatile because cancel() is called from a different thread
  @volatile var running = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    var index = 0
    while(running){
      // Emit one reading per second with the current timestamp
      sourceContext.collect(SensorReading("sensor_"+index, System.currentTimeMillis(), 35.4))
      index = index + 1
      Thread.sleep(1000)
    }
  }
  }

  override def cancel(): Unit = {
    running = false
  }
}
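For testing it is often more useful to emit varying data rather than a constant temperature. Below is a sketch of a randomized variant (the sensor count, base temperature, and random-walk step are arbitrary assumptions), where a fixed set of sensors follows a Gaussian random walk:

import scala.util.Random

// Randomized variant: 10 fixed sensors whose temperatures follow a
// Gaussian random walk, emitting one batch of readings per second
class MyRandomSensorSource extends SourceFunction[SensorReading] {

  @volatile var running = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // Initial temperature for each sensor
    var temps = (1 to 10).map(i => ("sensor_" + i, 60 + rand.nextGaussian() * 20))
    while (running) {
      // Perturb each temperature and emit all readings with the current timestamp
      temps = temps.map { case (id, t) => (id, t + rand.nextGaussian()) }
      val ts = System.currentTimeMillis()
      temps.foreach { case (id, t) => ctx.collect(SensorReading(id, ts, t)) }
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}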
