new Thread().start(); 1. 线程的频繁创建好销毁 2. 线程的数量过多,会造成CPU资源的开销。 上下文切换 (消耗CPU资源)
使用线程池进行线程复用
线程池
提前创建一系列的线程,保存在这个线程池中。(核心线程) 有任务要执行的时候,从线程池中取出线程来执行。 没有任务的时候,线程池放回去。
Java中提供的线程池
Executors
1,newFixedThreadPool 固定线程数量
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new linkedBlockingQueue()); }
2,newSingleThreadExecutor 只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new linkedBlockingQueue())); }
3,newCachedThreadPool 可以缓存的线程池 ->理论上来说,有多少请求,该线程池就可以创建多 少的线程来处理。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
4,newScheduledThreadPool //提供了按照周期执行的线程池. ->Timer
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
Executors底层都是ThreadPoolExecutor实现的
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,//核心线程数 int maximumPoolSize,//最大线程数 long keepAliveTime,//存活时间 TimeUnit unit, //存活单位 BlockingQueueworkQueue, //阻塞队列 ThreadFactory threadFactory, //线程工厂,用来创建工作线程的。 默认实现(自定义线程池中线程的名字) RejectedExecutionHandler handler) {//拒绝执行策略 。默认实现 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.tonanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
如何线程的复用. 1,[共享内存]->BlockingQueue 2,有任务来的时候,执行 3,没有任务的时候,阻塞 结论: 通过阻塞队列的方式,来实现线程池中线程的复用。 线程池的实现原理的过程推演
queue.take()和queue.poll(timeout)都是阻塞的状态,当线程池中没有线程任务(即阻塞队列中没有线程任务)的时候,会阻塞一短时间(keepAliveTime,//存活时间),超过这个时间,说明当前线程空闲时间超过了设置的时间keepAliveTime,该线程需要回收,摧毁
线程回收一般是回收 > corePoolSize,//核心线程以外的线程(即maximumPoolSize-corePoolSize 之间的线程 ),
而corePoolSize线程默认是不会回收的,会一致空闲的,知道线程池关闭,核心线程才会摧毁,回收
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 如果线程池已经结束状态,即线程池关闭了,直接返回null. 需要清理掉线程池中所有的工作线程 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 当前线程池中的线程数 // allowCoreThreadTimeOut 核心线程是否允许超时回收,默认是false,不回收 // wc > corePoolSize;对于大于核心线程的非核心线程会进行超时回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // timedOut 为true,只有是线程超时了可以被回收了,才会是true if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 下面两种情况,只在阻塞队列为空,线程池没有任务的时候,核心线程和非核心线程空闲 // 1,timed = true代表该线程超时了可以被回收,一般是非核心线程(>corePoolSized的线程) // workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 就不会一直阻塞在这,会返回r=null, // 所以timedOut = true;在上面该线程就会回收 // 2,例如核心线程数(corePoolSize; timed = false // workQueue.take(); 会一致被阻塞这 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
源码分析
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //判断当前工作线程数是否小于核心线程数(延迟初始化) if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //添加核心工作线程的同时,执行command return; c = ctl.get(); } //线程池中线程大于核心线程数,workQueue.offer 添加到阻塞队列,返回false说明阻塞队列满, // 将Worker任务添加到阻塞队列中,在runWorker()中会去从队列获取任务执行(如果当前Worker线程有任务 // 就会执行当前线程自己的任务,如果当前线程没有任务就会从workQueue阻塞队列中获取任务执行) if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果阻塞队列满了,则添加非核心工作线程(扩容的线程) else if (!addWorker(command, false)) reject(command); // 创建非核心线程之后线程池中的线程大于最大线程数,执行拒绝策略 } // 添加工作线程逻辑,并封装成Worker对象,去开去线程, // 最终调用Worker的run()在run()方法中会调用Runnable firstTask任务的run()方法,这个主动调用 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //case1 通过原子操作来增加线程数量. int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) //通过原子 *** 作来增加线程数量. break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //case2 初始化工作线程. boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //构建一个工作线程,此时还没有启动. w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // private final HashSetworkers = new HashSet (); // 这这个workers的set是存放线程池中的线程容器,使用set来存放线程的 workers.add(w); //添加到一个容器中。 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //重新更新largestPoolSize workerAdded = true; //添加成功。 } } finally { mainLock.unlock(); } if (workerAdded) { //工作线程创建完成并且成功添加到了workers线程池的容器中,就启动线程 // 此时,会调用到Worker对象中的run()方法,Worker在线程池中就相当与一个工作线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // 工作线程worker,可以看到继承了Runnable,通过worker中的 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 将Worker本身作为线程任务,addWorker启动start()任务机会调到run() this.thread = getThreadFactory().newThread(this); } // 工作线程执行线程人物的入口 public void run() { // 这里会调用我们传进来的Runnable firstTask,就是我们添加到线程池的真正的任务的run()方法, // 这里是直接通过实例调用 runWorker(this); } } // 执行线程任务 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 拿到当前线程的任务,可能为空,也就是说当前线程自身没有任务了, Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 1,while循环保证当前线程不结束. 直到task为null,在getTask()中从阻塞队列中获取任务的时候 // 如果阻塞队列中没有任务会阻塞,至到线程池中非核心线程数空闲时间达到了设置的时间就会返回null, // 线程运行结束,线程摧毁, // 2,线程池中的核心线程数默认是不摧毁的,只有在线程池关闭的时候全部摧毁 // 3,如如果当前Worker线程已经持有任务,就执行自己的任务,如果没有任务就从阻塞队列中获取任务执行 while (task != null || (task = getTask()) != null) { //表示当前线程正在运行一个任务,如果其他地方要shutdown().你必须要等我执行完成。 w.lock(); //Worker继承了AQS -> 实现了互斥锁 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); //执行task.run } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
线程数量设置
1,IO密集型 CPU 2core+1
CPU利用率不高 IO密集型,是执行的任务run()方法计算比较大,IO比较多,IO阻塞会释放CPU的执行权,所以CPU利用率不高,可以多开一些线程任务,来提高并发处理能力,所以可以设置为 = CPU核数*2 + 1 2,CPU密集型 CPU +1
CPU密集型 是线程任务run()处理时间比较短,线程任务量比较大,所以CPU利用率比较高,所以CPU会频繁的进行上下文切换,导致系统性能降低,所以这种CPU密集型线程数不能设置过高,可以 = CPU核数 +1 CPU利用率很高,会增加上下文切换
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)