CompletableFuture是一个异步编程工具类
简单使用
CompletableFuturecompletableFuture = new CompletableFuture<>(); new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //保存结果 completableFuture.complete("hello world"); }).start(); //获取结果 String result = completableFuture.get(); System.out.println("输出结果:" + result);
CompletableFuture实现了Future接口,所以它调用get()方法的时候会阻塞在那, 直到结果返回。
runAsync方法//runAsync传入一个runnable,返回一个CompletableFuture CompletableFuturevoidCompletableFuture = CompletableFuture.runAsync(() -> { try { Thread.sleep(2000); System.out.println("线程执行完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }); // 等待线程执行完毕 System.out.println(voidCompletableFuture.get()); System.out.println("程序运行结束");
runAsync()方法的实现原理
public static CompletableFuturerunAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } //判断使用ForkJoinPool还是每过来一个任务创建一个线程执行,这里asyncPool是一个ForkJoinPool private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); //common在ForkJoinPool静态代码块中被初始化 public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common; }
//e是一个ForkJoinPool static CompletableFutureasyncRunStage(Executor e, Runnable f) { if (f == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture (); //封装成AsyncRun,AsyncRun继承了ForkJoinTask,实现了runnable接口,然后执行ForkJoinPool的execute //如果是supplyAsync这里会封装成一个AsyncSupply //thenRun封装成UniRun,thenAccept封装成UniAccept,thenApply封装成UniApply然后放到前一个任务的栈里,即后一个任务会被放进前一个任务的CompletableFuture的栈中:unipush(new UniRun ...) e.execute(new AsyncRun(d, f)); return d; } AsyncRun(CompletableFuture dep, Runnable fn) { this.dep = dep; this.fn = fn; }
public void execute(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask> job; //传入的是一个AsyncRun,AsyncRun继承了ForkJoinTask if (task instanceof ForkJoinTask>) // avoid re-wrap job = (ForkJoinTask>) task; else job = new ForkJoinTask.RunnableExecuteAction(task); //job被转换成ForkJoinTask externalPush(job); }
final void externalPush(ForkJoinTask> task) { WorkQueue[] ws; WorkQueue q; int m; //获得当前线程的探针 int r = ThreadLocalRandom.getProbe(); int rs = runState; //不会进if if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + Abase; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putIntVolatile(q, QLOCK, 0); if (n <= 1) signalWork(ws, q); return; } U.compareAndSwapInt(q, QLOCK, 1, 0); } //task入队列并执行 externalSubmit(task); }
private void externalSubmit(ForkJoinTask> task) { int r; // initialize caller's probe //如果probe没有初始化,则初始化 if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; //判断ForkJoinPool的状态 if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { //初始化workQueues if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two int p = config & SMASK; // ensure at least 2 slots int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; ns = STARTED; } } finally { unlockRunState(rs, (rs & ~RSLOCK) | ns); } } else if ((q = ws[k = r & m & SQMASK]) != null) { //加锁 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //获得队列中的ForkJoinTask数组 ForkJoinTask>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + Abase; //cas把task加入到ForkJoinTask数组中 U.putOrderedObject(a, j, task); //更新下一个加入的task的数组索引 U.putOrderedInt(q, QTOP, s + 1); submitted = true; } } finally { //解锁 U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { //通知执行task signalWork(ws, q); return; } } move = true; // move on failure } else if (((rs = runState) & RSLOCK) == 0) { // create new queue q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; q.scanState = INACTIVE; rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
//task已经添加到了workqueue中 final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { // too few active if ((sp = (int)c) == 0) { // no idle workers if ((c & ADD_WORKER) != 0L) // too few workers //添加工作线程 tryAddWorker(c); break; } if (ws == null) // unstarted/terminated break; if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; } if (q != null && q.base == q.top) // no more work break; } }
private void tryAddWorker(long c) { boolean add = false; do { long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; if (add) { //添加worker createWorker(); break; } } } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); } private boolean createWorker() { //factory在静态代码块中被初始化 ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { //wt = ForkJoinWorkerThread,ForkJoinWorkerThread继承Thread if (fac != null && (wt = fac.newThread(this)) != null) { //执行ForkJoinWorkerThread的start方法即执行它的run方法 wt.start(); return true; } } catch (Throwable rex) { ex = rex; } deregisterWorker(wt, ex); return false; }
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); //执行task pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }
final void runWorker(WorkQueue w) { w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask> t;;) { //获得task if ((t = scan(w, r)) != null) //执行task w.runTask(t); else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } } final void runTask(ForkJoinTask> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy //执行task (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { //ForkJoinTask一开始被传入的他的子类AsyncRun,所以这里执行AsyncRun的exec方法 completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
public final boolean exec() { run(); return true; } public void run() { CompletableFuturesupplyAsync方法d; Runnable f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { //执行最开始自定义的runnable f.run(); d.completeNull(); } catch (Throwable ex) { d.completeThrowable(ex); } } //thenRun/thenAccept/thenApply这种有后续的方法的话,会将压入栈中任务出栈执行 d.postComplete(); } }
//传入的是一个Supplier,表示他有返回值 CompletableFuturefuture = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "返回结果"; } }); //获取返回值 String result = future.get(); System.out.println("任务执行结果:" + result);
supplyAsync原理和runAsync类似,只是在forkjoinpool.execute的时候传入的是AsyncSupply,所以后面执行的是
AsyncSupply的exec方法
public final boolean exec() { run(); return true; } public void run() { CompletableFuturethenRund; Supplier f; //获得最开始声明的CompletableFuture if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { //将Supplier执行后的值放到CompletableFuture中,CompletableFuture通过get方法获取 d.completevalue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); } }
CompletableFuture voidCompletableFuture = CompletableFuture.supplyAsync(() -> { try { System.out.println("supplyAsync run"); Thread.sleep(2000); System.out.println("supplyAsync run end"); } catch (InterruptedException e) { e.printStackTrace(); } return "返回结果"; }).thenRun(() -> { try { System.out.println("thenRun run"); Thread.sleep(3000); System.out.println("thenRun run end"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务执行结束之后执行的语句"); }); // 阻塞等待任务执行完成 System.out.println("阻塞"); voidCompletableFuture.get(); System.out.println("任务执行结束");thenAccept(Consumer)
CompletableFuturethenApply(Function)future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("supplyAsync执行完毕"); return "返回的结果"; }).thenAccept(new Consumer () { //参数为前面supplyAsync的结果 @Override public void accept(String param) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("输出结果:" + param); } }); // 等待任务执行完成 future.get(); System.out.println("任务执行完毕");
CompletableFuturethenComposefuture = CompletableFuture.supplyAsync(()-> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回supplyAsync结果; return "返回supplyAsync的结果"; }).thenApply(new Function () { //参数为前面supplyAsync的结果 @Override public String apply(String middle) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //获取supplyAsync结果,返回"); return middle+"再执行thenApply后返回"; } }); String str = future.get(); System.out.println("最终的结果为:" + str);
CompletableFuturethenCombinefuture = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { return "hello world"; } }).thenCompose(new Function >() { @Override public CompletionStage apply(String s) { return CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { String finalStr = s + "执行thenCompose中的supplyAsync然后返回"; return finalStr; } }); } }); String str = future.get(); System.out.println(str);
CompletableFutureallOf 和anyOffuture = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { return "hello"; } //thenCombine的第二个参数是一个BiFunction,对CompletableFuture的结果进行统一处理 }).thenCombine(CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { return "word"; } //获得上面两个supplyAsync的返回结果并处理 }), new BiFunction () { @Override public String apply(String s1, String s2) { return s1 +" "+ s2; } }); String result = future.get(); System.out.println(result);
thenCompose和thenCombine方法只能组合2个CompletableFuture,allOf 和anyOf 可以组合多个CompletableFuture
Random RANDOM = new Random(); CompletableFuture[] futures = new CompletableFuture[10]; int[] ints = new int[10]; for (int i = 0; i < 10; i++) { final int finalI = i; CompletableFuturefuture = CompletableFuture.runAsync(new Runnable() { @Override public void run() { try { //随机一个数并sleep int intNext = RANDOM.nextInt(5000); Thread.sleep(intNext); //随机数放入数组 ints[finalI] = intNext; } catch (InterruptedException e) { e.printStackTrace(); } } }); //future放入数组 futures[i] = future; } CompletableFuture.allOf(futures).thenRun(new Runnable() { // CompletableFuture.anyOf(futures).thenRun(new Runnable() { @Override public void run() { System.out.println(Arrays.toString(ints)); } }).get();
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)