严格来讲,分而治之不算一种模式,而是一种思想。它可以将一个大任务拆解为若干个小任务并行执行,提高系统吞吐量。主要讲两个场景,Master-Worker 模式,ForkJoin 线程池。
ForkJoin 线程池是 jdk7 之后引入的一个并行执行任务的框架,其核心思想也是将任务分割为子任务,有可能子任务还是很大,还需要进一步拆解,最终得到足够小的任务。将分割出来的子任务放入双端队列中,然后几个启动线程从双端队列中获取任务执行。子任务执行的结果放到一个队列里,另起线程从队列中获取数据,合并结果。
二、ForkJoin 与传统线程池的区别采用 “工作窃取”模式(work-stealing):当执行新的任务时,它可以将其拆分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。 相较于一般的线程池,ForkJoin 的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而 ForkJoin,如果某个子问题由于等待另外一个子问题的完成而无法继续运行,那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。这种方式减少了线程的等待时间,提高了性能。
三、案例计算从 0 到 10000000L 的累加求和。CountTask 继承自 RecursiveTask,可以携带返回值。每次分解大任务,简单的将任务划分为 100 个等规模的小任务,并使用 fork() 提交子任务。在子任务中通过 THRESHOLD(门槛) 设置子任务分解的阈值,如果当前需要求和的总数大于 THRESHOLD,则子任务需要再次分解,如果子任务可以直接执行,则进行求和 *** 作,返回结果。最终等待所有的子任务执行完毕,对所有结果求和。
public class CountTask extends RecursiveTask{ //任务分解的阈值 private static final int THRESHOLD = 10000; private long start; private long end; public CountTask(long start, long end) { this.start = start; this.end = end; } public Long compute() { long sum = 0; boolean canCompute = (end - start) < THRESHOLD; if (canCompute) { for (long i = start; i <= end; i++) { sum += i; } } else { //分成100个小任务 long step = (start + end) / 100; ArrayList subTasks = new ArrayList (); long pos = start; for (int i = 0; i < 100; i++) { long lastOne = pos + step; if (lastOne > end) { lastOne = end; } CountTask subTask = new CountTask(pos, lastOne); pos += step + 1; //将子任务推向线程池 subTasks.add(subTask); subTask.fork(); } for (CountTask task : subTasks) { //对结果进行join sum += task.join(); } } return sum; } public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool pool = new ForkJoinPool(); // 累加求和 0 -> 10000000L CountTask task = new CountTask(0, 10000000L); ForkJoinTask result = pool.submit(task); System.out.println("sum result : " + result.get()); } }
ForkJoin 线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,则可能被挂起。挂起的线程将被压入由线程池维护的栈中,待将来有任务可用时,再从栈中唤醒这些线程。Java8 的并行流就是基于 ForkJoin,并进行了优化。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)