1.先使kafka积压 ,写一个定时任务 来使kafka一直发送消息造成积压
@Component public class MyTask { @Autowired KafkaTemplate kafkaTemplate; //定时扫描日志文件 //发送给kafka //读文件 @Scheduled(cron = "0 21 11 * * ?") public void readLog() throws IOException { System.out.println("定时任务启动了"); String path="d:/logs/"; readFile(new File(path)); } private void readFile(File file) throws IOException { File[] files = file.listFiles(); if (files!=null&&files.length>0){ for (File file1 : files) { if (file1.isFile()){ BufferedReader reader = new BufferedReader(new FileReader(file1)); String str=""; while ((str=reader.readLine())!=null){ //一直读取消息 kafkaTemplate.send("test","import", UUID.randomUUID()+str); } }else { this.readFile(file1); } } } } }
上面的conpont注解是必须的 ,定时任务的注解也需要写
2.
public class MyListenerACK implements AcknowledgingMessageListener{ @Autowired RedisTemplate redisTemplate; @Autowired LogRep logRep; //队列 linkedBlockingQueue > lbq = new linkedBlockingQueue<>(); int i=0; @Override public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { //开启线程 用来把消息放入队列 new Thread(new Runnable() { @Override public void run() { //把数据放入线程 try { lbq.put(data); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); //在开启一个线程来消费消息 new Thread(new Runnable() { @Override public void run() { //从队列取出消息 ConsumerRecord data = lbq.poll(); if (null != data) { myWork(data, acknowledgment); } } }).start(); } private void myWork(ConsumerRecord data, Acknowledgment acknowledgment) { ReentrantLock lock = new ReentrantLock(true); //加锁 lock.lock(); String key = data.key(); String value = data.value(); System.out.println("获取的数据key是"+key+"value是"+value); if (null!=key){ if (key.equals("FLOOR_ADD")){ //添加 //转换 TbFloor floor = JSON.parseObject(value, TbFloor.class); //防止提交的时候重复消费 if (redisTemplate.opsForValue().setIfAbsent(floor.getToken(),1,1, TimeUnit.DAYS)){ //存入redis redisTemplate.opsForList().rightPush("addFloors",floor); } //手动提交 acknowledgment.acknowledge(); //解锁 lock.unlock(); }else if (key.equals("logs")){ //把文件写入硬盘 FileUtil.writeFile("d:\logs\",data.value()); acknowledgment.acknowledge(); //解锁 lock.unlock(); }else if (key.equals("import")){ //截取前36位 String uuid = value.substring(0, 35); if (redisTemplate.opsForValue().setIfAbsent(uuid,1,7,TimeUnit.DAYS)){ //为了造成数据积压,所以把写入和手动提交暂时关闭 //读取日志文件 logRep.save(new LogInfo(i++,value)); acknowledgment.acknowledge(); } //解锁 lock.unlock(); System.out.println("log已经存入es"); } } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)