import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.*;

/**
 * Reads a text file line by line on a producer thread and hands the lines to a
 * consumer thread in batches.
 *
 * <p>Both threads share one list guarded by a single {@link ReentrantLock} with two
 * conditions. The producer fills the list up to the batch limit, wakes the consumer
 * and waits; the consumer prints every buffered line to standard output, clears the
 * list, wakes the producer and waits. When the producer stops reading it raises an
 * end-of-file flag so the consumer drains what is left and exits.
 */
public class HelloWord {

    /** File read when no path is given on the command line. */
    private static final String DEFAULT_FILE_PATH = "C:\\test.txt";

    /** Batch size (container threshold) used when none is given on the command line. */
    private static final int DEFAULT_LIMIT = 100000;

    /**
     * Runs the producer/consumer hand-off and blocks until both threads have finished.
     *
     * @param args optional arguments: {@code args[0]} is the path of the file to read
     *     (defaults to {@code C:\test.txt}); {@code args[1]} is the batch limit
     *     (defaults to 100000). A {@code null} or empty array selects the defaults.
     * @throws IllegalArgumentException if the batch limit is smaller than 1
     * @throws NumberFormatException if {@code args[1]} is not a valid integer
     * @throws Exception if the file cannot be opened or closed, or if the calling
     *     thread is interrupted while waiting for the workers
     */
    public static void main(String[] args) throws Exception {
        String filepath = (args != null && args.length > 0) ? args[0] : DEFAULT_FILE_PATH;
        // Container threshold: number of lines buffered before handing over to the consumer.
        int limit = (args != null && args.length > 1) ? Integer.parseInt(args[1]) : DEFAULT_LIMIT;
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be >= 1 but was " + limit);
        }

        // try-with-resources closes both streams even when a worker or the wait fails.
        try (FileInputStream fileInputStream = new FileInputStream(new File(filepath));
                BufferedReader bufferedReader =
                        new BufferedReader(new InputStreamReader(fileInputStream))) {

            // Data container shared by both threads; only touched while holding the lock.
            List<String> container = new ArrayList<>(limit);

            // Thread lock and the two wait queues.
            Lock lock = new ReentrantLock();
            Condition producer = lock.newCondition();
            Condition consumer = lock.newCondition();
            CountDownLatch latch = new CountDownLatch(2);

            // End-of-file flag.
            AtomicBoolean eof = new AtomicBoolean(false);

            // Producer thread.
            Thread producerThread = new Thread(() -> {
                // Acquire outside the try so the finally block only unlocks a held lock.
                lock.lock();
                try {
                    String str;
                    while ((str = bufferedReader.readLine()) != null) {
                        container.add(str);
                        // Re-check in a loop: await() may return spuriously, and the
                        // producer must not keep reading while the batch is still full.
                        while (container.size() >= limit) {
                            consumer.signal();
                            producer.await();
                        }
                    }
                    System.out.println("Producer thread name is:" + Thread.currentThread().getName()
                            + ", data read end.");
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for whoever inspects this thread next.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    // Always mark the end of input and wake the consumer, even after a
                    // failure; otherwise the consumer and main would wait forever.
                    eof.set(true);
                    consumer.signal();
                    lock.unlock();
                    latch.countDown();
                }
            });

            // Data consumer thread.
            Thread consumerThread = new Thread(() -> {
                lock.lock();
                try {
                    while (true) {
                        if (!container.isEmpty()) {
                            // Simulate consuming the data.
                            for (String var0 : container) {
                                System.out.println(var0);
                            }
                            // Empty the data container.
                            container.clear();
                        }
                        System.out.println("Consumer thread name is:" + Thread.currentThread().getName()
                                + ", this batch data consume finish, current container size:"
                                + container.size() + ", EOF:" + eof + ".");
                        if (eof.get()) {
                            System.out.println("Consumer thread name is:"
                                    + Thread.currentThread().getName() + " file data consume end.");
                            break;
                        }
                        // More input may follow: let the producer refill, then wait for it.
                        producer.signal();
                        consumer.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    // Counted down on every exit path so main is never left blocked.
                    latch.countDown();
                }
            });

            // Start the threads.
            producerThread.setName("producer");
            producerThread.start();
            consumerThread.setName("consumer");
            consumerThread.start();

            // Block until both workers are done.
            latch.await();

            // The I/O streams are closed when this try block exits.
            System.out.println("ready close resource.");
        }
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)