- 1. Reading data from a collection or from elements
- 2. Reading a data stream from a file
- 3. Reading a data stream from Kafka
- 4. Custom sources
A Flink program consists of three main parts: Source + Transform + Sink. This post summarizes the common Source API operations.

1. Reading data from a collection or from elements
```scala
package com.hjt.yxh.hw.apitest

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 1. Read data from a collection
    val dataList = List(
      SensorReading("sensor_1", 1547718199, 35.8),
      SensorReading("sensor_6", 1547718201, 15.4),
      SensorReading("sensor_7", 1547718199, 6.7),
      SensorReading("sensor_10", 1547718199, 38.1)
    )
    val stream1 = env.fromCollection(dataList)
    stream1.print()

    // Read data from individual elements
    val stream2 = env.fromElements(1.0, 35, "hello")
    stream2.print()

    env.execute("source test")
  }
}
```
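One caveat: with mixed argument types as above (`Double`, `Int`, `String`), `fromElements` infers the element type as `Any`, so the stream falls back to generic (Kryo) serialization. A minimal sketch of keeping the stream typed by passing values of a single type:

```scala
// Elements of one type yield a typed DataStream[SensorReading]
// instead of DataStream[Any].
val typedStream = env.fromElements(
  SensorReading("sensor_1", 1547718199, 35.8),
  SensorReading("sensor_6", 1547718201, 15.4)
)
typedStream.print()
```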
2. Reading a data stream from a file
```scala
package com.hjt.yxh.hw.apitest

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from a file (note the escaped backslashes in the Windows path)
    val inputPath = "D:\\LearnWorkSpace\\FlinkDemo\\src\\main\\resources\\Data\\sensor.txt"
    val txtDs = env.readTextFile(inputPath)
    txtDs.print()

    env.execute("source test")
  }
}
```
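`readTextFile` yields a `DataStream[String]` of raw lines. As a follow-up sketch, assuming each line of sensor.txt is a comma-separated record such as `sensor_1,1547718199,35.8` (the file's format is not shown in the original), the lines can be parsed into `SensorReading` values:

```scala
// Sketch: parse each CSV line into a SensorReading.
// Assumes lines look like "sensor_1,1547718199,35.8".
val sensorDs = txtDs.map { line =>
  val fields = line.split(",")
  SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
}
sensorDs.print()
```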
3. Reading a data stream from Kafka
- Add the Kafka connector dependency to pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```
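Note that `flink-connector-kafka-0.11` targets Kafka 0.11 brokers. For Kafka 1.0.0 and newer, Flink 1.10 also ships a universal connector, whose consumer class is `FlinkKafkaConsumer` rather than `FlinkKafkaConsumer011`; a possible alternative dependency:

```xml
<!-- Universal Kafka connector, for brokers 1.0.0+ -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```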
- The code to connect to Kafka is as follows:
```scala
package com.hjt.yxh.hw.apitest

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object SourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "192.168.0.52:9092,192.168.0.109:9092,192.168.0.115:9092")
    properties.setProperty("auto.offset.reset", "latest")
    properties.setProperty("group.id", "consumer-group")

    // Create the Kafka consumer, subscribing to the "sensor" topic
    val source = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties)

    // Word count over the messages received from the topic
    val stream3 = env
      .addSource(source)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // Print the word-count statistics
    stream3.print().setParallelism(1)

    env.execute("source test")
  }
}
```
- Then, on the Kafka side, you can use the console producer to send messages to the topic:

```shell
[root@k8s-node3 bin]# ./kafka-console-producer.sh --broker-list 192.168.0.52:9092,192.168.0.109:9092,192.168.0.115:9092 --topic sensor
>hello world test send message
>hello world test send message
```
Note: if the job keeps failing with errors that it cannot connect to a broker, it may be because the broker metadata Flink gets back from Kafka uses hostname + port, e.g. k8s-node3:9092. In that case, add k8s-node3 and its corresponding IP address to the Windows hosts file, as sketched below.
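For example, assuming k8s-node3 resolves to 192.168.0.115 (a hypothetical mapping; substitute the broker's real IP), the entry in `C:\Windows\System32\drivers\etc\hosts` would look like:

```text
# C:\Windows\System32\drivers\etc\hosts
# Hypothetical mapping -- substitute your broker's real IP
192.168.0.115  k8s-node3
```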
4. Custom sources

The point of a custom source is that, beyond the data sources Flink provides out of the box, we can plug our own specific data sources into Flink as input streams.
```scala
package com.hjt.yxh.hw.apitest

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Custom source
    val stream4 = env.addSource(new MySensorSource)
    stream4.print()

    env.execute("source test")
  }
}

// Custom source: emits one reading per second until cancelled
class MySensorSource extends SourceFunction[SensorReading] {
  var running = true

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    var index = 0
    while (running) {
      sourceContext.collect(SensorReading("sensor_" + index, System.currentTimeMillis(), 35.4))
      index = index + 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}
```
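The generator above always emits the constant temperature 35.4. As an illustrative variant (not from the original), the source can random-walk the temperatures of a fixed set of sensors so that downstream operators see changing values:

```scala
import scala.util.Random

// Illustrative variant: 10 sensors whose temperatures drift via a random walk.
class MyRandomSensorSource extends SourceFunction[SensorReading] {
  var running = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()
    // Initialize each sensor with a base temperature around 60 degrees.
    var temps = (1 to 10).map(i => ("sensor_" + i, 60 + rand.nextGaussian() * 20))
    while (running) {
      // Nudge every temperature and emit one reading per sensor.
      temps = temps.map { case (id, t) => (id, t + rand.nextGaussian()) }
      val ts = System.currentTimeMillis()
      temps.foreach { case (id, t) => ctx.collect(SensorReading(id, ts, t)) }
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}
```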