java开发在实际的项目中,会有取两个list中的交集的场景,可以通过传统的for循环进行处理,数据量1万以内,for循环的性能要高于foreach和stream的,如果数据量在十万、百万以上时,那么三者的差别不是很大,但是Stream提供了并行流parallelStream方法,可以充分利用CPU核数,大大的提升效率。
2.示例public static void main(String[] args) { ListlistOne = new ArrayList<>(); List listTwo = new ArrayList<>(); for(int i = 0;i<20000;i++){ listOne.add(i); } for(int i = 888;i<20000;i++){ listTwo.add(i); } long startOne = System.currentTimeMillis(); List intersectionList = listOne.parallelStream().filter(item -> listTwo.contains(item)).collect(Collectors.toList()); long endOne = System.currentTimeMillis(); System.out.println("---交集intersectionList的大小---"+intersectionList.size()); System.out.println("---parallelStream处理时间---"+(endOne-startOne)); }
【注】:电脑配置的原因,并未测试更大数据量了
类关系图:
源码:
default StreamparallelStream() { //调用StreamSupport的stream return StreamSupport.stream(spliterator(), true); }
public staticStream stream(Spliterator spliterator, boolean parallel) { Objects.requireNonNull(spliterator); //ReferencePipeline的静态内部实现类Head类 return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
Head类重写了ReferencePipeline类的几个方法:
static class Headextends ReferencePipeline { Head(Supplier extends Spliterator>> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } Head(Spliterator> source, int sourceFlags, boolean parallel) { super(source, sourceFlags, parallel); } @Override final boolean opIsStateful() { throw new UnsupportedOperationException(); } @Override final Sink opWrapSink(int flags, Sink sink) { throw new UnsupportedOperationException(); } @Override public void forEach(Consumer super E_OUT> action) { //不是并行流 if (!isParallel()) { sourceStageSpliterator().forEachRemaining(action); } else { //是并行流,调用父类处理方法 super.forEach(action); } } @Override public void forEachOrdered(Consumer super E_OUT> action) { //不是并行流 if (!isParallel()) { sourceStageSpliterator().forEachRemaining(action); } else { //是并行流,调用父类处理方法 super.forEachOrdered(action); } } }
父类中的forEach和forEachOrdered方法会调用evaluate此方法,最终并行流会调用evaluateParallel方法,串行流调用evaluateSequential方法。
finalR evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_linkED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
并行流evaluateParallel的实现:
查看ForEachOps类的内部类的ForEachOp类中的evaluateParallel实现:
@Override publicVoid evaluateParallel(PipelineHelperhelper, Spliterator spliterator) { //有序 if (ordered) new ForEachOrderedTask<>(helper, spliterator, this).invoke(); else //无序 new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke(); return null; }
都调用ForkJoinTask的invoke方法:
public final V invoke() { int s; if ((s = doInvoke() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
在CountedCompleter中的实现类中的compute方法中调用了ForkJoinPool的fork方法,使用ForkJoinPool线程池并行计算来提高效率的,但是如果使用不当可能会发生线程安全的问题。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)