不幸的是,StreamAPI具有有限的功能来创建您自己的短路 *** 作。不是那么干净的解决方案是扔掉
RuntimeException并抓住它。这是的实现
IntStream,但也可以将其推广到其他流类型:
public static int reduceWithCancelEx(IntStream stream, int identity, IntBinaryOperator combiner, IntPredicate cancelCondition) { class CancelException extends RuntimeException { private final int val; CancelException(int val) { this.val = val; } } try { return stream.reduce(identity, (a, b) -> { int res = combiner.applyAsInt(a, b); if(cancelCondition.test(res)) throw new CancelException(res); return res; }); } catch (CancelException e) { return e.val; }}
用法示例:
int product = reduceWithCancelEx( IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println), 1, (a, b) -> a * b, val -> val == 0);System.out.println("Result: "+product);
输出:
23450Result: 0
请注意,即使它适用于并行流,也不能保证其他并行任务之一一旦抛出异常便会完成。已经开始的子任务可能会一直运行到完成,因此您处理的元素可能超出预期。
更新
:更长的时间,但更并行友好的替代解决方案。它基于自定义分隔符,该分隔符最多返回一个元素(这是所有基础元素的累加结果)。在顺序模式下使用它时,它将在一次
tryAdvance调用中完成所有工作。拆分时,每个部分都会生成相应的单个部分结果,Stream引擎会使用合并器功能将其减少。这是通用版本,但原始专业化也是可能的。
final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>, Consumer<T>, Cloneable { private Spliterator<T> source; private final BiFunction<A, ? super T, A> accumulator; private final Predicate<A> cancelPredicate; private final AtomicBoolean cancelled = new AtomicBoolean(); private A acc; CancellableReduceSpliterator(Spliterator<T> source, A identity, BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) { this.source = source; this.acc = identity; this.accumulator = accumulator; this.cancelPredicate = cancelPredicate; } @Override public boolean tryAdvance(Consumer<? super A> action) { if (source == null || cancelled.get()) { source = null; return false; } while (!cancelled.get() && source.tryAdvance(this)) { if (cancelPredicate.test(acc)) { cancelled.set(true); break; } } source = null; action.accept(acc); return true; } @Override public void forEachRemaining(Consumer<? super A> action) { tryAdvance(action); } @Override public Spliterator<A> trySplit() { if(source == null || cancelled.get()) { source = null; return null; } Spliterator<T> prefix = source.trySplit(); if (prefix == null) return null; try { @SuppressWarnings("unchecked") CancellableReduceSpliterator<T, A> result = (CancellableReduceSpliterator<T, A>) this.clone(); result.source = prefix; return result; } catch (CloneNotSupportedException e) { throw new InternalError(); } } @Override public long estimateSize() { // let's pretend we have the same number of elements // as the source, so the pipeline engine parallelize it in the same way return source == null ? 0 : source.estimateSize(); } @Override public int characteristics() { return source == null ? SIZED : source.characteristics() & ORDERED; } @Override public void accept(T t) { this.acc = accumulator.apply(this.acc, t); }}
与
Stream.reduce(identity, accumulator,combiner)和类似的方法
Stream.reduce(identity,combiner),但具有
cancelPredicate:
public static <T, U> U reduceWithCancel(Stream<T> stream, U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner, Predicate<U> cancelPredicate) { return StreamSupport .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity, accumulator, cancelPredicate), stream.isParallel()).reduce(combiner) .orElse(identity);}public static <T> T reduceWithCancel(Stream<T> stream, T identity, BinaryOperator<T> combiner, Predicate<T> cancelPredicate) { return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);}
让我们测试两个版本并计算实际处理的元素数量。让我们
0结束一点。例外版本:
AtomicInteger count = new AtomicInteger();int product = reduceWithCancelEx( IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0) .parallel().peek(i -> count.incrementAndGet()), 1, (a, b) -> a * b, x -> x == 0);System.out.println("product: " + product + "/count: " + count);Thread.sleep(1000);System.out.println("product: " + product + "/count: " + count);
典型输出:
product: 0/count: 281721product: 0/count: 500001
因此,当仅处理某些元素时返回结果时,任务将继续在后台运行,并且计数器仍在增加。这是分离器版本:
AtomicInteger count = new AtomicInteger();int product = reduceWithCancel( IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0) .parallel().peek(i -> count.incrementAndGet()).boxed(), 1, (a, b) -> a * b, x -> x == 0);System.out.println("product: " + product + "/count: " + count);Thread.sleep(1000);System.out.println("product: " + product + "/count: " + count);
典型输出:
product: 0/count: 281353product: 0/count: 281353
返回结果后,所有任务实际上都已完成。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)