Java Review - 并发编程

Java Review - 并发编程,第1张

Java Review - 并发编程

文章目录
  • 线程池主要解决两个问题
  • 类关系图
  • ctl 含义 ---- 记录线程池状态和线程池中线程个数
  • 线程池状态 及转换
  • 线程池参数
  • 线程池类型
  • mainLock & termination
  • Worker
  • 源码分析
    • public void execute(Runnable command)
    • 新增线程addWorkder源码分析
    • 工作线程Worker的执行
      • getTask()
      • processWorkerExit
    • shutdown
    • shutdownNow
    • awaitTermination
  • 小结


线程池主要解决两个问题
  • 一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。

  • 二是线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。

另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情境的需要,程序员可以使用更方便的Executors的工厂方法,比如newCachedThreadPool(线程池线程个数最多可达Integer.MAX_VALUE,线程自动回收)、newFixedThreadPool(固定大小的线程池)和newSingleThreadExecutor(单个线程)等来创建线程池,当然用户还可以自定义。


类关系图

在上图中,Executors其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。


ctl 含义 ---- 记录线程池状态和线程池中线程个数

ThreadPoolExecutor继承了AbstractExecutorService,成员变量ctl是一个Integer的原子变量,用来记录线程池状态和线程池中线程个数,类似于ReentrantReadWriteLock使用一个变量来保存两种信息。

  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

这里假设Integer类型是32位二进制表示,则其中高3位用来表示线程池状态,后面29位用来记录线程池线程个数。

/用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//线程个数掩码位数
private static final int COUNT_BITS = Integer.SIZE - 3;

//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


线程池状态:

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取高三位 运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//获取低29位 线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }

//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc) { return rs | wc; }


线程池状态 及转换
  • RUNNING:接受新任务并且处理阻塞队列里的任务。

  • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。

  • STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务。

  • TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为0,将要调用terminated方法。

  • TERMINATED:终止状态。terminated方法调用完成以后的状态。

线程池状态转换列举如下。

  • RUNNING -> SHUTDOWN :显式调用shutdown()方法,或者隐式调用了finalize()、方法里面的shutdown()方法。

  • RUNNING 或 SHUTDOWN)-> STOP :显式调用 shutdownNow()方法时。

  • SHUTDOWN -> TIDYING :当线程池和任务队列都为空时。

  • STOP -> TIDYING :当线程池为空时。

  • TIDYING -> TERMINATED: 当 terminated() hook 方法执行完成时


线程池参数
  • corePoolSize:线程池核心线程个数。

  • workQueue:用于保存等待执行的任务的阻塞队列, 比如基于数组的有界ArrayBlockingQueue、基于链表的无界linkedBlockingQueue、最多只有一个元素的同步队列SynchronousQueue及优先级队列PriorityBlockingQueue等。

  • maximunPoolSize:线程池最大线程数量。

  • ThreadFactory:创建线程的工厂。

  • RejectedExecutionHandler:饱和策略,当队列满并且线程个数达到maximunPoolSize后采取的策略。

    比如
    AbortPolicy(抛出异常)、
    CallerRunsPolicy(使用调用者所在线程来运行任务)、
    DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)
    DiscardPolicy(默默丢弃,不抛出异常)

  • keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。

  • TimeUnit:存活时间的时间单位


线程池类型
  • newFixedThreadPool :创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new linkedBlockingQueue());
    }
	// 使用自定义线程创建工厂
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new linkedBlockingQueue(),
                                      threadFactory);
    }
  • newSingleThreadExecutor: 创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new linkedBlockingQueue()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new linkedBlockingQueue(),
                                    threadFactory));
    }
  • newCachedThreadPool :创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列。keeyAliveTime=60说明只要当前线程在60s内空闲则回收。这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。
  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue(),
                                      threadFactory);
    }

mainLock & termination
    
    private final ReentrantLock mainLock = new ReentrantLock();

   
    private final Condition termination = mainLock.newCondition();
  • mainLock是独占锁,用来控制新增Worker线程 *** 作的原子性。

  • termination是该锁对应的条件队列,在线程调用awaitTermination时用来存放阻塞的线程。


Worker

Worker继承AQS和Runnable接口,是具体承载任务的对象。Worker继承了AQS,自己实现了简单不可重入独占锁,其中state=0表示锁未被获取状态,state=1表示锁已经被获取的状态,state=-1是创建Worker时默认的状态,创建时状态设置为-1是为了避免该线程在运行runWorker()方法前被中断。其中变量firstTask记录该工作线程执行的第一个任务,thread是具体执行任务的线程。

DefaultThreadFactory是线程工厂,newThread方法是对线程的一个修饰。其中poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。


源码分析 public void execute(Runnable command)

execute方法的作用是提交任务command到线程池进行执行。用户线程提交任务到线程池的模型图如下图所示。

从该图可以看出,ThreadPoolExecutor的实现实际是一个生产消费模型,当用户添加任务到线程池时相当于生产者生产元素,workers线程工作集中的线程直接执行任务或者从任务队列里面获取任务时则相当于消费者消费元素。

用户线程提交任务的execute方法的具体代码如下

public void execute(Runnable command) {

		// 1  任务为null ,抛出 npe异常
        if (command == null)
            throw new NullPointerException();
        
        
        // 2 获取当前线程池的状态 + 线程个数变量的组合值 
        int c = ctl.get();
		// 3 当前线程池中的个数是否小于corePoolSize ,小于的话则开启新的线程 
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 4 如果线程池处于running状态,则添加任务到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
        	// 4.1 二次检查
            int recheck = ctl.get();
            // 4.2 如果当前线程池的状态不是running 则从队列中移除任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 4.3 否则当线程数数量为空,则添加一个线程    
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }	
        // 5 如果队列满,则新增线程,新增线程失败,触发拒绝策略 
        else if (!addWorker(command, false))
            reject(command);
    }
  • 代码(3)判断如果当前线程池中线程个数小于corePoolSize,会向workers里面新增一个核心线程(core线程)执行该任务。

  • 如果当前线程池中线程个数大于等于corePoolSize则执行代码(4)。如果当前线程池处于RUNNING状态则添加当前任务到任务队列。这里需要判断线程池状态是因为有可能线程池已经处于非RUNNING状态,而在非RUNNING状态下是要抛弃新任务的。

  • 如果向任务队列添加任务成功,则代码(4.2)对线程池状态进行二次校验,这是因为添加任务到任务队列后,执行代码(4.2)前有可能线程池的状态已经变化了。这里进行二次校验,如果当前线程池状态不是RUNNING了则把任务从任务队列移除,移除后执行拒绝策略;如果二次校验通过,则执行代码(4.3)重新判断当前线程池里面是否还有线程,如果没有则新增一个线程。

  • 如果代码(4)添加任务失败,则说明任务队列已满,那么执行代码(5)尝试新开启线程(如上的thread3和thread4)来执行该任务,如果当前线程池中线程个数>maximumPoolSize则执行拒绝策略。

新增线程addWorkder源码分析
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary. 
            // 6 检查队列是否只在必要的时候为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
			
			// 7 循环CAS增加线程个数 
            for (;;) {
                int wc = workerCountOf(c);
                // 7.1 如果线程个数超过限制 则返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 7.2 cas增加线程个数,同时只能有1个线程成功  
                if (compareAndIncrementWorkerCount(c))
                    break retry;
               // 7.3 cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas。
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
		
		// 8 到这里,说明CAS成功了
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        	// 8.1 创建Worker
        	 w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
            	// 8.2 加独占锁,为了workers同步,因为可能多个线程调用了线程池的execute方法。
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //8.3 重新检查线程池的状态,避免在获取锁前调用了shutdown接口
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 8.4 添加任务
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 8.5 添加成功,则启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

主要分两个部分:

  • 第一部分双重循环的目的是通过CAS *** 作增加线程数;
  • 第二部分主要是把并发安全的任务添加到workers里面,并且启动任务执行。

首先来分析第一部分的代码6

    // 6 检查队列是否只在必要的时候为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

展开!运算后等价于

rs >= SHUTDOWN &&
               (rs != SHUTDOWN ||
             firstTask != null ||
             workQueue.isEmpty())

也就是说下面几种情况下会返回false:

  • 当前线程池状态为STOP,TIDYING,TERMINATED
  • 当前线程池状态为SHUTDOWN并且已经有了第一个任务
  • 当前线程池状态为SHUTDOWN并且任务队列为空

内层循环的作用是使用CAS *** 作增加线程数,代码(7.1)判断如果线程个数超限则返回false,否则执行代码(7.2)CAS *** 作设置线程个数,CAS成功则退出双循环,CAS失败则执行代码(7.3)看当前线程池的状态是否变化了,如果变了,则再次进入外层循环重新获取线程池状态,否则进入内层循环继续进行CAS尝试。

执行到第二部分的代码(8)时说明使用CAS成功地增加了线程个数,但是现在任务还没开始执行。这里使用全局的独占锁来控制把新增的Worker添加到工作集workers中。代码(8.1)创建了一个工作线程Worker。

代码(8.2)获取了独占锁,代码(8.3)重新检查线程池状态,这是为了避免在获取锁前其他线程调用了shutdown关闭了线程池。如果线程池已经被关闭,则释放锁,新增线程失败,否则执行代码(8.4)添加工作线程到线程工作集,然后释放锁。代码(8.5)判断如果新增工作线程成功,则启动工作线程。


工作线程Worker的执行

用户线程提交任务到线程池后,由Worker来执行。先看下Worker的构造函数。

		
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //创建一个线程
            this.thread = getThreadFactory().newThread(this);
        }

在构造函数内首先设置Worker的状态为-1,这是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。在如下runWorker代码中,运行代码(9)时会调用unlock方法,该方法把status设置为了0,所以这时候调用shutdownNow会中断Worker线程。

  
        public void run() {
            runWorker(this);
        }
  final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 9 将state 置为0 ,允许终端
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        	// 10 
	            while (task != null || (task = getTask()) != null) {
            	// 10.1 
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                	// 10.2 执行任务前干一些事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                    	// 10.3 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    	// 10.4 执行任务完成后干一些事情
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 10.5 统计当前Worker完成了多少任务
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        	// 11 执行清理工作 
            processWorkerExit(w, completedAbruptly);
        }
    }
  • 在如上代码(10)中,如果当前task==null或者调用getTask从任务队列获取的任务返回null,则跳转到代码(11)执行。

  • 如果task不为null则执行代码(10.1)获取工作线程内部持有的独占锁,然后执行扩展接口代码(10.2)在具体任务执行前做一些事情。代码(10.3)具体执行任务,代码(10.4)在任务执行完毕后做一些事情,代码(10.5)统计当前Worker完成了多少个任务,并释放锁。

  • 这里在执行具体任务期间加锁,是为了避免在任务运行期间,其他线程调用了shutdown后正在执行的任务被中断(shutdown只会中断当前被阻塞挂起的线程)

getTask()

如果当前task为空,则直接执行,否者调用getTask从任务队列获取一个任务执行,如果任务队列为空,则worker退出。

private Runnable getTask() {
   boolean timedOut = false; // Did the last poll() time out?

   retry:
   for (;;) {
       int c = ctl.get();
       int rs = runStateOf(c);

       // 如果当前线程池状态>=STOP 或者线程池状态为shutdown并且工作队列为空则,减少工作线程个数
       if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
           decrementWorkerCount();
           return null;
       }

       boolean timed;      // Are workers subject to culling?

       for (;;) {
           int wc = workerCountOf(c);
           timed = allowCoreThreadTimeOut || wc > corePoolSize;

           if (wc <= maximumPoolSize && ! (timedOut && timed))
               break;
           if (compareAndDecrementWorkerCount(c))
               return null;
           c = ctl.get();  // Re-read ctl
           if (runStateOf(c) != rs)
               continue retry;
           // else CAS failed due to workerCount change; retry inner loop
       }

       try {

           //根据timed选择调用poll还是阻塞的take
           Runnable r = timed ?
               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
               workQueue.take();
           if (r != null)
               return r;
           timedOut = true;
       } catch (InterruptedException retry) {
           timedOut = false;
       }
   }
}

processWorkerExit

代码(11)执行清理任务,其代码如下。

  
     private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
       decrementWorkerCount();

   //统计整个线程池完成的任务个数
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
       completedTaskCount += w.completedTasks;
       workers.remove(w);
   } finally {
       mainLock.unlock();
   }

   //尝试设置线程池状态为TERMINATED,如果当前是shutdonw状态并且工作队列为空
   //或者当前是stop状态当前线程池里面没有活动线程
   tryTerminate();

   //如果当前线程个数小于核心个数,则增加
   int c = ctl.get();
   if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
           int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
           if (min == 0 && ! workQueue.isEmpty())
               min = 1;
           if (workerCountOf(c) >= min)
               return; // replacement not needed
       }
       addWorker(null, false);
   }
}

shutdown

调用shutdown后,线程池就不会在接受新的任务了,但是工作队列里面的任务还是要执行的,但是该方法立刻返回的,并不等待队列任务完成在返回。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // 12 
            advanceRunState(SHUTDOWN);// 13 
            interruptIdleWorkers();// 14 
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();// 15 
    }
  • 代码(12)检查看是否设置了安全管理器,是则看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限则还要看调用线程是否有中断工作线程的权限,如果没有权限则抛出SecurityException或者NullPointerException异常。

  • 其中代码(13)的内容如下,如果当前线程池状态>=SHUTDOWN则直接返回,否则设置为SHUTDOWN状态。

  private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
  • 代码(14)的内容如下,其设置所有空闲线程的中断标志。这里首先加了全局锁,同时只有一个线程可以调用shutdown方法设置中断标志。然后尝试获取Worker自己的锁,获取成功则设置中断标志。由于正在执行的任务已经获取了锁,所以正在执行的任务没有被中断。这里中断的是阻塞到getTask()方法并企图从队列里面获取任务的线程,也就是空闲线程。
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

在如上代码中,首先使用CAS设置当前线程池状态为TIDYING,如果设置成功则执行扩展接口terminated在线程池状态变为TERMINATED前做一些事情,然后设置当前线程池状态为TERMINATED。最后调用 termination.signalAll()激活因调用条件变量termination的await系列方法而被阻塞的所有线程


shutdownNow

调用shutdownNow方法后,线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断,该方法会立刻返回,并不等待激活的任务执行完成。返回值为这时候队列里面被丢弃的任务列表。

 public List shutdownNow() {
        List tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // 16
            advanceRunState(STOP);// 17 
            interruptWorkers();//18 
            tasks = drainQueue();//19 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

在如上代码中,首先调用代码(16)检查权限,然后调用代码(17)设置当前线程池状态为STOP,随后执行代码(18)中断所有的工作线程。这里需要注意的是,中断的所有线程包含空闲线程和正在执行任务的线程。

  private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

然后代码(19)将当前任务队列里面的任务移动到tasks列表。


awaitTermination

等待线程池状态变为TERMINATED则返回,或者时间超时。由于整个过程独占锁,所以一般调用shutdown或者shutdownNow后使用。

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
  • 如上代码首先获取独占锁,然后在无限循环内部判断当前线程池状态是否至少是TERMINATED状态,如果是则直接返回,否则说明当前线程池里面还有线程在执行 ,则看设置的超时时间nanos是否小于0,小于0则说明不需要等待,那就直接返回,如果大于0则调用条件变量termination的awaitNanos方法等待nanos时间,期望在这段时间内线程池状态变为TERMINATED。

  • 在shutdown方法时提到过,当线程池状态变为TERMINATED时,会调用termination.signalAll()用来激活调用条件变量termination的await系列方法被阻塞的所有线程,所以如果在调用awaitTermination之后又调用了shutdown方法,并且在shutdown内部将线程池状态设置为TERMINATED,则termination.awaitNanos方法会返回。

  • 另外在工作线程Worker的runWorker方法内,当工作线程运行结束后,会调用processWorkerExit方法,在processWorkerExit方法内部也会调用tryTerminate方法测试当前是否应该把线程池状态设置为TERMINATED,如果是,则也会调用termination.signalAll()用来激活调用线程池的awaitTermination方法而被阻塞的线程。

  • 而且当等待时间超时后,termination.awaitNanos也会返回,这时候会重新检查当前线程池状态是否为TERMINATED,如果是则直接返回,否则继续阻塞挂起自己。

小结

线程池巧妙地使用一个Integer类型的原子变量来记录线程池状态和线程池中的线程个数。通过线程池状态来控制任务的执行,每个Worker线程可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5659818.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存