Multi-threaded file parsing that avoids the OOM caused by loading a large file into memory all at once
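
The memory saving comes from reading the file one line at a time with BufferedReader.readLine() and keeping only a bounded batch of lines in memory. The threads are not what saves memory: they only let reading and processing overlap. A minimal single-threaded sketch of that idea follows. The class BatchedReader and the method readInBatches are hypothetical names used for illustration, not part of the original article, and main reads from a StringReader so the sketch runs without a file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper class for illustration; not from the original article
public class BatchedReader {
    /**
     * Reads lines one at a time and passes them to the handler in batches of at most
     * batchSize lines. This method never holds more than batchSize lines itself.
     * Returns the number of batches handed to the handler.
     */
    public static int readInBatches(BufferedReader reader, int batchSize,
                                    Consumer<List<String>> handler) throws IOException {
        List<String> batch = new ArrayList<>(batchSize);
        int batches = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            batch.add(line);
            if (batch.size() == batchSize) {
                handler.accept(batch);
                batches++;
                // Start a fresh list; memory stays bounded as long as the handler drops old batches
                batch = new ArrayList<>(batchSize);
            }
        }
        // Flush the last, partially filled batch
        if (!batch.isEmpty()) {
            handler.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) throws IOException {
        try (BufferedReader reader = new BufferedReader(new StringReader("l1\nl2\nl3\n"))) {
            int n = readInBatches(reader, 2, batch -> System.out.println(batch.size() + " lines"));
            System.out.println(n + " batches");
        }
    }
}
```

The multi-threaded program below follows the same pattern, but reuses a single list and clears it after each batch is consumed.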

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.*;

public class HelloWord {
    public static void main(String[] args) throws Exception {
        // Backslashes must be escaped in a Java string literal ("C:\test.txt" would contain a tab)
        String filepath = "C:\\test.txt";

        // try-with-resources closes the reader and the underlying FileInputStream, even on failure
        try (BufferedReader bufferedReader = new BufferedReader(
                new InputStreamReader(new FileInputStream(new File(filepath))))) {

            // Container threshold: at most this many lines are held in memory at a time
            int limit = 100000;
            // Data container shared by the producer and consumer threads
            List<String> container = new ArrayList<>(limit);

            // Lock and conditions that coordinate the two threads
            Lock lock = new ReentrantLock();
            Condition producer = lock.newCondition();
            Condition consumer = lock.newCondition();
            CountDownLatch latch = new CountDownLatch(2);

            // End-of-file flag
            AtomicBoolean eof = new AtomicBoolean(false);

            // Producer thread: reads lines until the container is full, then waits for the consumer to drain it
            Thread producerThread = new Thread(() -> {
                String str;
                lock.lock();
                try {
                    while ((str = bufferedReader.readLine()) != null) {
                        container.add(str);
                        if (container.size() >= limit) {
                            consumer.signal();
                            // await() may return spuriously, so re-check the condition in a loop
                            while (container.size() >= limit) {
                                producer.await();
                            }
                        }
                    }
                    System.out.println("Producer thread name is:" + Thread.currentThread().getName() + ", data read end.");
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // Set EOF and wake the consumer even if reading failed, so it cannot wait forever
                    eof.set(true);
                    consumer.signal();
                    lock.unlock();
                    // Count down in finally so the main thread is never left blocked on the latch
                    latch.countDown();
                }
            });

            // Consumer thread: processes each batch, then hands control back to the producer
            Thread consumerThread = new Thread(() -> {
                lock.lock();
                try {
                    while (true) {
                        // Wait (in a loop, because of spurious wakeups) until there is data or the file has ended
                        while (container.isEmpty() && !eof.get()) {
                            consumer.await();
                        }
                        if (!container.isEmpty()) {
                            // Simulate consuming the data
                            for (String var0 : container) {
                                System.out.println(var0);
                            }
                            // Clear the data container so the producer can refill it
                            container.clear();
                        }
                        System.out.println("Consumer thread name is:" + Thread.currentThread().getName() + ", this batch data consume finish, current container size:" + container.size() + ", EOF:" + eof + ".");
                        if (eof.get()) {
                            System.out.println("Consumer thread name is:" + Thread.currentThread().getName() + " file data consume end.");
                            break;
                        }
                        // The container is empty again: wake the producer
                        producer.signal();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    latch.countDown();
                }
            });

            // Start the threads
            producerThread.setName("producer");
            producerThread.start();
            consumerThread.setName("consumer");
            consumerThread.start();

            // Block the main thread until both worker threads have finished
            latch.await();

            // The IO streams are closed when the try-with-resources block exits
            System.out.println("ready close resource.");
        }
    }
}
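
Hand-written Lock/Condition signalling like the program above is easy to get wrong (lost signals, spurious wakeups). As an alternative, java.util.concurrent.ArrayBlockingQueue already implements a bounded buffer: put() blocks while the queue is full and take() blocks while it is empty. The sketch below swaps in that technique; it is not the original author's method, and the names BatchPipeline, process and the EOF sentinel are hypothetical. At most queueCapacity full batches wait in the queue, plus one being filled and one being handled, so memory stays bounded regardless of file size, assuming the handler does not keep the batches it receives.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical helper class for illustration; not from the original article
public class BatchPipeline {
    // Hypothetical sentinel put on the queue after the last batch; compared by reference
    private static final List<String> EOF = new ArrayList<>();

    /** Reads batches on a producer thread, handles them on the calling thread, returns the line count. */
    public static long process(BufferedReader reader, int batchSize, int queueCapacity,
                               Consumer<List<String>> handler) throws Exception {
        // Bounded queue: put() blocks once it already holds queueCapacity batches
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(queueCapacity);
        // Exception raised by the producer, rethrown to the caller after join()
        Exception[] failure = new Exception[1];

        Thread producer = new Thread(() -> {
            try {
                List<String> batch = new ArrayList<>(batchSize);
                String line;
                while ((line = reader.readLine()) != null) {
                    batch.add(line);
                    if (batch.size() == batchSize) {
                        queue.put(batch);
                        batch = new ArrayList<>(batchSize);
                    }
                }
                if (!batch.isEmpty()) {
                    queue.put(batch);
                }
            } catch (IOException | InterruptedException e) {
                failure[0] = e;
            } finally {
                // Always enqueue the sentinel so the consumer loop terminates
                try {
                    queue.put(EOF);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "producer");
        // Daemon, so a producer blocked in put() cannot keep the JVM alive if the handler throws
        producer.setDaemon(true);
        producer.start();

        long handled = 0;
        while (true) {
            List<String> batch = queue.take();
            if (batch == EOF) {
                break;
            }
            handler.accept(batch);
            handled += batch.size();
        }
        producer.join();
        if (failure[0] != null) {
            throw failure[0];
        }
        return handled;
    }

    public static void main(String[] args) throws Exception {
        BufferedReader reader = new BufferedReader(new StringReader("a\nb\nc\nd\ne\n"));
        long n = process(reader, 2, 4, batch -> System.out.println("batch: " + batch));
        System.out.println("lines handled: " + n);
    }
}
```

Here the batches are handled on the calling thread, so no explicit latch is needed: the loop ends when the sentinel arrives, and join() makes any exception recorded by the producer visible before it is rethrown.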

Feel free to share; when reposting, please credit the source: 内存溢出 (outofmemory.cn)

Original article: http://outofmemory.cn/zaji/5659153.html