自制简易线程池

自制简易线程池,第1张

自制简易线程池

该文章代码来自黑马程序员全面深入学习Java并发编程,JUC并发编程全套教程_哔哩哔哩_bilibili

阻塞队列
public class BlockingQueue {

    // 任务队列
    private Queue queue = new linkedList();

    // 锁
    private ReentrantLock lock = new ReentrantLock();

    // 生产者队列
    private Condition fullWaitSet = lock.newCondition();

    // 消费者队列
    private Condition emptyWaitSet = lock.newCondition();

    // 队列的最大容量
    private int maxNum;

    public BlockingQueue(int maxNum){
        this.maxNum = maxNum;
    }

    // 带超时时间的获取任务
    public  T poll(long timeout, TimeUnit timeUnit){
        lock.lock();
        try{
            // 将timeout统一转换成纳秒
            long nanos = timeUnit.tonanos(timeout);
            while(queue.isEmpty()){
                // 返回的是剩余时间
                if(nanos <= 0){
                    return null;
                }
                nanos = emptyWaitSet.awaitNanos(nanos);
            }
            T task = (T) queue.poll();
            fullWaitSet.signal();
            return task;
        } catch (Exception e){
            e.printStackTrace();
            return null;
        } finally {
            lock.unlock();
        }
    }

    // 消费者获取任务
    public  T take() {
        lock.lock();
        try{
            while(queue.isEmpty()){
                emptyWaitSet.await();
            }
            T task = (T) queue.poll();
            fullWaitSet.signal();
            return task;
        } catch (Exception e){
            e.printStackTrace();
            return null;
        } finally {
            lock.unlock();
        }
    }

    // 生产者添加任务
    public void put(T task) {
        lock.lock();
        try{
            while(queue.size() == maxNum){
                fullWaitSet.await();
            }
            queue.add(task);
            emptyWaitSet.signal();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    // 带超时时间的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit){
        lock.lock();
        try{
            long nanos = timeUnit.tonanos(timeout);
            while(queue.size() == maxNum){
                if(nanos <= 0){
                    return false;
                }
                nanos = fullWaitSet.awaitNanos(nanos);
            }
            queue.add(task);
            emptyWaitSet.signal();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return true;
    }

    // 拒绝策略
    public void tryPut(RejectPolicy rejctPolicy, T task){
        lock.lock();
        try{
            // 判断队列是否已满
            if(queue.size() == maxNum){
                System.out.println("[" + Thread.currentThread().getName() + "]" + "执行拒绝策略");
                rejctPolicy.reject(this, task);
            } else {
                System.out.println("[" + Thread.currentThread().getName() + "]" + "加入任务队列");
                queue.add(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // 返回阻塞队列中的任务数
    public int size(){
        lock.lock();
        try{
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}
线程池
public class ThreadPool {

    // 阻塞队列
    private BlockingQueue taskQueue;

    // 线程集合
    private HashSet workers = new HashSet<>();

    // 核心线程数
    private int coreSize;

    // 获取任务的超时时间
    private long timeout;

    private TimeUnit timeUnit;

    private RejectPolicy rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueNum, RejectPolicy rejctPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockingQueue<>(queueNum);
        this.rejectPolicy = rejctPolicy;
    }

    // 执行任务
    public void execute(Runnable task){
        synchronized (workers) {
            // 当任务数没有超过coreSize的时候,交给Worker对象执行
            // 当任务数超过了coreSize,将其假如任务队列
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println("[" + Thread.currentThread().getName() + "]" + "新增worker");
                workers.add(worker);
                worker.start();
            } else {
                //System.out.println("[" + Thread.currentThread().getName() + "]" + "加入到任务队列或执行拒绝策略");
                //taskQueue.put(task);
                taskQueue.tryPut(rejectPolicy, task);
            }
        }

    }




    class Worker extends Thread{
        private Runnable task;

        public Worker(Runnable task){
            this.task = task;
        }

        @Override
        public void run() {
            // 执行任务
            // 1) 当task不为空,执行task
            // 2) 当task执行完毕,看看任务队列是否为空,如果不为空就取出并执行
            while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null){
                try{
                    System.out.println("["+Thread.currentThread().getName()+"]"+"正在执行任务");
                    task.run();
                } catch (Exception e){
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers){
                System.out.println("["+Thread.currentThread().getName()+"]"+"任务执行完毕,将worker移除");
                workers.remove(this);
            }
        }
    }

}
测试类
public class Test {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(
                2,
                1000,
                TimeUnit.MICROSECONDS,
                5,
                (queue, task) -> {
                    queue.put(task);
                });
        for(int i=0; i<15; i++){
            int j = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("第"+j+"个task执行");
                }
            });
        }
    }

}
执行结果
[main]新增worker
[main]新增worker
[main]加入任务队列
[main]加入任务队列
[Thread-1]正在执行任务
[main]加入任务队列
[Thread-0]正在执行任务
第0个task执行
第1个task执行
[main]加入任务队列
[main]加入任务队列
[main]执行拒绝策略
[Thread-0]正在执行任务
[Thread-1]正在执行任务
[main]加入任务队列
第2个task执行
[main]执行拒绝策略
第3个task执行
[main]执行拒绝策略
[Thread-0]正在执行任务
第4个task执行
[Thread-1]正在执行任务
第5个task执行
[main]执行拒绝策略
[Thread-0]正在执行任务
第6个task执行
[Thread-1]正在执行任务
第7个task执行
[main]加入任务队列
[main]执行拒绝策略
[Thread-0]正在执行任务
[main]加入任务队列
[Thread-1]正在执行任务
第9个task执行
第8个task执行
[Thread-1]正在执行任务
第10个task执行
[Thread-0]正在执行任务
第11个task执行
[Thread-1]正在执行任务
第12个task执行
[Thread-0]正在执行任务
[Thread-1]正在执行任务
第13个task执行
第14个task执行
[Thread-1]任务执行完毕,将worker移除
[Thread-0]任务执行完毕,将worker移除

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5706258.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存