该文章代码来自黑马程序员全面深入学习Java并发编程,JUC并发编程全套教程_哔哩哔哩_bilibili
阻塞队列public class BlockingQueue线程池{ // 任务队列 private Queue queue = new linkedList (); // 锁 private ReentrantLock lock = new ReentrantLock(); // 生产者队列 private Condition fullWaitSet = lock.newCondition(); // 消费者队列 private Condition emptyWaitSet = lock.newCondition(); // 队列的最大容量 private int maxNum; public BlockingQueue(int maxNum){ this.maxNum = maxNum; } // 带超时时间的获取任务 public T poll(long timeout, TimeUnit timeUnit){ lock.lock(); try{ // 将timeout统一转换成纳秒 long nanos = timeUnit.tonanos(timeout); while(queue.isEmpty()){ // 返回的是剩余时间 if(nanos <= 0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } T task = (T) queue.poll(); fullWaitSet.signal(); return task; } catch (Exception e){ e.printStackTrace(); return null; } finally { lock.unlock(); } } // 消费者获取任务 public T take() { lock.lock(); try{ while(queue.isEmpty()){ emptyWaitSet.await(); } T task = (T) queue.poll(); fullWaitSet.signal(); return task; } catch (Exception e){ e.printStackTrace(); return null; } finally { lock.unlock(); } } // 生产者添加任务 public void put(T task) { lock.lock(); try{ while(queue.size() == maxNum){ fullWaitSet.await(); } queue.add(task); emptyWaitSet.signal(); } catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } } // 带超时时间的阻塞添加 public boolean offer(T task, long timeout, TimeUnit timeUnit){ lock.lock(); try{ long nanos = timeUnit.tonanos(timeout); while(queue.size() == maxNum){ if(nanos <= 0){ return false; } nanos = fullWaitSet.awaitNanos(nanos); } queue.add(task); emptyWaitSet.signal(); } catch (Exception e){ e.printStackTrace(); } finally { lock.unlock(); } return true; } // 拒绝策略 public void tryPut(RejectPolicy rejctPolicy, T task){ lock.lock(); try{ // 判断队列是否已满 if(queue.size() == maxNum){ System.out.println("[" + Thread.currentThread().getName() + "]" + "执行拒绝策略"); rejctPolicy.reject(this, task); } else { System.out.println("[" + Thread.currentThread().getName() + "]" + "加入任务队列"); queue.add(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } // 返回阻塞队列中的任务数 public int size(){ lock.lock(); try{ return queue.size(); } finally { lock.unlock(); } } }
public class ThreadPool { // 阻塞队列 private BlockingQueue测试类taskQueue; // 线程集合 private HashSet workers = new HashSet<>(); // 核心线程数 private int coreSize; // 获取任务的超时时间 private long timeout; private TimeUnit timeUnit; private RejectPolicy rejectPolicy; public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueNum, RejectPolicy rejctPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueNum); this.rejectPolicy = rejctPolicy; } // 执行任务 public void execute(Runnable task){ synchronized (workers) { // 当任务数没有超过coreSize的时候,交给Worker对象执行 // 当任务数超过了coreSize,将其假如任务队列 if (workers.size() < coreSize) { Worker worker = new Worker(task); System.out.println("[" + Thread.currentThread().getName() + "]" + "新增worker"); workers.add(worker); worker.start(); } else { //System.out.println("[" + Thread.currentThread().getName() + "]" + "加入到任务队列或执行拒绝策略"); //taskQueue.put(task); taskQueue.tryPut(rejectPolicy, task); } } } class Worker extends Thread{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { // 执行任务 // 1) 当task不为空,执行task // 2) 当task执行完毕,看看任务队列是否为空,如果不为空就取出并执行 while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null){ try{ System.out.println("["+Thread.currentThread().getName()+"]"+"正在执行任务"); task.run(); } catch (Exception e){ e.printStackTrace(); } finally { task = null; } } synchronized (workers){ System.out.println("["+Thread.currentThread().getName()+"]"+"任务执行完毕,将worker移除"); workers.remove(this); } } } }
public class Test { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool( 2, 1000, TimeUnit.MICROSECONDS, 5, (queue, task) -> { queue.put(task); }); for(int i=0; i<15; i++){ int j = i; threadPool.execute(new Runnable() { @Override public void run() { System.out.println("第"+j+"个task执行"); } }); } } }执行结果
[main]新增worker [main]新增worker [main]加入任务队列 [main]加入任务队列 [Thread-1]正在执行任务 [main]加入任务队列 [Thread-0]正在执行任务 第0个task执行 第1个task执行 [main]加入任务队列 [main]加入任务队列 [main]执行拒绝策略 [Thread-0]正在执行任务 [Thread-1]正在执行任务 [main]加入任务队列 第2个task执行 [main]执行拒绝策略 第3个task执行 [main]执行拒绝策略 [Thread-0]正在执行任务 第4个task执行 [Thread-1]正在执行任务 第5个task执行 [main]执行拒绝策略 [Thread-0]正在执行任务 第6个task执行 [Thread-1]正在执行任务 第7个task执行 [main]加入任务队列 [main]执行拒绝策略 [Thread-0]正在执行任务 [main]加入任务队列 [Thread-1]正在执行任务 第9个task执行 第8个task执行 [Thread-1]正在执行任务 第10个task执行 [Thread-0]正在执行任务 第11个task执行 [Thread-1]正在执行任务 第12个task执行 [Thread-0]正在执行任务 [Thread-1]正在执行任务 第13个task执行 第14个task执行 [Thread-1]任务执行完毕,将worker移除 [Thread-0]任务执行完毕,将worker移除
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)