ForkJoin线程池

ForkJoin线程池,第1张

ForkJoin线程池 一、分而治之

严格来讲,分而治之不算一种模式,而是一种思想。它可以将一个大任务拆解为若干个小任务并行执行,提高系统吞吐量。主要讲两个场景,Master-Worker 模式,ForkJoin 线程池。

ForkJoin 线程池是 jdk7 之后引入的一个并行执行任务的框架,其核心思想也是将任务分割为子任务,有可能子任务还是很大,还需要进一步拆解,最终得到足够小的任务。将分割出来的子任务放入双端队列中,然后几个启动线程从双端队列中获取任务执行。子任务执行的结果放到一个队列里,另起线程从队列中获取数据,合并结果。

二、ForkJoin 与传统线程池的区别

采用 “工作窃取”模式(work-stealing):当执行新的任务时,它可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。 相较于一般的线程池,ForkJoin 的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而 ForkJoin,如果某个子问题由于等待另外一个子问题的完成而无法继续运行,那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。这种方式减少了线程的等待时间,提高了性能。

三、案例

计算从 0 到 10000000L 的累加求和。CountTask 继承自 RecursiveTask,可以携带返回值。每次分解大任务,简单的将任务划分为 100 个等规模的小任务,并使用 fork() 提交子任务。在子任务中通过 THRESHOLD(门槛) 设置子任务分解的阈值,如果当前需要求和的总数大于 THRESHOLD,则子任务需要再次分解,如果子任务可以直接执行,则进行求和 *** 作,返回结果。最终等待所有的子任务执行完毕,对所有结果求和。

public class CountTask extends RecursiveTask {
    //任务分解的阈值
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    public Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            //分成100个小任务
            long step = (start + end) / 100;
            ArrayList subTasks = new ArrayList();
            long pos = start;
            for (int i = 0; i < 100; i++) {
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step + 1;
                //将子任务推向线程池
                subTasks.add(subTask);
                subTask.fork();
            }

            for (CountTask task : subTasks) {
                //对结果进行join
                sum += task.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        // 累加求和 0 -> 10000000L
        CountTask task = new CountTask(0, 10000000L);
        ForkJoinTask result = pool.submit(task);
        System.out.println("sum result : " + result.get());
    }
}

ForkJoin 线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,则可能被挂起。挂起的线程将被压入由线程池维护的栈中,待将来有任务可用时,再从栈中唤醒这些线程。Java8 的并行流就是基于 ForkJoin,并进行了优化。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5707181.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存