Mono/
Flux
2,如果您 的一劳永逸 确实阻止了I / Opublic Flux<Data> search(SearchRequest request){ return searchService.search(request) .collectList() .doonNext(data -> doThisAsync(data).subscribe()) // add error logging here or inside doThisAsync .flatMapMany(Flux::fromIterable);}public Mono<Void> doThisAsync(List<Data> data) { //do some async/non-blocking processing here like calling WebClient}
public Flux<Data> search(SearchRequest request){ return searchService.search(request) .collectList() .doonNext(data -> Mono.fromRunnable(() -> doThisAsync(data)) .subscribeOn(Schedulers.elastic()) // delegate to proper thread to not block main flow .subscribe()) // add error logging here or inside doThisAsync .flatMapMany(Flux::fromIterable);}public void doThisAsync(List<Data> data) { //do some blocking I/O on calling thread}
请注意,在上述两种情况下,您都会失去反压支持。如果由于
doAsyncThis某种原因减慢了速度,那么数据生产者将不在乎并继续生产项目。这是“火与火”机制的自然结果。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)