java使用parallelStream并行流取两个list的交集【源码解析】

java使用parallelStream并行流取两个list的交集【源码解析】,第1张

java使用parallelStream并行流取两个list的交集【源码解析】 java使用parallelStream流取两个list的交集 1.背景

java开发在实际的项目中,会有取两个list中的交集的场景,可以通过传统的for循环进行处理,数据量1万以内,for循环的性能要高于foreach和stream的,如果数据量在十万、百万以上时,那么三者的差别不是很大,但是Stream提供了并行流parallelStream方法,可以充分利用CPU核数,大大的提升效率。

2.示例
    public static void main(String[] args) {
        List listOne = new ArrayList<>();
        List listTwo = new ArrayList<>();
        for(int i = 0;i<20000;i++){
            listOne.add(i);
        }
        for(int i = 888;i<20000;i++){
            listTwo.add(i);
        }

        long startOne = System.currentTimeMillis();
        List intersectionList = listOne.parallelStream().filter(item -> listTwo.contains(item)).collect(Collectors.toList());
        long endOne = System.currentTimeMillis();
        System.out.println("---交集intersectionList的大小---"+intersectionList.size());
        System.out.println("---parallelStream处理时间---"+(endOne-startOne));
        
    }


【注】:电脑配置的原因,并未测试更大数据量了

3.parallelStream源码解析

类关系图:

源码:

    default Stream parallelStream() {
        //调用StreamSupport的stream
        return StreamSupport.stream(spliterator(), true);
    }
    public static  Stream stream(Spliterator spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        //ReferencePipeline的静态内部实现类Head类
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

Head类重写了ReferencePipeline类的几个方法:

    static class Head extends ReferencePipeline {

        Head(Supplier> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        Head(Spliterator source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        @Override
        final boolean opIsStateful() {
            throw new UnsupportedOperationException();
        }

        @Override
        final Sink opWrapSink(int flags, Sink sink) {
            throw new UnsupportedOperationException();
        }


        @Override
        public void forEach(Consumer action) {
            //不是并行流
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
            //是并行流,调用父类处理方法
                super.forEach(action);
            }
        }

        @Override
        public void forEachOrdered(Consumer action) {
            //不是并行流
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
            //是并行流,调用父类处理方法
                super.forEachOrdered(action);
            }
        }
    }

父类中的forEach和forEachOrdered方法会调用evaluate此方法,最终并行流会调用evaluateParallel方法,串行流调用evaluateSequential方法。

    final  R evaluate(TerminalOp terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_linkED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

并行流evaluateParallel的实现:

查看ForEachOps类的内部类的ForEachOp类中的evaluateParallel实现:

      @Override
        public  Void evaluateParallel(PipelineHelper helper,
                                         Spliterator spliterator) {
            //有序
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
            //无序
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

都调用ForkJoinTask的invoke方法:

    public final V invoke() {
        int s;
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

在CountedCompleter中的实现类中的compute方法中调用了ForkJoinPool的fork方法,使用ForkJoinPool线程池并行计算来提高效率的,但是如果使用不当可能会发生线程安全的问题。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4968333.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-13
下一篇 2022-11-13

发表评论

登录后才能评论

评论列表(0条)