如何使Stream上的reduce() *** 作短路?

如何使Stream上的reduce() *** 作短路?,第1张

如何使Stream上的reduce() *** 作短路?

不幸的是,StreamAPI具有有限的功能来创建您自己的短路 *** 作。不是那么干净的解决方案是扔掉

RuntimeException
并抓住它。这是的实现
IntStream
,但也可以将其推广到其他流类型:

public static int reduceWithCancelEx(IntStream stream, int identity, IntBinaryOperator combiner, IntPredicate cancelCondition) {    class CancelException extends RuntimeException {        private final int val;        CancelException(int val) { this.val = val;        }    }    try {        return stream.reduce(identity, (a, b) -> { int res = combiner.applyAsInt(a, b); if(cancelCondition.test(res))     throw new CancelException(res); return res;        });    } catch (CancelException e) {        return e.val;    }}

用法示例:

int product = reduceWithCancelEx(        IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println),         1, (a, b) -> a * b, val -> val == 0);System.out.println("Result: "+product);

输出:

23450Result: 0

请注意,即使它适用于并行流,也不能保证其他并行任务之一一旦抛出异常便会完成。已经开始的子任务可能会一直运行到完成,因此您处理的元素可能超出预期。

更新
:更长的时间,但更并行友好的替代解决方案。它基于自定义分隔符,该分隔符最多返回一个元素(这是所有基础元素的累加结果)。在顺序模式下使用它时,它将在一次

tryAdvance
调用中完成所有工作。拆分时,每个部分都会生成相应的单个部分结果,Stream引擎会使用合并器功能将其减少。这是通用版本,但原始专业化也是可能的。

final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,        Consumer<T>, Cloneable {    private Spliterator<T> source;    private final BiFunction<A, ? super T, A> accumulator;    private final Predicate<A> cancelPredicate;    private final AtomicBoolean cancelled = new AtomicBoolean();    private A acc;    CancellableReduceSpliterator(Spliterator<T> source, A identity, BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {        this.source = source;        this.acc = identity;        this.accumulator = accumulator;        this.cancelPredicate = cancelPredicate;    }    @Override    public boolean tryAdvance(Consumer<? super A> action) {        if (source == null || cancelled.get()) { source = null; return false;        }        while (!cancelled.get() && source.tryAdvance(this)) { if (cancelPredicate.test(acc)) {     cancelled.set(true);     break; }        }        source = null;        action.accept(acc);        return true;    }    @Override    public void forEachRemaining(Consumer<? super A> action) {        tryAdvance(action);    }    @Override    public Spliterator<A> trySplit() {        if(source == null || cancelled.get()) { source = null; return null;        }        Spliterator<T> prefix = source.trySplit();        if (prefix == null) return null;        try { @SuppressWarnings("unchecked") CancellableReduceSpliterator<T, A> result =      (CancellableReduceSpliterator<T, A>) this.clone(); result.source = prefix; return result;        } catch (CloneNotSupportedException e) { throw new InternalError();        }    }    @Override    public long estimateSize() {        // let's pretend we have the same number of elements        // as the source, so the pipeline engine parallelize it in the same way        return source == null ? 0 : source.estimateSize();    }    @Override    public int characteristics() {        return source == null ? SIZED : source.characteristics() & ORDERED;    }    @Override    public void accept(T t) {        this.acc = accumulator.apply(this.acc, t);    }}

Stream.reduce(identity, accumulator,combiner)
和类似的方法
Stream.reduce(identity,combiner)
,但具有
cancelPredicate

public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,        BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,        Predicate<U> cancelPredicate) {    return StreamSupport .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,         accumulator, cancelPredicate), stream.isParallel()).reduce(combiner) .orElse(identity);}public static <T> T reduceWithCancel(Stream<T> stream, T identity,        BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {    return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);}

让我们测试两个版本并计算实际处理的元素数量。让我们

0
结束一点。例外版本:

AtomicInteger count = new AtomicInteger();int product = reduceWithCancelEx(        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)     .parallel().peek(i -> count.incrementAndGet()), 1,        (a, b) -> a * b, x -> x == 0);System.out.println("product: " + product + "/count: " + count);Thread.sleep(1000);System.out.println("product: " + product + "/count: " + count);

典型输出:

product: 0/count: 281721product: 0/count: 500001

因此,当仅处理某些元素时返回结果时,任务将继续在后台运行,并且计数器仍在增加。这是分离器版本:

AtomicInteger count = new AtomicInteger();int product = reduceWithCancel(        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)     .parallel().peek(i -> count.incrementAndGet()).boxed(),      1, (a, b) -> a * b, x -> x == 0);System.out.println("product: " + product + "/count: " + count);Thread.sleep(1000);System.out.println("product: " + product + "/count: " + count);

典型输出:

product: 0/count: 281353product: 0/count: 281353

返回结果后,所有任务实际上都已完成。



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5475461.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存