Splitting a Data Stream with a Flink Program


First, consume the source data.

The source data lives in Kafka, a publish-subscribe messaging system built for streaming data.

Flink consumes the data from Kafka.

Here is a helper method that builds the consumer:


import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

def getFlinkKafkaConsume(kafkaServer: String, kafkaGroupId: String, topicName: String): FlinkKafkaConsumer[String] = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", kafkaServer)
  props.setProperty("group.id", kafkaGroupId)
  // the consumer side needs deserializers, not serializers
  props.setProperty("key.deserializer", classOf[StringDeserializer].getName)
  props.setProperty("value.deserializer", classOf[StringDeserializer].getName)
  props.setProperty("auto.offset.reset", "latest")   // or "earliest"

  new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), props)
}
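
A quick usage sketch (the full version appears in the entry-point class further down). The broker address, group id, and topic name below are placeholders; setStartFromEarliest overrides auto.offset.reset and replays the topic from the beginning:

// assumes a StreamExecutionEnvironment named env is already configured
val binlogConsumer = getFlinkKafkaConsume("broker1:9092", "dwd-writer-group", "maxwell-binlog")
binlogConsumer.setStartFromEarliest()            // replay the topic from the start
val binlogStream = env.addSource(binlogConsumer)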

The splitting step itself is mainly a write operation: the consumed records are written back out to one or more Kafka topics. The per-record target topic is chosen inside the custom KafkaSerializationSchema shown below.


Declare a Kafka producer:

val properties = new Properties
properties.setProperty("bootstrap.servers", kafkaServer)
// Kafka brokers cap transactions at transaction.max.timeout.ms (15 minutes by default),
// so the producer's transaction.timeout.ms must not exceed that limit
properties.setProperty("transaction.timeout.ms", s"${timeoutKafka}")

val myProducer = new FlinkKafkaProducer[String](
  "dwd-default-topic",                                   // fallback topic; the schema picks the real target per record
  new BinLogOdsToDwdProducerKafkaSchema("dwd_qy_db"),
  properties,
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

mapDataSource.addSink(myProducer)  // write the records to the downstream Kafka topics

Implementation of the BinLogOdsToDwdProducerKafkaSchema class:

import java.lang
import java.nio.charset.StandardCharsets

import com.zw.bigdata.common.pojo.MaxwellBinlogRecord
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods.{compact, parse, render}


class BinLogOdsToDwdProducerKafkaSchema(layerPrefix: String) extends KafkaSerializationSchema[String] {

  val ODS_PREFIX = layerPrefix       // e.g. "dwd_qy_db"

  override def serialize(t: String, aLong: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {

    implicit val formats = DefaultFormats

    // parse the Maxwell binlog JSON carried in the message value
    val value = parse(t).extract[MaxwellBinlogRecord[JValue]]

    // collapse sharded databases (cps_user_0, cps_user_1, ...) into a single logical name
    var dbName = value.database

    if (value.database.matches("^cps_user_(\\d+).*")) {
      dbName = "cps_user"
    }

    if (value.database.matches("^cps_shard_(\\d+).*")) {
      dbName = "cps_shard"
    }

    // route each record to a topic derived from its source database and table
    val targetTopic = s"${ODS_PREFIX}_${dbName}_${value.table}"

    new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, t.getBytes(StandardCharsets.UTF_8))

    // alternatively, write only the row payload instead of the full record:
    // new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, compact(render(value.data)).getBytes(StandardCharsets.UTF_8))
  }
}
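
MaxwellBinlogRecord comes from com.zw.bigdata.common.pojo and is not shown in this post. A minimal sketch of what such a class might look like, assuming the standard fields of a Maxwell binlog JSON message (the project's actual class may differ):

// Hypothetical sketch only: field names follow Maxwell's JSON output;
// the real class in com.zw.bigdata.common.pojo is not shown in the post.
case class MaxwellBinlogRecord[T](
  database: String,   // source database name, used for topic routing above
  table: String,      // source table name, used for topic routing above
  `type`: String,     // insert / update / delete
  ts: Long,           // event timestamp
  data: T             // row image, kept generic so it can be extracted as JValue
)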

Entry-point class:

import java.util.Properties

import com.zw.bigdata.common.config.EnvConfig
import com.zw.bigdata.common.kafka.BinLogOdsToDwdProducerKafkaSchema
import com.zw.bigdata.common.util.{FlinkExecutionEnvUtil, FlinkKafkaUtil}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer


object BinlogOds2DwdWriter {

  // job configuration is read from the environment-specific config
  val flinkCheckpointPath = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.LogOds2DwdWriter.flink.checkout.path")
  val jobName = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.LogOds2DwdWriter.flink.job.name")
  val timeoutKafka = 15 * 60 * 1000   // keep within the broker's transaction.max.timeout.ms (15 min by default)
  val kafkaServer = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.bootstrap.servers")
  val kafkaGroupId = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.group.id")
  val binlogTopicName = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.binlog.topic.name")
  val offsetPosition = EnvConfig.getConfigValue("com.zw.bigdata.qy.rt.base.kafka.topic.offset")

  def main(args: Array[String]): Unit = {
    val environment = FlinkExecutionEnvUtil.getStreamEnv(flinkCheckpointPath)

    binlogOds2DwdWrite(environment)

    environment.execute(jobName)
  }

  def binlogOds2DwdWrite(env: StreamExecutionEnvironment): Unit = {
    // source: the raw Maxwell binlog topic
    val qyBinlogKafka = FlinkKafkaUtil.getFlinkKafkaConsume(kafkaServer, kafkaGroupId, binlogTopicName)
    qyBinlogKafka.setStartFromEarliest()
    val qyBinlogDs = env.addSource(qyBinlogKafka)

    // sink: a Flink Kafka producer whose serialization schema routes each record to its DWD topic
    val properties = new Properties
    properties.setProperty("bootstrap.servers", kafkaServer)
    properties.setProperty("transaction.timeout.ms", s"${timeoutKafka}")

    val myProducer = new FlinkKafkaProducer[String](
      "dwd-default-topic",
      new BinLogOdsToDwdProducerKafkaSchema("dwd_qy_db"),
      properties,
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

    // qyBinlogDs.print()
    qyBinlogDs.addSink(myProducer)
  }

}
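
FlinkExecutionEnvUtil.getStreamEnv is a project helper whose code is not shown here. A minimal sketch of what such a helper typically does (enable checkpointing and point the state backend at the configured path), purely as an assumption about its behaviour:

// Hypothetical sketch: the real FlinkExecutionEnvUtil lives in the project's common module.
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object FlinkExecutionEnvUtil {
  def getStreamEnv(checkpointPath: String): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE)  // checkpoint every minute
    env.setStateBackend(new FsStateBackend(checkpointPath))             // store checkpoints at the given path
    env
  }
}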

Next step: data aggregation.
