线程池的底层原理

线程池的底层原理,第1张

线程池的底层原理

new Thread().start(); 1. 线程的频繁创建好销毁 2. 线程的数量过多,会造成CPU资源的开销。 上下文切换 (消耗CPU资源)

使用线程池进行线程复用

线程池

提前创建一系列的线程,保存在这个线程池中。(核心线程) 有任务要执行的时候,从线程池中取出线程来执行。 没有任务的时候,线程池放回去。

Java中提供的线程池

Executors

1,newFixedThreadPool 固定线程数量

public static ExecutorService newFixedThreadPool(int nThreads) {
		return new ThreadPoolExecutor(nThreads, nThreads,
			0L, TimeUnit.MILLISECONDS,
			new linkedBlockingQueue());
     }

2,newSingleThreadExecutor 只有一个线程的线程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new linkedBlockingQueue()));
}

3,newCachedThreadPool 可以缓存的线程池 ->理论上来说,有多少请求,该线程池就可以创建多 少的线程来处理。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}

4,newScheduledThreadPool //提供了按照周期执行的线程池. ->Timer

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

Executors底层都是ThreadPoolExecutor实现的

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,//核心线程数

                          int maximumPoolSize,//最大线程数

                          long keepAliveTime,//存活时间

                          TimeUnit unit, //存活单位

                          BlockingQueue workQueue, //阻塞队列
                          ThreadFactory threadFactory, //线程工厂,用来创建工作线程的。 默认实现(自定义线程池中线程的名字)

                          RejectedExecutionHandler handler) {//拒绝执行策略 。默认实现
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.tonanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

如何线程的复用. 1,[共享内存]->BlockingQueue 2,有任务来的时候,执行 3,没有任务的时候,阻塞 结论: 通过阻塞队列的方式,来实现线程池中线程的复用。 线程池的实现原理的过程推演

queue.take()和queue.poll(timeout)都是阻塞的状态,当线程池中没有线程任务(即阻塞队列中没有线程任务)的时候,会阻塞一短时间(keepAliveTime,//存活时间),超过这个时间,说明当前线程空闲时间超过了设置的时间keepAliveTime,该线程需要回收,摧毁

线程回收一般是回收 > corePoolSize,//核心线程以外的线程(即maximumPoolSize-corePoolSize 之间的线程 ),

而corePoolSize线程默认是不会回收的,会一致空闲的,知道线程池关闭,核心线程才会摧毁,回收

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果线程池已经结束状态,即线程池关闭了,直接返回null. 需要清理掉线程池中所有的工作线程

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c); // 当前线程池中的线程数

        // allowCoreThreadTimeOut 核心线程是否允许超时回收,默认是false,不回收
        // wc > corePoolSize;对于大于核心线程的非核心线程会进行超时回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // timedOut 为true,只有是线程超时了可以被回收了,才会是true
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 下面两种情况,只在阻塞队列为空,线程池没有任务的时候,核心线程和非核心线程空闲
            // 1,timed = true代表该线程超时了可以被回收,一般是非核心线程(>corePoolSized的线程)
            // workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 就不会一直阻塞在这,会返回r=null,
            // 所以timedOut = true;在上面该线程就会回收
            // 2,例如核心线程数( corePoolSize; timed = false
            // workQueue.take(); 会一致被阻塞这
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

 源码分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //判断当前工作线程数是否小于核心线程数(延迟初始化)

    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) //添加核心工作线程的同时,执行command

            return;
        c = ctl.get();
    }
    //线程池中线程大于核心线程数,workQueue.offer 添加到阻塞队列,返回false说明阻塞队列满,
    // 将Worker任务添加到阻塞队列中,在runWorker()中会去从队列获取任务执行(如果当前Worker线程有任务
    // 就会执行当前线程自己的任务,如果当前线程没有任务就会从workQueue阻塞队列中获取任务执行)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果阻塞队列满了,则添加非核心工作线程(扩容的线程)
    else if (!addWorker(command, false))
        reject(command); // 创建非核心线程之后线程池中的线程大于最大线程数,执行拒绝策略
}

// 添加工作线程逻辑,并封装成Worker对象,去开去线程,
// 最终调用Worker的run()在run()方法中会调用Runnable firstTask任务的run()方法,这个主动调用
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) { //case1 通过原子操作来增加线程数量.

        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c)) //通过原子 *** 作来增加线程数量.
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //case2 初始化工作线程.

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //构建一个工作线程,此时还没有启动.
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
    		 // private final HashSet workers = new HashSet();
       		 // 这这个workers的set是存放线程池中的线程容器,使用set来存放线程的
                    workers.add(w); //添加到一个容器中。
                    int s = workers.size();
                    if (s > largestPoolSize) 
                        largestPoolSize = s; //重新更新largestPoolSize
                    workerAdded = true; //添加成功。

                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //工作线程创建完成并且成功添加到了workers线程池的容器中,就启动线程
                // 此时,会调用到Worker对象中的run()方法,Worker在线程池中就相当与一个工作线程
                t.start(); 
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 工作线程worker,可以看到继承了Runnable,通过worker中的
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable{

    private static final long serialVersionUID = 6138294804551838833L;

    
    final Thread thread;
    
    Runnable firstTask;
    
    volatile long completedTasks;

    
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 将Worker本身作为线程任务,addWorker启动start()任务机会调到run()
        this.thread = getThreadFactory().newThread(this);
    }
    // 工作线程执行线程人物的入口
    public void run() {
        	// 这里会调用我们传进来的Runnable firstTask,就是我们添加到线程池的真正的任务的run()方法,
          // 这里是直接通过实例调用
    	runWorker(this);
    }
}

// 执行线程任务
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 拿到当前线程的任务,可能为空,也就是说当前线程自身没有任务了,
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 1,while循环保证当前线程不结束. 直到task为null,在getTask()中从阻塞队列中获取任务的时候
        // 如果阻塞队列中没有任务会阻塞,至到线程池中非核心线程数空闲时间达到了设置的时间就会返回null,
        // 线程运行结束,线程摧毁,
        // 2,线程池中的核心线程数默认是不摧毁的,只有在线程池关闭的时候全部摧毁
        // 3,如如果当前Worker线程已经持有任务,就执行自己的任务,如果没有任务就从阻塞队列中获取任务执行
        while (task != null || (task = getTask()) != null) {
            //表示当前线程正在运行一个任务,如果其他地方要shutdown().你必须要等我执行完成。
            w.lock(); //Worker继承了AQS -> 实现了互斥锁

            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); //执行task.run

                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

线程数量设置

1,IO密集型 CPU 2core+1

CPU利用率不高 IO密集型,是执行的任务run()方法计算比较大,IO比较多,IO阻塞会释放CPU的执行权,所以CPU利用率不高,可以多开一些线程任务,来提高并发处理能力,所以可以设置为 = CPU核数*2 + 1 2,CPU密集型 CPU +1

CPU密集型 是线程任务run()处理时间比较短,线程任务量比较大,所以CPU利用率比较高,所以CPU会频繁的进行上下文切换,导致系统性能降低,所以这种CPU密集型线程数不能设置过高,可以 = CPU核数 +1 CPU利用率很高,会增加上下文切换

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683111.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存