ForkJoin框架

ForkJoin框架,第1张

目录

一、介绍

二、工作窃取算法

三、Fork/Join实例

1. 分割任务

2. ForkJoinPool线程池执行任务

四、Fork/Join框架实现原理

1. ForkJoinPool线程池

2. ForkJoinTask

3. fork()

4. join()

五、参考资料


一、介绍

        Fork/Join框架是一个并行执行任务的框架,目的是把大任务分割成若干个子任务(每个任务否是一个Fork,且每个任务还可以再分割),最终每个任务执行结果合并到总结果。如下图所示,是Fork/Join的运行流程图。

Fork/Join的运行流程图

        如上图看出,Fork就是把一个大任务切分为若干子任务并行的执行;Join就是合并子任务的执行结果

二、工作窃取算法

        工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。如下图所示,线程1负责处理A队列里的任务,线程2负责处理B队列里的任务。线程2已经把自己队列里的任务干完,于是线程2就去线程1的队列里窃取一个任务来执行。        

        工作窃取算法的特性:

  • 优点:并行计算,来提高执行效率;双端队列,减少线程间竞争
  • 缺点:双端队列只有一个任务时,也会产生线程间竞争。

        如上图所示, 采用双端队列,是为了减少窃取任务线程和被窃取任务线程之间的竞争。被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

三、Fork/Join实例 1. 分割任务

        分割任务就是把大任务分割成多个子任务。如下代码所示,分割阈值THRESHOLD为20,较小的任务不用分割,直接计算累加求和。

/**
 * @description 任务分割
 * @author TCM
 * @version 1.0
 * @date 2022/4/13 16:11
 **/
public class SumTask extends RecursiveTask {

    // 分割子任务阈值
    private static final int THRESHOLD = 20;
    private int start;
    private int end;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        boolean canCompute = (end - start) <= THRESHOLD;
        // 如果小于阈值,直接调用最小任务的计算方法
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算,即:2个fork
            int middle = (start + end) / 2;
            SumTask leftTask = new SumTask(start, middle);
            SumTask rightTask = new SumTask(middle + 1, end);
            // 执行子任务
            leftTask.fork();
            rightTask.fork();
            // 等待子任务执行完,并得到其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
            LogUtil.info("当前计算结果为:" + sum);
        }

        return sum;
    }
}
2. ForkJoinPool线程池执行任务

        创建任务后及任务分割后的子任务,都交给ForkJoinPool线程池来执行任务。

public static void main(String[] args) {
    // ForkJoinPool执行分割的任何
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    // 生成一个计算任务,负责计算1+2+3+4+...+100
    SumTask task = new SumTask(1, 100);
    // 执行一个任务
    Future result = forkJoinPool.submit(task);
    try {
        System.out.println(result.get());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
//执行结果
20:13:51.871 [ForkJoinPool-1-worker-2] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:1575
20:13:51.871 [ForkJoinPool-1-worker-4] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:950
20:13:51.871 [ForkJoinPool-1-worker-9] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:325
20:13:51.871 [ForkJoinPool-1-worker-6] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:2200
20:13:51.874 [ForkJoinPool-1-worker-9] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:1275
20:13:51.874 [ForkJoinPool-1-worker-11] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:3775
20:13:51.874 [ForkJoinPool-1-worker-9] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:5050
计算结果:1+2+3+4+...+100 = 5050
四、Fork/Join框架实现原理 1. ForkJoinPool线程池

        ForkJoinPool线程池会创建多个ForkJoinWorkerThread实例的线程,ForkJoinWorkerThread线程用来执行任务。每个线程都有一个工作队列java.util.concurrent.ForkJoinPool.WorkQueue,该队列有属性ForkJoinTask[] array,用来存放当前线程需要执行的任务。所以,ForkJoinTask数组负责存放提交的任务,ForkJoinWorkerThread线程负责执行这些任务。而如上章节《介绍》图中所示。

2. ForkJoinTask

         ForkJoinTask定义了执行的任务。如下图所示是ForkJoinTask的实现类。如上章节《Fork/Join实例》中的分割任务,不需要直接继承ForkJoinTask类,只需要继承它的子类即可

        覆写子类中的compute(),规定符合阈值时,任务再分割多个子任务。

3. fork()

        如下代码所示。任务调用fork(),push(this)把当前任务存放到ForkJoinTask[]中,并通过线程池调用signalWork()方法来唤醒和创建一个工作线程ForkJoinWorkerThread执行任务

// 执行子任务
public final ForkJoinTask fork() {
    Thread t;
    // 判定当前线程是否是ForkJoinWorkerThread实例线程
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        // 把调用fork()的任务加入到ForkJoinWorkerThread实例线程的workQueue工作队列的ForkJoinTask[]中
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

// 当前任务添加到ForkJoinTask[]中,并唤醒线程执行任务
final void push(ForkJoinTask task) {
    ForkJoinTask[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                // 唤醒和创建一个工作线程ForkJoinWorkerThread来执行任务
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}
4. join()

         如下代码所示。任务调用join(),阻塞执行当前任务的线程,来获取执行结果。根据doJoin()返回的任务状态,只有状态等于NORMAL时,表示任务正常结束。

// 获取当前线程执行结果
public final V join() {
    int s;
    // doJoin()返回任务状态来判断执行结果
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    // 状态 == NORMAL时,表示任务正常结束,获取执行结果
    return getRawResult();
}
五、参考资料

Fork/Join框架原理解析_华山拎壶冲的博客-CSDN博客_fork join

ForkJoin框架使用和原理剖析_codingtu的博客-CSDN博客_forkjointask

Fork/Join框架的理解和使用 - 简书

Java并发编程|Fork/Join框架机制详解

Fork/Join框架详解 - 木易森林 - 博客园

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/789911.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-05
下一篇 2022-05-05

发表评论

登录后才能评论

评论列表(0条)

保存