java -XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary -XX:+PrintNMTStatistics -version
public class FutureTaskDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { // 第一种方式:Future + ExecutorService 创建缓存线程池 ExecutorService service = Executors.newCachedThreadPool(); // 创建任务 Task task = new Task(); Futurejava实现和管理线程池的方式有哪些?简单举例使用 线程池基础使用示例future = service.submit(task); System.out.println(future.get()); service.shutdown(); } // 1. 继承Callable接口,实现call()方法,泛型参数为要返回的类型 static class Task implements Callable { @Override public Integer call() throws Exception { System.out.println("Thread [" + Thread.currentThread().getName() + "] is running"); int result = 0; for (int i = 1; i <= 100; ++i) { result += i; } Thread.sleep(3000); // System.out.println(result); return result; } } }
public class WorkerThread implements Runnable { private String command; public WorkerThread(String s){ this.command=s; } @Override public void run() { System.out.println(Thread.currentThread().getName()+" Start. Command = "+command); processCommand(); System.out.println(Thread.currentThread().getName()+" End."); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString(){ return this.command; } } public class ThreadPool { public static void main(String[] args) { // 创建一个定量工作线程且队列无限大的线程池 ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { WorkerThread thread = new WorkerThread("线程" + i); executorService.execute(thread); } // 等待当前所有任务执行完成且不再接手新任务进来 executorService.shutdown(); //如果有任务未完成就在这个循环中 while (!executorService.isTerminated()) { } System.out.println("总线程运行结束"); } }
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "被拒绝了"); } }
package com.shark.wiki.threadExecutor.monitor; import java.util.concurrent.ThreadPoolExecutor; public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown(){ this.run=false; } @Override public void run() { // 如果当前对象run为true就不断循环打印线程池监控状态 while(run){ System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
public class Test { public static void main(String args[]) throws InterruptedException { //RejectedExecutionHandler implementation RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue线程的启动和停止的方式有几种(todo) ThreadPoolExecutors有那些核心的参数?(2), threadFactory, rejectionHandler); //start the monitoring thread MyMonitorThread monitor = new MyMonitorThread(executorPool, 3); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for (int i = 0; i < 10; i++) { executorPool.execute(new WorkerThread("cmd" + i)); } Thread.sleep(30000); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep(5000); monitor.shutdown(); } }
AbortPolicy: 直接抛出异常,默认策略; CallerRunsPolicy: 用调用者所在的线程来执行任务; DiscardOldestPolicy: 丢弃阻塞队列中靠最前的任务,并执行当前任务; DiscardPolicy: 直接丢弃任务;
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue状态workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // Integer.SIZE为 32 -3为29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; //通过左移运算符使得高位为-1,当前状态表示会接受新任务且也会同理对了中的任务 private static final int SHUTDOWN = 0 << COUNT_BITS;//表示会处理任务但不接受新任务 private static final int STOP = 1 << COUNT_BITS;//不处理任务也不接受新任务,并会中断所有的任务 private static final int TIDYING = 2 << COUNT_BITS; //表示所有任务都已成功终止 private static final int TERMINATED = 3 << COUNT_BITS; //执行执行完terminated()即代表当前线程池已经终止执行了 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }任务执行
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {
//获取当前运行状态// int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl
//状态有改变从retry开始执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
//加锁并初始化创建worker将任务分配给worker线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
final void runWorker(Worker w) { Thread wt = Thread.currentThread();任务提交
//拿到任务并重置firstTask Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try {
//循环的去执行任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { //让当前worker任务置为空,记录完成的任务 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
public class Test { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Future线程池关闭future = es.submit(new Callable () { @Override public String call() throws Exception { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "future result"; } }); try { String result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } } } // submit方法在AbstractExecutorService中的实现 public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。如下一段代码所示 RunnableFuture ftask = newTaskFor(task, null); execute(ftask); return ftask; } 关于futureTask可以参考笔者的这篇文章简单聊聊FutureTask protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask (callable); }
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查是否可以关闭线程 checkShutdownAccess(); //设置线程池状态 advanceRunState(SHUTDOWN); //尝试中断worker interruptIdleWorkers(); //预留方法,留给子类实现 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍历所有的worker for (Worker w : workers) { Thread t = w.thread; //先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它 //注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能 //它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }Executors可以创建哪三种线程池?
public class ThreadPoolExecutorDemo { private static final int CORE_POOL_SIZE = 5; // 开10个每个线程5s并行5s搞定 // private static final int CORE_POOL_SIZE = 10; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; private static final Long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) { long start = System.currentTimeMillis(); //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 0; i < 10; i++) { //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) Runnable worker = new MyRunnable("线程" + i); //执行Runnable executor.execute(worker); } //终止线程池 executor.shutdown(); while (!executor.isTerminated()) { } long end = System.currentTimeMillis(); System.out.println("Finished all threads use:" + (end - start)+"ms"); } }几个常见的对比 Runnable vs Callable
public class Test { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Futurefuture = es.submit(new Callable () { @Override public String call() throws Exception { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "future result"; } }); try { String result = future.get(); System.out.println(result); //简单示例下记得关闭线程池 es.shutdown(); } catch (Exception e) { e.printStackTrace(); } } }
public class ThreadPoolExecutorDemo { // private static final int CORE_POOL_SIZE = 5; // 开10个每个线程5s并行5s搞定 private static final int CORE_POOL_SIZE = 10; private static final int MAX_POOL_SIZE = 10; private static final int QUEUE_CAPACITY = 100; private static final Long KEEP_ALIVE_TIME = 1L; public static void main(String[] args) throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); //使用阿里巴巴推荐的创建线程池的方式 //通过ThreadPoolExecutor构造函数自定义参数创建 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_CAPACITY), new ThreadPoolExecutor.CallerRunsPolicy()); ListfutureList=new ArrayList<>(); for (int i = 0; i < 100; i++) { //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) Callable worker = Executors.callable(new baseTask("线程" + i),true); //执行Runnable Future future = executor.submit(worker); futureList.add(future); } for (int i = 0; i < futureList.size(); i++) { System.out.println(futureList.get(i).get()); } //终止线程池 executor.shutdown(); long end = System.currentTimeMillis(); System.out.println("Finished all threads use:" + (end - start) + "ms"); } }
shutdown()VSshutdownNow()(todo增加示例)shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。 shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。线程池使用注意事项
public class DebugRunnable implements Runnable{ @Override public void run() { Thread currentThread = Thread.currentThread(); System.out.println(currentThread.getName() + "-------------进入"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(currentThread.getName() + "-------------离开"); } } public static void main(String[] args) { DebugRunnable myRunnable = new DebugRunnable(); Thread thread1 = new Thread(myRunnable, "线程1"); Thread thread2 = new Thread(myRunnable, "线程2"); Thread thread3 = new Thread(myRunnable, "线程3"); thread1.start(); thread2.start(); thread3.start(); } }
@FunctionalInterface public interface Callable执行 execute()方法和 submit()方法的区别是什么呢?{ V call() throws Exception; }
而submit可以获取返回值,如下源码所示,他会将传入的任务封装成一个RunnableFuture对象(这个对象我们在源码也可以看出继承了Future所以可以拿到返回值),我们可以通过这个类的get方法阻塞获取返回值,而你你也可以通过get(long timeout, TimeUnit unit)在指定时间内阻塞获取返回值,当然小心使用,否则报了一个超时异常如代码段2
//线程池使用超时的例子 public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future如何创建线程池future = executorService.submit(new Callable () { @Override public Integer call() throws Exception { Thread.currentThread().sleep(5000); System.out.println(Thread.currentThread().getName()); return 3; } }); System.out.println( future.get(1000, TimeUnit.MILLISECONDS)); executorService.shutdown(); }
由阿里开发手册我们就知道创建线程池的方式尽可能使ThreadPoolExecutor ,若使用newSingleThreadExecutor或者使用newFixedThreadPool通过源码我们可以看出,他们使用的队列都是无界队列,使用不当很容易导致OOM问题,源码如下所示
- corePoolSize:当前线程池的核心线程数maximumPoolSize:当核心线程和队列都满了时,就会开启最大的线程数keepAliveTime:当最大线程数空闲执行时间时,就会被销毁unit:上述参数的单位workQueue:等待队列threadFactory:创建线程的工厂,可自定义handler:拒绝策略
- ThreadPoolExecutor.AbortPolicy: 抛出 RejectedExecutionException来拒绝新任务的处理(默认就是使用这种拒绝策略)。ThreadPoolExecutor.CallerRunsPolicy: 如果线程和队列都满了的话,该策略则会调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
public class MyTask implements Runnable { private int id; public MyTask(int id) { this.id = id; } @Override public void run() { System.out.println("当前线程名: " + Thread.currentThread().getName() + " id:" + id); } }
- ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }参考文献