Like every other computing framework, Flink has a set of basic development steps and core APIs. From the development-steps point of view, a job breaks down into four main parts:
1. Environment. When a Flink job is submitted for execution, it must first establish a connection to the Flink framework, i.e. obtain the current Flink runtime environment. Only once the environment is available can tasks be scheduled onto the different TaskManagers. Obtaining this environment object is quite simple.
2. Source. Flink can pull data from many different origins and hand it to the framework for processing; the place the data comes from is called the data source.
3. Transform. Operators that act on the data; these are covered in detail further below.
4. Sink. In Flink a sink, in the narrow sense, means storing the data; more broadly it is the output operation that sends the processed data to a designated storage system.
The print method we have been using all along is in fact a kind of sink. A minimal skeleton that strings these four steps together is sketched right below.
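Here is that skeleton (a sketch only: the object name, socket host/port and job name are placeholder values I made up; any other source or sink can be swapped in the same way):

import org.apache.flink.streaming.api.scala._

object Skeleton {
  def main(args: Array[String]): Unit = {
    // 1. Environment: connect to the Flink runtime
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Source: here a socket text stream (placeholder host/port)
    val ds = env.socketTextStream("localhost", 9999)
    // 3. Transform: a simple map operator
    val upper = ds.map(_.toUpperCase)
    // 4. Sink: print is itself a sink
    upper.print()
    env.execute("skeleton")
  }
}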
Writing a simple Flink application project
Create a new Maven project named myflink. Since this is just for getting started, I run Flink locally on Windows. Flink programs can be written in several ways, either in Java or in Scala; compared with Java, Scala is a more natural fit for Flink, so the examples below use Scala. The dependencies are:
<!-- Flink Scala API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Flink streaming (Scala) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Flink / Hadoop (HDFS) connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.7.5-9.0</version>
</dependency>
<!-- Flink Kafka connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.1</version>
</dependency>
import org.apache.flink.streaming.api.scala._

case class WaterSensor(id: String, ts: Long, high: Double)

object MyExp1 {
  def main(args: Array[String]): Unit = {
    // Streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // This collection is hand-made, just for a quick first look:
    // val ds = env.fromCollection(Seq(
    //   WaterSensor("ws_001", 1577844001, 45.0),
    //   WaterSensor("ws_002", 1577844015, 43.0),
    //   WaterSensor("ws_003", 1577844020, 42.0)
    // ))

    // Read a file on HDFS; for a local file just change the path.
    // Note: the HDFS connector dependency must be present, otherwise this fails.
    val ds = env.readTextFile("hdfs://192.168.80.181:9000/data/userbehavior/UserBehavior.csv")
    ds.print()
    env.execute("one")
  }
}
With that, the whole file is read in and printed.
Step 2: connect to Kafka and read data
Start Kafka:
kafka-server-start.sh /opt/soft/kafka200/config/server.properties
Create a topic named demo:
kafka-topics.sh --zookeeper 192.168.80.181:2181 --create --topic demo --partitions 1 --replication-factor 1
Start a console producer to send messages:
kafka-console-producer.sh --broker-list 192.168.80.181:9092 --topic demo
Start a console consumer to listen on the topic:
kafka-console-consumer.sh --bootstrap-server 192.168.80.181:9092 --topic demo --from-beginning
Create a new MyExp2 with the following code:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig

object MyExp2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    prop.put(ConsumerConfig.GROUP_ID_CONFIG, "cm")

    val kafka: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("demo", new SimpleStringSchema(), prop)
    )
    kafka.print()
    env.execute("sensor")
  }
}
3. Custom data source
import java.util.Date

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class MySource extends SourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (flag) {
      ctx.collect(Random.nextInt(100) + ":" + new Date().getTime + ":" + "hehe")
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    flag = false
  }
}
import org.apache.flink.streaming.api.scala._

object MyExp3 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The source is an infinite loop, so it keeps emitting. If the output stops after a
    // few records or does not keep looping, uncomment the line below to fix the
    // parallelism explicitly; by default it equals the number of CPU cores on your machine.
    // env.setParallelism(1)
    val ds = env.addSource(new MySource)
    ds.print()
    env.execute("one")
  }
}
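A note on the design: a source that implements only SourceFunction always runs as a single, non-parallel task. If the generator should run with parallelism greater than 1, one option is to implement ParallelSourceFunction instead. A rough sketch under that assumption (MyParallelSource is a made-up name; the emitted format is the same as above):

import java.util.Date

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

import scala.util.Random

// Sketch: same generator, but each parallel subtask runs its own emit loop
class MyParallelSource extends ParallelSourceFunction[String] {
  @volatile private var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (flag) {
      ctx.collect(Random.nextInt(100) + ":" + new Date().getTime + ":" + "hehe")
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = { flag = false }
}

Used with env.addSource(new MyParallelSource), the number of emitting instances then follows the job parallelism.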
4. The map operator (RichMapFunction)
package com.hc.myflinkone.exp

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.scala._

object MyExp5 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Global job parameters, readable later from the runtime context in open()
    val cfg = new Configuration()
    cfg.setString("testParam", "ws_001:50,ws_002:60,ws_003:70")
    env.getConfig.setGlobalJobParameters(cfg)

    val ds = env.fromElements(
      WaterSensor("ws_001", 1577844001, 45.0),
      WaterSensor("ws_002", 1577844015, 43.0),
      WaterSensor("ws_003", 1577844020, 42.0)
    )
    ds.map(new MyRich()).print()
    env.execute()
  }

  class MyRich extends RichMapFunction[WaterSensor, String] {
    var paramMap = Map[String, Double]()

    override def map(in: WaterSensor): String = {
      val hi = paramMap.get(in.id).getOrElse(0.0)
      in.id + ":" + in.ts.toString + ":" + (hi + in.high).toString
    }

    // open() runs once per parallel instance before map(); parse the global parameters here
    override def open(parameters: Configuration): Unit = {
      val conf = getRuntimeContext.getExecutionConfig
        .getGlobalJobParameters.asInstanceOf[Configuration]
      val info = conf.getString("testParam", "").split(",")
      info.foreach(line => {
        val ps = line.split(":")
        paramMap += (ps(0) -> ps(1).toDouble)
      })
    }
  }
}
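For comparison: when no open() setup or runtime-context access is needed, a plain lambda map is enough. Inside the same main, instead of ds.map(new MyRich()), one could write (a sketch that simply skips the threshold lookup):

ds.map(ws => ws.id + ":" + ws.ts + ":" + ws.high).print()

The RichMapFunction variant is only needed here because the per-sensor thresholds are read from the global job parameters in open().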
5. Connecting to Redis
package com.hc.myflinkone.exp

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import redis.clients.jedis.Jedis

// Enrich incoming messages with user data looked up from Redis
case class Message(id: String, acttype: String, ts: Long)
case class Users(id: String, name: String, age: Int, gender: String, act: String, ts: Long)

object MyExp6 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(
      Message("1", "buy", 1521357896),
      Message("2", "collect", 1521357896),
      Message("3", "browse", 1521357896),
      Message("4", "order", 1521357896),
      Message("5", "browse", 1521357896)
    ))
    ds.map(new MyRich).print()
    env.execute("mytrans")
  }
}

class MyRich extends RichMapFunction[Message, Users] {
  var jedis: Jedis = null

  override def map(in: Message): Users = {
    // Look up the user's "name,age,gender" string in the "users" hash by id
    val lst = jedis.hmget("users", in.id)
    if (lst.get(0) == null) {
      Users(in.id, "", 0, "", in.acttype, in.ts)
    } else {
      val str = lst.get(0).split(",")
      Users(in.id, str(0), str(1).toInt, str(2), in.acttype, in.ts)
    }
  }

  // Open the Redis connection once per parallel instance
  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("192.168.80.181", 6379)
  }
}
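For the lookup above to return anything, the "users" hash in Redis must hold one comma-separated "name,age,gender" string per user id. A one-off seeding sketch with the same Jedis client (the object name and the sample values are made up):

import redis.clients.jedis.Jedis

object SeedUsers {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("192.168.80.181", 6379)
    // Field = user id, value = "name,age,gender" (made-up sample rows)
    jedis.hset("users", "1", "zhangsan,25,male")
    jedis.hset("users", "2", "lisi,30,female")
    jedis.close()
  }
}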
6. Connecting to MongoDB
package com.hc.myflinkone.exp

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}

object MyExp7 {
  def main(args: Array[String]): Unit = {
    val mc = MongoClient("192.168.80.181", 27017)
    val db = mc("mydemo")        // select the database
    val tab = db("userinfos")    // select the collection
    val where = MongoDBObject("name" -> "zhangsan")

    // Print id, name and age of every document
    tab.map(x => {
      (x.get("id"), x.get("name"), x.get("age"))
    }).foreach(println)

    // tab.filter(_.get("name").toString.startsWith("zhang")).foreach(println)
    // val row = MongoDBObject("id" -> "4", "name" -> "zhaoyun", "age" -> 1300)
    // tab.insert(row)
  }
}
7. Word count with Flink
package com.hc.myflinkone.exp

import org.apache.flink.streaming.api.scala._

object MyExp8 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(
      "hello world",
      "hello hadoop",
      "hello flink"
    ))
    env.setParallelism(2)

    ds.flatMap(x => {
      val info = x.split(" ")
      info.map(word => (word, 1))
    }).keyBy(0).reduce((x, y) => (x._1, x._2 + y._2))
      .shuffle.print()

    env.execute()
  }
}
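keyBy(0) keys the stream by tuple position; an equivalent and arguably clearer form uses a key selector function. A sketch against the same ds inside the same main:

ds.flatMap(_.split(" ").map(word => (word, 1)))
  .keyBy(_._1)
  .reduce((x, y) => (x._1, x._2 + y._2))
  .print()

Because reduce on a keyed stream is incremental, each word is printed with a running count every time it appears, not only once at the end.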
8. Machine temperature alerting with Flink (simple version)
package com.hc.myflinkone.exp

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

object MyExp9 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    prop.put(ConsumerConfig.GROUP_ID_CONFIG, "cm")

    val ds = env.addSource(
      new FlinkKafkaConsumer[String]("mac", new SimpleStringSchema(), prop)
    )

    // Message format: 1234,4,153264221,34.5  -- machine id, part id, timestamp, temperature
    // Tag every record as "warning" or "normal" depending on the temperature
    val spstream = ds.split(line => {
      val info = line.split(",", -1)
      if (info(3).toDouble >= 50) {
        Seq("warning")
      } else {
        Seq("normal")
      }
    })
    spstream.select("warning").print()
    env.execute("sensor")
  }
}
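Note that split/select has been deprecated in later Flink releases in favour of side outputs; for a single alert stream like this one, a plain filter gives the same result. A sketch against the same ds and message format:

val warnings = ds.filter(line => line.split(",", -1)(3).toDouble >= 50)
warnings.print()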
9. Reading from Kafka and checking readings against MongoDB to raise alerts
package com.hc.myflinkone.exp

import java.util.Properties

import com.mongodb.casbah.{MongoClient, MongoCollection}
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

// Machine/part template stored in MongoDB, combined with the current reading from Kafka
case class MacTemplate(id: String, macname: String,
                       partid: String, partname: String,
                       maxtemp: Double, cTemp: Double, cTime: Long)

object MyExp10 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    prop.put(ConsumerConfig.GROUP_ID_CONFIG, "cm")

    val ds: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("mac", new SimpleStringSchema(), prop)
    )

    // Enrich each reading with its template, then keep only readings above the threshold
    ds.map(new xptemp).split(x => {
      if (x.cTemp >= x.maxtemp) {
        Seq("warning")
      } else {
        Seq("normal")
      }
    }).select("warning").print()

    env.execute()
  }
}

class xptemp extends RichMapFunction[String, MacTemplate] {
  var tab: MongoCollection = _

  override def map(in: String): MacTemplate = {
    // Incoming record: machine id, part id, timestamp, temperature
    val info = in.split(",")
    val mo = MongoDBObject("id" -> info(0), "partid" -> info(1))
    val ct = tab.findOne(mo)
    MacTemplate(
      ct.get.get("id").toString,
      ct.get.get("macname").toString,
      ct.get.get("partid").toString,
      ct.get.get("partname").toString,
      ct.get.get("maxtemp").toString.toDouble,
      info(3).toDouble,
      info(2).toLong
    )
  }

  // Open the MongoDB connection once per parallel instance
  override def open(parameters: Configuration): Unit = {
    val mc = MongoClient("192.168.80.181", 27017)
    val database = mc("mac")
    tab = database("mac_temp")
  }
}
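The lookup in open()/map() expects the mac.mac_temp collection to hold one template document per machine/part with id, macname, partid, partname and maxtemp fields (findOne(...).get throws if nothing matches). A one-off seeding sketch with Casbah; the object name and all values are made-up samples:

import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.MongoDBObject

object SeedMacTemp {
  def main(args: Array[String]): Unit = {
    val tab = MongoClient("192.168.80.181", 27017)("mac")("mac_temp")
    // Sample template document; values are illustrative only
    tab.insert(MongoDBObject(
      "id" -> "1234", "macname" -> "machine-1234",
      "partid" -> "4", "partname" -> "spindle",
      "maxtemp" -> 50.0))
  }
}

Messages pushed to the mac topic from the console producer in the 1234,4,153264221,34.5 format are then enriched and filtered as above.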