// ---------------------------------------------------------------------------
// Batch replay: read one day of "peer" relation records from PostgreSQL via
// JDBC and publish every row to Kafka as a JSON message.
//
// NOTE(review): all 10 slices below run the *same* query (the slice index only
// changes the subquery alias), so every row is loaded 10 times — confirm this
// duplication is intentional; a proper partitioned read would use
// partitionColumn/lowerBound/upperBound/numPartitions.
// ---------------------------------------------------------------------------
val sourceDF = Range(0, 10)
  .map { index =>
    // Subquery pushed down to Postgres; the alias must be unique per slice.
    val dbtable =
      s"""(
         |select
         |aid1,
         |aid2,
         |t1 ,
         |t2 ,
         |along_interval ,
         |source_id ,
         |create_time,
         |dt ,
         |thumbnail_id1,
         |thumbnail_url1,
         |image_id1,
         |image_url1 ,
         |thumbnail_id2 ,
         |thumbnail_url2,
         |image_id2 ,
         |image_url2 ,
         |score1 ,
         |score2
         |from dwd_bigdata_relation_peer_day_5030
         |WHERe create_time > '2020-06-27 00:00:00' AND create_time <= '2020-06-28 00:00:00') AS t_tmp_$index""".stripMargin
    println(dbtable)
    spark
      .read
      .format("jdbc")
      .option("driver", "org.postgresql.Driver")
      // NOTE(review): host, credentials and the date window are hard-coded —
      // they belong in external configuration / a secrets store.
      .option("url", "jdbc:postgresql://192.168.11.33:2222/bigdata_dwd")
      .option("dbtable", dbtable)
      .option("user", "张三")
      .option("password", "123456")
      .option("fetchsize", "5000")
      .load()
  }
  .reduce((df1, df2) => df1.union(df2))

println("加载同行事件")
sourceDF.show()

val producer = new KafkaProducer[String, String](props)

// collect() pulls the whole result set onto the driver — acceptable only for
// small volumes; for large tables use foreachPartition with a per-partition
// producer instead.
val peerArray = sourceDF.collect
for (i <- peerArray.indices) {
  val row = peerArray(i)
  val sourceAid = row.getAs[String]("aid1")
  val targetAid = row.getAs[String]("aid2")
  val time = row.getAs[String]("t1")
  // Camera source id, used for reverse lookup.
  val source_id = row.getAs[String]("source_id")
  val along_interval = row.getAs[Int]("along_interval")
  // FIX: original had the garbled call `getAs1665720284("create_time")`;
  // the field is a Timestamp in RelationshipPeer, so read it typed.
  val create_time = row.getAs[Timestamp]("create_time")
  val dt = row.getAs[String]("dt")
  val thumbnail_id1 = row.getAs[String]("thumbnail_id1")
  val thumbnail_url1 = row.getAs[String]("thumbnail_url1")
  val image_id1 = row.getAs[String]("image_id1")
  val image_url1 = row.getAs[String]("image_url1")
  val thumbnail_id2 = row.getAs[String]("thumbnail_id2")
  val thumbnail_url2 = row.getAs[String]("thumbnail_url2")
  val image_id2 = row.getAs[String]("image_id2")
  val image_url2 = row.getAs[String]("image_url2")
  val score1 = row.getAs[String]("score1")
  val score2 = row.getAs[String]("score2")
  // t1 is deliberately reused for both t1 and t2 of the event
  // (original behavior — TODO confirm t2 should not come from column "t2").
  val event = RelationshipPeer(sourceAid, targetAid, time, time, along_interval,
    source_id, create_time, dt, thumbnail_id1, thumbnail_url1, image_id1,
    image_url1, thumbnail_id2, thumbnail_url2, image_id2, image_url2,
    score1, score2)

  implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
  val data: String = Serialization.write(event)
  val message = new ProducerRecord[String, String](topic, s"key$i", data)
  producer.send(message)
}
// FIX: flush buffered records and release client resources; the original
// never closed the producer, so trailing messages could be silently dropped.
producer.close()

/** JSON payload schema for one peer-relation event (serialized via json4s). */
case class RelationshipPeer(aid1: String, aid2: String, t1: String, t2: String,
  along_interval: Int, source_id: String, create_time: Timestamp, dt: String,
  thumbnail_id1: String, thumbnail_url1: String, image_id1: String,
  image_url1: String, thumbnail_id2: String, thumbnail_url2: String,
  image_id2: String, image_url2: String, score1: String, score2: String)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)