java消费kafka数据之后,进行堆积之后在插入数据库

java消费kafka数据之后,进行堆积之后在插入数据库,第1张

java高频的获取kafka数据,导致数据库数据一致在高频读写,为了降低数据库的高频连接搞高频读写,可以将数据堆积一段时间之后,进行插入数据库 *** 作。

主要采用了队列和缓存,将获取到的数据放入java队列中,利用缓存进行延时判断。

 
      cn.hutool
      hutool-all
      5.8.0.M2
    


    
      com.google.guava
      guava
      26.0-jre
    
    @KafkaListener(topics = {"jyz_xxxxxxx"})
    public void jyz_xxxxxxx(ConsumerRecord record) throws InterruptedException {

        Optional kafkaMessage = Optional.ofNullable(record.value());

        String  SQL=null;

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();

            blockingQueue.offer(message.toString());

            long startTime= System.currentTimeMillis();

            long endTime = 0;

            try {
                endTime = fifoCache.get("endTime");
            } catch (Exception e) {
                endTime = System.currentTimeMillis();

                fifoCache.put("endTime",endTime);
            }

            if(startTime-endTime>=2000){

                List list = new ArrayList<>();
                // 转移阻塞队列数据到普通的List
                blockingQueue.drainTo(list);

                 SQL = String.join("", list);

                System.out.println("SQL:"+SQL);

                endTime = System.currentTimeMillis();

                fifoCache.put("endTime",endTime);


            }



        }
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/722385.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存