多线程 *** 作来防止kafka积压和顺序消费

多线程 *** 作来防止kafka积压和顺序消费,第1张

线程 *** 作来防止kafka积压和顺序消费

1.先使kafka积压 ,写一个定时任务 来使kafka一直发送消息造成积压

@Component
public class MyTask {
    @Autowired
    KafkaTemplate kafkaTemplate;
    //定时扫描日志文件
    //发送给kafka

    //读文件
    @Scheduled(cron = "0 21 11 * * ?")
    public void readLog() throws IOException {
        System.out.println("定时任务启动了");
        String path="d:/logs/";
        readFile(new File(path));
    }

    private void readFile(File file) throws IOException {
        File[] files = file.listFiles();
        if (files!=null&&files.length>0){
            for (File file1 : files) {
                if (file1.isFile()){
               BufferedReader reader = new BufferedReader(new FileReader(file1));
               String str="";
               while ((str=reader.readLine())!=null){
                   //一直读取消息
                   kafkaTemplate.send("test","import", UUID.randomUUID()+str);
               }
                }else {
                    this.readFile(file1);
                }
            }
        }
    }
}

上面的conpont注解是必须的 ,定时任务的注解也需要写

2.

public class MyListenerACK implements AcknowledgingMessageListener {
    @Autowired
    RedisTemplate redisTemplate;
    @Autowired
    LogRep logRep;

    //队列
    linkedBlockingQueue> lbq = new linkedBlockingQueue<>();


    int i=0;
    @Override
    public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {

        //开启线程  用来把消息放入队列
        new Thread(new Runnable() {
            @Override
            public void run() {
                //把数据放入线程
                try {
                    lbq.put(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        //在开启一个线程来消费消息
        new Thread(new Runnable() {
            @Override
            public void run() {
                //从队列取出消息
                ConsumerRecord data = lbq.poll();
                if (null != data) {
                    myWork(data, acknowledgment);
                }
            }
        }).start();
    }

    private void myWork(ConsumerRecord data, Acknowledgment acknowledgment) {

        ReentrantLock lock = new ReentrantLock(true);
        //加锁
        lock.lock();
        String key = data.key();
        String value = data.value();
        System.out.println("获取的数据key是"+key+"value是"+value);
        if (null!=key){
            if (key.equals("FLOOR_ADD")){
                //添加
                //转换
                TbFloor floor = JSON.parseObject(value, TbFloor.class);

                //防止提交的时候重复消费
                if (redisTemplate.opsForValue().setIfAbsent(floor.getToken(),1,1, TimeUnit.DAYS)){
                    //存入redis
                    redisTemplate.opsForList().rightPush("addFloors",floor);
                }
                //手动提交
                acknowledgment.acknowledge();

                //解锁
                lock.unlock();
            }else  if (key.equals("logs")){
                //把文件写入硬盘
                FileUtil.writeFile("d:\logs\",data.value());
                acknowledgment.acknowledge();
                //解锁
                lock.unlock();
            }else if (key.equals("import")){
                //截取前36位
                String uuid = value.substring(0, 35);
                if (redisTemplate.opsForValue().setIfAbsent(uuid,1,7,TimeUnit.DAYS)){
                    //为了造成数据积压,所以把写入和手动提交暂时关闭
                    //读取日志文件
                    logRep.save(new LogInfo(i++,value));
                    acknowledgment.acknowledge();
                }
                //解锁
                lock.unlock();
                System.out.println("log已经存入es");
            }
        }

    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688765.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存