2021-11-09

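Problem addressed: moving data from gp to Kafka, covering how the dataset is read and how each row is transformed before being sent to Kafka.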

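import java.sql.Timestamp
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.json4s.DefaultFormats
// Assumption: the json4s Jackson backend; use org.json4s.native.Serialization if that is the one on the classpath.
import org.json4s.jackson.Serialization

// `spark` (SparkSession), `props` (Kafka producer Properties) and `topic` are assumed to be defined elsewhere.
val sourceDF = Range(0, 10)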
  .map(index => {
    val dbtable =
      s"""(
         |select
         |aid1,
         |aid2,
         |t1 ,
         |t2 ,
         |along_interval ,
         |source_id ,
         |create_time,
         |dt ,
         |thumbnail_id1,
         |thumbnail_url1,
         |image_id1,
         |image_url1 ,
         |thumbnail_id2 ,
         |thumbnail_url2,
         |image_id2 ,
         |image_url2 ,
         |score1 ,
         |score2
         |from dwd_bigdata_relation_peer_day_5030
         |WHERE create_time > '2020-06-27 00:00:00' AND create_time <= '2020-06-28 00:00:00') AS t_tmp_$index""".stripMargin
    println(dbtable)
    spark
      .read
      .format("jdbc")
      .option("driver", "org.postgresql.Driver")
      .option("url","jdbc:postgresql://192.168.11.33:2222/bigdata_dwd" )
      .option("dbtable", dbtable)
      .option("user", "张三")
      .option("password", "123456")
      .option("fetchsize","5000")
      .load()
  })
  .reduce((df1, df2) => df1.union(df2))
println("Loaded peer events")
sourceDF.show()
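
Note that the ten subqueries above all use the same WHERE clause, and `union` keeps duplicates, so every row is read ten times. If the intent was to read the one-day window in ten slices, the bounds need to vary with the index. Below is a minimal sketch of that idea in plain Scala. The object name `WindowSlices`, both helper functions, and the `SELECT *` shortcut are assumptions made for illustration, not part of the original job.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object WindowSlices {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  /** Split the window (start, end] into `n` contiguous slices of roughly equal length. */
  def slices(start: LocalDateTime, end: LocalDateTime, n: Int): Seq[(LocalDateTime, LocalDateTime)] = {
    require(n > 0, "n must be positive")
    val totalSeconds = Duration.between(start, end).getSeconds
    (0 until n).map { i =>
      val from = start.plusSeconds(totalSeconds * i / n)
      // The last slice ends exactly at `end`, so rounding never drops rows.
      val to = if (i == n - 1) end else start.plusSeconds(totalSeconds * (i + 1) / n)
      (from, to)
    }
  }

  /** Build the aliased subquery for the JDBC `dbtable` option (hypothetical helper). */
  def dbtable(table: String, from: LocalDateTime, to: LocalDateTime, index: Int): String =
    s"""(SELECT * FROM $table
       |WHERE create_time > '${from.format(fmt)}' AND create_time <= '${to.format(fmt)}') AS t_tmp_$index""".stripMargin
}
```

Each `(from, to)` pair would then replace the fixed timestamps in the query for its index. Spark's JDBC source also has built-in `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions` options, which may be a simpler route to the same result.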

val producer = new KafkaProducer[String, String](props)


val peerArray = sourceDF.collect
for (i <- peerArray.indices) {


  val row = peerArray(i)


  val sourceAid = row.getAs[String]("aid1")
  val targetAid = row.getAs[String]("aid2")
  val time = row.getAs[String]("t1")
  val source_id = row.getAs[String]("source_id") // used for reverse lookup by the camera's source_id
  val along_interval = row.getAs[Int]("along_interval")
  val create_time = row.getAs[Timestamp]("create_time")
  val dt = row.getAs[String]("dt")
  val thumbnail_id1 = row.getAs[String]("thumbnail_id1")
  val thumbnail_url1 = row.getAs[String]("thumbnail_url1")
  val image_id1 = row.getAs[String]("image_id1")
  val image_url1 = row.getAs[String]("image_url1")
  val thumbnail_id2 = row.getAs[String]("thumbnail_id2")
  val thumbnail_url2 = row.getAs[String]("thumbnail_url2")
  val image_id2 = row.getAs[String]("image_id2")
  val image_url2 = row.getAs[String]("image_url2")
  val score1 = row.getAs[String]("score1")
  val score2 = row.getAs[String]("score2")
  val event = RelationshipPeer(sourceAid, targetAid, time, time, along_interval, source_id, create_time, dt, thumbnail_id1, thumbnail_url1, image_id1, image_url1, thumbnail_id2, thumbnail_url2, image_id2, image_url2, score1, score2)


  implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
  val data: String = Serialization.write(event)
  val message = new ProducerRecord[String, String](topic, s"key$i", data)
  producer.send(message)
}
// Wait for buffered records to be sent and release the producer's resources.
producer.close()

case class RelationshipPeer(aid1: String, aid2: String, t1: String, t2: String, along_interval: Int, source_id: String, create_time: Timestamp, dt: String, thumbnail_id1: String, thumbnail_url1: String, image_id1: String, image_url1: String , thumbnail_id2: String , thumbnail_url2: String, image_id2: String, image_url2: String, score1: String, score2: String)
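
To make the transformation step concrete, here is a rough sketch of the key and JSON value each Kafka record carries. It is not the json4s code path used above: the JSON is assembled by hand so the example runs with the standard library alone. `PeerEvent` is a hypothetical, trimmed-down stand-in for `RelationshipPeer`, and `PeerPayload`, `key` and `toJson` are names invented for this illustration.

```scala
object PeerPayload {
  // Hypothetical, shortened version of RelationshipPeer (illustration only).
  final case class PeerEvent(aid1: String, aid2: String, t1: String, along_interval: Int)

  // Escape a single character so it is safe inside a JSON string literal.
  private def escapeChar(c: Char): String = c match {
    case '"'  => "\\\""
    case '\\' => "\\\\"
    case '\n' => "\\n"
    case '\r' => "\\r"
    case '\t' => "\\t"
    case _ if c < ' ' => "\\" + "u" + "%04x".format(c.toInt) // other control characters
    case _    => c.toString
  }

  // Render a nullable String column as a JSON string, or as JSON null for SQL NULL.
  private def str(s: String): String =
    if (s == null) "null" else "\"" + s.iterator.map(escapeChar).mkString + "\""

  /** Record key, following the s"key$i" convention of the loop above. */
  def key(i: Int): String = s"key$i"

  /** Hand-built JSON value for one event. */
  def toJson(e: PeerEvent): String =
    s"""{"aid1":${str(e.aid1)},"aid2":${str(e.aid2)},"t1":${str(e.t1)},"along_interval":${e.along_interval}}"""
}
```

The null branch matters because columns read over JDBC can be NULL, in which case `row.getAs[String]` returns `null`.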
