Flink: From Getting Started to the Graveyard (Incense Still Burning)


Like every other computing framework, Flink has a basic set of development steps and a core API. Seen from the development-step angle, a job breaks down into four parts:

1. Environment

When a Flink job is submitted for execution, it first has to establish a connection to the Flink framework, that is, obtain the current Flink execution environment. Only with this environment information can tasks be scheduled onto the different TaskManagers. Obtaining the environment object is quite simple.
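A minimal sketch of how that environment is obtained for a streaming job (the same call appears in every example below):

import org.apache.flink.streaming.api.scala._

// streaming environment: runs locally inside the IDE,
// and picks up the cluster environment when the job is submitted to a cluster
val env = StreamExecutionEnvironment.getExecutionEnvironment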

2. Source

Flink can ingest data from a variety of origins and hand that data to the framework for processing; we call these origins data sources.
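As a hedged sketch (the file path, host and port are placeholders), a few of the built-in sources look like this, reusing env and the org.apache.flink.streaming.api.scala._ import from the snippet above:

val fromSeq  = env.fromCollection(Seq("a", "b", "c"))   // from an in-memory Scala collection
val fromFile = env.readTextFile("/path/to/file.txt")    // from a text file (local or HDFS)
val fromSock = env.socketTextStream("localhost", 9999)  // from a TCP socket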

3. Transform

Operators (transformations) are applied to the data; I will cover them in detail later.
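To make this concrete before the detailed section, here is a minimal illustrative sketch of two common operators on a stream of integers, again reusing env and the Scala API import from above:

val nums    = env.fromElements(1, 2, 3, 4)
val doubled = nums.map(_ * 2)        // 2, 4, 6, 8
val big     = doubled.filter(_ > 4)  // 6, 8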

4. Sink

In Flink, a Sink essentially means storing the data somewhere; more broadly, it is the output operation that sends the processed data to a designated external storage system.

The print method we have been using all along is in fact one kind of Sink.
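A sketch of two built-in sinks (assuming ds is a DataStream[String] like the ones built in the examples below; the output path is just an illustration):

ds.print()                     // print() is itself a sink that writes to stdout
ds.writeAsText("/tmp/output")  // another built-in sink: write the stream as text files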

Writing a simple Flink application

Create a new Maven project called myflink. Since I'm just getting started, I'm running Flink locally on Windows. Flink programs can be written in Java or in Scala; compared with Java, Scala is the more natural fit for Flink. The dependencies used in this project are listed below (note that the Kafka connector version differs from the core Flink version; ideally the two should match):

<!-- Flink Scala API -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

<!-- Flink streaming (Scala) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

<!-- Hadoop/HDFS connectivity -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-shaded-hadoop-2-uber</artifactId>
  <version>2.7.5-9.0</version>
</dependency>

<!-- Kafka connector -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.1</version>
</dependency>
import org.apache.flink.streaming.api.scala._

case class WaterSensor(id: String, ts: Long, high: Double)

object MyExp1 {
  def main(args: Array[String]): Unit = {
    // streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // a small hand-made collection, handy for a first look
    //    val ds = env.fromCollection(Seq(
    //      WaterSensor("ws_001", 1577844001, 45.0),
    //      WaterSensor("ws_002", 1577844015, 43.0),
    //      WaterSensor("ws_003", 1577844020, 42.0)
    //    ))

    // read a file from HDFS; for a local file just change the path.
    // The flink-shaded-hadoop dependency must be on the classpath, otherwise this fails.
    val ds = env.readTextFile("hdfs://192.168.80.181:9000/data/userbehavior/UserBehavior.csv")
    ds.print()
    env.execute("one")
  }
}

 

With that, the whole file is read in.

Step 2: connect to Kafka and read data

Start Kafka:

kafka-server-start.sh /opt/soft/kafka200/config/server.properties

Create a topic named demo:

kafka-topics.sh --zookeeper 192.168.80.181:2181 --create --topic demo --partitions 1 --replication-factor 1

Start a console producer to send test messages:

kafka-console-producer.sh --broker-list 192.168.80.181:9092 --topic demo

Start a console consumer to watch the topic:

kafka-console-consumer.sh --bootstrap-server 192.168.80.181:9092 --topic demo --from-beginning

 

Create a new object MyExp2 with the following code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig

object MyExp2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // consumer properties for the Kafka source
    val prop = new Properties()
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    prop.put(ConsumerConfig.GROUP_ID_CONFIG, "cm")

    // read the demo topic as a stream of raw strings
    val kafka: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("demo", new SimpleStringSchema(), prop)
    )
    kafka.print()
    env.execute("sensor")
  }
}

 

3. Custom data source

import java.util.Date

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

// a simple custom source: emits "random number:timestamp:hehe" once per second
class MySource extends SourceFunction[String] {
  // volatile so that cancel(), called from another thread, is seen by run()
  @volatile var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (flag) {
      ctx.collect(Random.nextInt(100) + ":" + new Date().getTime + ":" + "hehe")
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    flag = false
  }
}
import org.apache.flink.streaming.api.scala._

object MyExp3 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The source above loops forever, so the job keeps producing records.
    // By default the parallelism equals the number of CPU cores on your machine;
    // uncomment the next line if you want to pin it to a fixed value.
    //    env.setParallelism(1)
    val ds = env.addSource(
      new MySource
    )
    ds.print()
    env.execute("one")
  }
}

4. map (RichMapFunction)

package com.hc.myflinkone.exp

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.scala._

object MyExp5 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // publish per-sensor thresholds as global job parameters
    val cfg = new Configuration()
    cfg.setString("testParam", "ws_001:50,ws_002:60,ws_003:70")
    env.getConfig.setGlobalJobParameters(cfg)

    val ds = env.fromElements(
      WaterSensor("ws_001", 1577844001, 45.0),
      WaterSensor("ws_002", 1577844015, 43.0),
      WaterSensor("ws_003", 1577844020, 42.0)
    )
    ds.map(new MyRich()).print()
    env.execute()
  }

  class MyRich extends RichMapFunction[WaterSensor, String] {
    var paramMap = Map[String, Double]()

    // open() runs once per parallel instance: parse the global job parameters
    // into an id -> threshold map
    override def open(parameters: Configuration): Unit = {
      val conf = getRuntimeContext.getExecutionConfig
        .getGlobalJobParameters.asInstanceOf[Configuration]
      conf.getString("testParam", "")
        .split(",")
        .foreach(line => {
          val ps = line.split(":")
          paramMap += (ps(0) -> ps(1).toDouble)
        })
    }

    // combine each reading with the threshold looked up in open()
    override def map(in: WaterSensor): String = {
      val hi = paramMap.getOrElse(in.id, 0.0)
      in.id + ":" + in.ts.toString + ":" + (hi + in.high).toString
    }
  }
}
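With the thresholds above, each record is printed as id:ts:(threshold + reading), so the output looks roughly like this (the N> prefix is the index of the printing subtask):

2> ws_001:1577844001:95.0
3> ws_002:1577844015:103.0
4> ws_003:1577844020:112.0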

5. Connect to Redis

package com.hc.myflinkone.exp

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import redis.clients.jedis.Jedis

// enrich a stream of user actions with user details stored in Redis
case class Message(id: String, acttype: String, ts: Long)
case class Users(id: String, name: String, age: Int, gender: String, act: String, ts: Long)

object MyExp6 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(
      Message("1", "buy", 1521357896),
      Message("2", "collect", 1521357896),
      Message("3", "browse", 1521357896),
      Message("4", "order", 1521357896),
      Message("5", "browse", 1521357896)
    ))
    ds.map(new MyRich).print()
    env.execute("mytrans")
  }
}

class MyRich extends RichMapFunction[Message, Users] {
  var jedis: Jedis = null

  // open() runs once per parallel instance: create the Redis connection here,
  // not in map(), so it is reused for every record
  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("192.168.80.181", 6379)
  }

  // look the user up in the "users" hash; the stored value is expected to be "name,age,gender"
  override def map(in: Message): Users = {
    val lst = jedis.hmget("users", in.id)
    if (lst.get(0) == null) {
      Users(in.id, "", 0, "", in.acttype, in.ts)
    } else {
      val str = lst.get(0).split(",")
      Users(in.id, str(0), str(1).toInt, str(2), in.acttype, in.ts)
    }
  }
}
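For the hmget lookup above to return anything, the users hash has to contain one field per user id whose value follows the name,age,gender layout that map() splits on. A hypothetical seeding sketch (the names, ages and genders are made-up sample data):

import redis.clients.jedis.Jedis

object SeedRedis {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("192.168.80.181", 6379)
    // field = user id, value = "name,age,gender" (made-up sample data)
    jedis.hset("users", "1", "zhangsan,23,male")
    jedis.hset("users", "2", "lisi,30,female")
    jedis.close()
  }
}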

6. Connect to MongoDB

package com.hc.myflinkone.exp

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}

object MyExp7 {
  def main(args: Array[String]): Unit = {
    val mc  = MongoClient("192.168.80.181", 27017)
    val db  = mc("mydemo")        // select the database
    val tab = db("userinfos")     // select the collection
    val where = MongoDBObject("name" -> "zhangsan")   // example query document
    // print id, name and age of every document
    tab.map(x => {
      (x.get("id"), x.get("name"), x.get("age"))
    }).foreach(println)
    //    tab.filter(_.get("name").toString.startsWith("zhang")).foreach(println)
    //    val row = MongoDBObject("id" -> "4", "name" -> "zhaoyun", "age" -> 1300)
    //    tab.insert(row)
  }
}

7. Word count with Flink

package com.hc.myflinkone.exp

import org.apache.flink.streaming.api.scala._

object MyExp8 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(
      "hello world", "hello hadoop", "hello flink"
    ))
    env.setParallelism(2)
    ds.flatMap(x => {
      // split each line into words and turn every word into a (word, 1) pair
      val info = x.split(" ")
      info.map(word => (word, 1))
    }).keyBy(0)                                // group by the word
      .reduce((x, y) => (x._1, x._2 + y._2))   // running sum of the counts
      .shuffle.print()                         // redistribute before printing

    env.execute()
  }
}
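Because reduce emits a running aggregate per key, every word is printed once per occurrence with its count so far, e.g. (hello,1), (hello,2), (hello,3) for the three lines above; the interleaving of the two parallel subtasks determines the exact order.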

8. Machine temperature alarm with Flink (a simple version)

package com.hc.myflinkone.exp

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

object MyExp9 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val prop=new Properties()

    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.80.181:9092")
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")
    prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cm")
    val ds = env.addSource(
      new FlinkKafkaConsumer[String](
        "mac",
        new SimpleStringSchema(),
        prop
      )
    )
    // input record: 1234,4,153264221,34.5 -- machine id, part id, timestamp, temperature
    val spstream = ds.split(line => {
      val info = line.split(",", -1)
      if (info(3).toDouble >= 50) {
        Seq("warning")   // temperatures at or above 50 go to the "warning" output
      } else {
        Seq("normal")
      }
    })
    spstream.select("warning").print()
    env.execute("sensor")
  }
}

9. Read data from Kafka and look up alarm thresholds in MongoDB

package com.hc.myflinkone.exp
import java.util.Properties

import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.MongoCollection
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig

case class MacTemplate(id:String,macname:String,
                       partid:String,partname:String,
                       maxtemp:Double,cTemp:Double,cTime:Long)
object MyExp10 {
  def main(args: Array[String]): Unit = {
    val env=StreamExecutionEnvironment.getExecutionEnvironment
    val prop=new Properties()
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.80.181:9092")
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")
    prop.put(ConsumerConfig.GROUP_ID_CONFIG,"cm")
    val ds:DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String](
        "mac",
        new SimpleStringSchema(),
        prop
      )
    )
    ds.map(new xptemp).split(x=>{
      if (x.cTemp >= x.maxtemp) {
        Seq("warning")
      } else {
        Seq("normal")
      }
    }).select("warning").print()
    env.execute()
  }

}
// RichMapFunction that enriches each raw Kafka record with the machine/part
// metadata and the alarm threshold stored in MongoDB (database mac, collection mac_temp)
class xptemp extends RichMapFunction[String, MacTemplate] {

  var tab: MongoCollection = _

  // input record: machine id, part id, timestamp, temperature
  override def map(in: String): MacTemplate = {
    val info = in.split(",")
    val mo = MongoDBObject("id" -> info(0), "partid" -> info(1))
    val ct = tab.findOne(mo)
    MacTemplate(
      ct.get.get("id").toString,
      ct.get.get("macname").toString,
      ct.get.get("partid").toString,
      ct.get.get("partname").toString,
      ct.get.get("maxtemp").toString.toDouble,
      info(3).toDouble,
      info(2).toLong
    )
  }

  // open() runs once per parallel instance: set up the MongoDB connection here
  override def open(parameters: Configuration): Unit = {
    val mc = MongoClient("192.168.80.181", 27017)
    val database = mc("mac")
    tab = database("mac_temp")
  }
}
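A hypothetical document for mac.mac_temp so that the findOne lookup above matches something; the field names mirror the ones read in xptemp.map(), while the values are made up:

import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.MongoDBObject

object SeedMacTemp {
  def main(args: Array[String]): Unit = {
    val mc  = MongoClient("192.168.80.181", 27017)
    val tab = mc("mac")("mac_temp")
    // one metadata document per machine/part, keyed by id and partid (sample values)
    tab.insert(MongoDBObject(
      "id" -> "1234", "macname" -> "press-01",
      "partid" -> "4", "partname" -> "bearing",
      "maxtemp" -> 50.0))
  }
}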
