CompletableFuture的使用和部分原理

CompletableFuture的使用和部分原理,第1张

CompletableFuture的使用和部分原理

CompletableFuture是一个异步编程工具类
简单使用

        CompletableFuture completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //保存结果
            completableFuture.complete("hello world");
        }).start();
        //获取结果
        String result = completableFuture.get();
        System.out.println("输出结果:" + result);

CompletableFuture实现了Future接口,所以它调用get()方法的时候会阻塞在那, 直到结果返回

runAsync方法
        //runAsync传入一个runnable,返回一个CompletableFuture
        CompletableFuture voidCompletableFuture = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("线程执行完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 等待线程执行完毕
        System.out.println(voidCompletableFuture.get());
        System.out.println("程序运行结束");

runAsync()方法的实现原理

    public static CompletableFuture runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    //判断使用ForkJoinPool还是每过来一个任务创建一个线程执行,这里asyncPool是一个ForkJoinPool
    private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    //common在ForkJoinPool静态代码块中被初始化	
    public static ForkJoinPool commonPool() {
        // assert common != null : "static init error";
        return common;
    }
	//e是一个ForkJoinPool
    static CompletableFuture asyncRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        //封装成AsyncRun,AsyncRun继承了ForkJoinTask,实现了runnable接口,然后执行ForkJoinPool的execute
        //如果是supplyAsync这里会封装成一个AsyncSupply
        //thenRun封装成UniRun,thenAccept封装成UniAccept,thenApply封装成UniApply然后放到前一个任务的栈里,即后一个任务会被放进前一个任务的CompletableFuture的栈中:unipush(new UniRun...)
        e.execute(new AsyncRun(d, f));
        return d;
    }
    
     AsyncRun(CompletableFuture dep, Runnable fn) {
        this.dep = dep; this.fn = fn;
    }
    public void execute(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask job;
        //传入的是一个AsyncRun,AsyncRun继承了ForkJoinTask
        if (task instanceof ForkJoinTask) // avoid re-wrap
            job = (ForkJoinTask) task;
        else
            job = new ForkJoinTask.RunnableExecuteAction(task);
        //job被转换成ForkJoinTask
        externalPush(job);
    }
    final void externalPush(ForkJoinTask task) {
        WorkQueue[] ws; WorkQueue q; int m;
        //获得当前线程的探针
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        //不会进if
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + Abase;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putIntVolatile(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        //task入队列并执行
        externalSubmit(task);
    }
    private void externalSubmit(ForkJoinTask task) {
        int r;                                    // initialize caller's probe
        //如果probe没有初始化,则初始化
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
            //判断ForkJoinPool的状态
            if ((rs = runState) < 0) {
                tryTerminate(false, false);     // help terminate
                throw new RejectedExecutionException();
            }
            else if ((rs & STARTED) == 0 ||     // initialize
                     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                int ns = 0;
                rs = lockRunState();
                try {
                	//初始化workQueues
                    if ((rs & STARTED) == 0) {
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two
                        int p = config & SMASK; // ensure at least 2 slots
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                        workQueues = new WorkQueue[n];
                        ns = STARTED;
                    }
                } finally {
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
            else if ((q = ws[k = r & m & SQMASK]) != null) {
            	//加锁
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                	//获得队列中的ForkJoinTask数组
                    ForkJoinTask[] a = q.array;
                    int s = q.top;
                    boolean submitted = false; // initial submission or resizing
                    try {                      // locked version of push
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            int j = (((a.length - 1) & s) << ASHIFT) + Abase;
                            //cas把task加入到ForkJoinTask数组中
                            U.putOrderedObject(a, j, task);
                            //更新下一个加入的task的数组索引
                            U.putOrderedInt(q, QTOP, s + 1);
                            submitted = true;
                        }
                    } finally {
                    	//解锁
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                    if (submitted) {
                    	//通知执行task
                        signalWork(ws, q);
                        return;
                    }
                }
                move = true;                   // move on failure
            }
            else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                q = new WorkQueue(this, null);
                q.hint = r;
                q.config = k | SHARED_QUEUE;
                q.scanState = INACTIVE;
                rs = lockRunState();           // publish index
                if (rs > 0 &&  (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                    ws[k] = q;                 // else terminated
                unlockRunState(rs, rs & ~RSLOCK);
            }
            else
                move = true;                   // move if busy
            if (move)
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }
	//task已经添加到了workqueue中
    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
        while ((c = ctl) < 0L) {                       // too few active
            if ((sp = (int)c) == 0) {                  // no idle workers
                if ((c & ADD_WORKER) != 0L)            // too few workers
                	//添加工作线程
                    tryAddWorker(c);
                break;
            }
            if (ws == null)                            // unstarted/terminated
                break;
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }
    private void tryAddWorker(long c) {
        boolean add = false;
        do {
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c) {
                int rs, stop;                 // check if terminating
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                if (stop != 0)
                    break;
                if (add) {
                	//添加worker
                    createWorker();
                    break;
                }
            }
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }
    private boolean createWorker() {
    	//factory在静态代码块中被初始化
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
        	//wt = ForkJoinWorkerThread,ForkJoinWorkerThread继承Thread
            if (fac != null && (wt = fac.newThread(this)) != null) {
            	//执行ForkJoinWorkerThread的start方法即执行它的run方法
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        deregisterWorker(wt, ex);
        return false;
    } 
    public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();
                //执行task
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }
    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask t;;) {
        	//获得task
            if ((t = scan(w, r)) != null)
            	//执行task
                w.runTask(t);
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }
        final void runTask(ForkJoinTask task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                //执行task
                (currentSteal = task).doExec();
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();
                ForkJoinWorkerThread thread = owner;
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }
    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
            	//ForkJoinTask一开始被传入的他的子类AsyncRun,所以这里执行AsyncRun的exec方法
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }
        public final boolean exec() { 
	        run(); 
	        return true; 
        }

        public void run() {
            CompletableFuture d; Runnable f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                    	//执行最开始自定义的runnable
                        f.run();
                        d.completeNull();
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                //thenRun/thenAccept/thenApply这种有后续的方法的话,会将压入栈中任务出栈执行
                d.postComplete();
            }
        }
supplyAsync方法
        //传入的是一个Supplier,表示他有返回值
        CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
            @Override
            public String get() {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "返回结果";
            }
        });
        //获取返回值
        String result = future.get();
        System.out.println("任务执行结果:" + result);


supplyAsync原理和runAsync类似,只是在forkjoinpool.execute的时候传入的是AsyncSupply,所以后面执行的是
AsyncSupply的exec方法

        public final boolean exec() { run(); return true; }

        public void run() {
            CompletableFuture d; Supplier f;
            //获得最开始声明的CompletableFuture
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                    	//将Supplier执行后的值放到CompletableFuture中,CompletableFuture通过get方法获取
                        d.completevalue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
thenRun
        CompletableFuture voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("supplyAsync run");
                Thread.sleep(2000);
                System.out.println("supplyAsync run end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "返回结果";
        }).thenRun(() -> {
            try {
                System.out.println("thenRun run");
                Thread.sleep(3000);
                System.out.println("thenRun run end");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任务执行结束之后执行的语句");
        });
        // 阻塞等待任务执行完成
        System.out.println("阻塞");
        voidCompletableFuture.get();
        System.out.println("任务执行结束");

thenAccept(Consumer)
        CompletableFuture future = CompletableFuture.supplyAsync(() ->
        {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("supplyAsync执行完毕");
            return "返回的结果";
        }).thenAccept(new Consumer() {
        	//参数为前面supplyAsync的结果
            @Override
            public void accept(String param) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("输出结果:" + param);
            }
        });
        // 等待任务执行完成
        future.get();
        System.out.println("任务执行完毕");

thenApply(Function)
        CompletableFuture future = CompletableFuture.supplyAsync(()-> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回supplyAsync结果;
            return "返回supplyAsync的结果";
        }).thenApply(new Function() {
        	//参数为前面supplyAsync的结果
            @Override
            public String apply(String middle) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //获取supplyAsync结果,返回");
                return middle+"再执行thenApply后返回";
            }
        });
        String str = future.get();
        System.out.println("最终的结果为:" + str);

thenCompose
        CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
            @Override
            public String get() {
                return "hello world";
            }
        }).thenCompose(new Function>() {
            @Override
            public CompletionStage apply(String s) {
                return CompletableFuture.supplyAsync(new Supplier() {
                    @Override
                    public String get() {
                        String finalStr = s + "执行thenCompose中的supplyAsync然后返回";
                        return finalStr;
                    }
                });
            }
        });
        String str = future.get();
        System.out.println(str);

thenCombine
        CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
            @Override
            public String get() {
                return "hello";
            }
            //thenCombine的第二个参数是一个BiFunction,对CompletableFuture的结果进行统一处理
        }).thenCombine(CompletableFuture.supplyAsync(new Supplier() {
            @Override
            public String get() {
                return "word";
            }
            //获得上面两个supplyAsync的返回结果并处理
        }), new BiFunction() {
            @Override
            public String apply(String s1, String s2) {
                return s1 +" "+ s2;
            }
        });
        String result = future.get();
        System.out.println(result);

allOf 和anyOf

thenCompose和thenCombine方法只能组合2个CompletableFuture,allOf 和anyOf 可以组合多个CompletableFuture

        Random RANDOM = new Random();
        CompletableFuture[] futures = new CompletableFuture[10];
        int[] ints = new int[10];
        for (int i = 0; i < 10; i++) {
            final int finalI = i;
            CompletableFuture future = CompletableFuture.runAsync(new Runnable() {
                @Override
                public void run() {
                    try {
                        //随机一个数并sleep
                        int intNext = RANDOM.nextInt(5000);
                        Thread.sleep(intNext);
                        //随机数放入数组
                        ints[finalI] = intNext;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            //future放入数组
            futures[i] = future;
        }
        CompletableFuture.allOf(futures).thenRun(new Runnable() {
//        CompletableFuture.anyOf(futures).thenRun(new Runnable() {
            @Override
            public void run() {
                System.out.println(Arrays.toString(ints));
            }
        }).get();

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4668659.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存