Kafka重复消费,消息丢失,消息积压,消息顺序消费解决方案

Kafka重复消费,消息丢失,消息积压,消息顺序消费解决方案,第1张

Kafka重复消费,消息丢失,消息积压,消息顺序消费解决方案
一  读取 指定路径文件日志 生产信息
  @Test
    public void testReadLog() throws Exception {
        // 读取日志文件
        String path="D:\idea-work\2021-12-28";
        System.out.println("开始读取");
        this.getFile(new File(path));

        // 测试类 读取文件消耗时间
        Thread.sleep(100000);
        System.out.println("结束读取");
    }

    private void getFile(File file) throws Exception {
        // 列出 该目录下所有文件
        File[] files = file.listFiles();
        if (files!=null && files.length>0){
            for (File file1 : files) {
                if (file1.isFile()){
                    // 如果是文件 就读去
                    System.err.println("文件名称=====>" + file1.getName());
                    BufferedReader reader = new BufferedReader(new FileReader(file1));
                    // 唯一值  防止 重复消费
                    UUID uuid = UUID.randomUUID();
                    String str="";
                    while ((str=reader.readLine()) != null){
                        kafkaTemplate.sendDefault("import",uuid+str);
                        // 发送消息
                    }
                } else {
                    // 如果是文件夹 递归 读取
                    this.getFile(file1);
                }
            }
        }
    }
二  kafka 消费端消费消息 使用手动提交解决防止丢失          使用线程队列 顺序
package com.hao.listener;

import com.hao.dao.LogRep;
import com.hao.domain.until.LogInfo;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

import java.util.concurrent.linkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;


public class MyAKMessage implements AcknowledgingMessageListener {


    @Autowired
    LogRep logRep;

    @Autowired
    RedisTemplate redisTemplate;

    
    linkedBlockingQueue> queue = new linkedBlockingQueue<>();
    // 公平锁
    
    ReentrantLock lock = new ReentrantLock(true);
    
    @Override
    public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
        
        // 开启一个线程用来 把消息放入队列
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // 存放信息
                    queue.put(data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 开启一个线程 消费信息
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 取出信息
                ConsumerRecord data = queue.poll();
                if (null!=data){
                    // 消费信息
                    myWork(data,acknowledgment);
                }
            }

        }).start();
    }
    int i=0;
    private void myWork(ConsumerRecord data, Acknowledgment acknowledgment) {
        // 上锁 
        lock.lock();
        // 获取键
        String key = data.key();
        // 获取值
        String value = data.value();
        System.out.println("消费者接收数据+key:"+key+"value:"+value);
        if (key.equals("import")){
            // 构建实体 存入 es
            LogInfo logInfo = new LogInfo();
            logInfo.setId(i++);
            logInfo.setValue(value);
            String id = value.substring(0, 35);
            Boolean b = redisTemplate.opsForValue().setIfAbsent(id, value, 7, TimeUnit.DAYS);
            if (b){
                // 防止 重复小费
                logRep.save(logInfo);
            }
        }
        // 手动提交
        acknowledgment.acknowledge();
        lock.unlock();//解锁
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688585.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存