一 读取 指定路径文件日志 生产信息
@Test public void testReadLog() throws Exception { // 读取日志文件 String path="D:\idea-work\2021-12-28"; System.out.println("开始读取"); this.getFile(new File(path)); // 测试类 读取文件消耗时间 Thread.sleep(100000); System.out.println("结束读取"); } private void getFile(File file) throws Exception { // 列出 该目录下所有文件 File[] files = file.listFiles(); if (files!=null && files.length>0){ for (File file1 : files) { if (file1.isFile()){ // 如果是文件 就读去 System.err.println("文件名称=====>" + file1.getName()); BufferedReader reader = new BufferedReader(new FileReader(file1)); // 唯一值 防止 重复消费 UUID uuid = UUID.randomUUID(); String str=""; while ((str=reader.readLine()) != null){ kafkaTemplate.sendDefault("import",uuid+str); // 发送消息 } } else { // 如果是文件夹 递归 读取 this.getFile(file1); } } } }
二 kafka 消费端消费消息 使用手动提交解决防止丢失 使用线程队列 顺序
package com.hao.listener; import com.hao.dao.LogRep; import com.hao.domain.until.LogInfo; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.listener.AcknowledgingMessageListener; import org.springframework.kafka.support.Acknowledgment; import java.util.concurrent.linkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class MyAKMessage implements AcknowledgingMessageListener{ @Autowired LogRep logRep; @Autowired RedisTemplate redisTemplate; linkedBlockingQueue > queue = new linkedBlockingQueue<>(); // 公平锁 ReentrantLock lock = new ReentrantLock(true); @Override public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { // 开启一个线程用来 把消息放入队列 new Thread(new Runnable() { @Override public void run() { try { // 存放信息 queue.put(data); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 开启一个线程 消费信息 new Thread(new Runnable() { @Override public void run() { // 取出信息 ConsumerRecord data = queue.poll(); if (null!=data){ // 消费信息 myWork(data,acknowledgment); } } }).start(); } int i=0; private void myWork(ConsumerRecord data, Acknowledgment acknowledgment) { // 上锁 lock.lock(); // 获取键 String key = data.key(); // 获取值 String value = data.value(); System.out.println("消费者接收数据+key:"+key+"value:"+value); if (key.equals("import")){ // 构建实体 存入 es LogInfo logInfo = new LogInfo(); logInfo.setId(i++); logInfo.setValue(value); String id = value.substring(0, 35); Boolean b = redisTemplate.opsForValue().setIfAbsent(id, value, 7, TimeUnit.DAYS); if (b){ // 防止 重复小费 logRep.save(logInfo); } } // 手动提交 acknowledgment.acknowledge(); lock.unlock();//解锁 } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)