Java响应式编程–handle用法

Java响应式编程–handle用法,第1张

Java响应式编程–handle用法

JAVA响应式编程reactor中如果需要对一个flux中的数据进行提前返回 可以使用handle
具体如下:

@GetMapping("/a")
    public Mono a() throws InterruptedException {
        long begin = System.currentTimeMillis();
        Mono m5 = Mono.just("").map(s -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("5");
                System.out.println(System.currentTimeMillis() - begin);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "5";
        }).subscribeOn(Schedulers.boundedElastic());

        Mono m8 = Mono.just("").map(s -> {
            try {
                TimeUnit.SECONDS.sleep(8);
                System.out.println("8");
                System.out.println(System.currentTimeMillis() - begin);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "8";
        }).subscribeOn(Schedulers.boundedElastic());
        Mono m10 = Mono.just("").map(s -> {
            try {
                TimeUnit.SECONDS.sleep(15);
                System.out.println("15");
                System.out.println(System.currentTimeMillis() - begin);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "15";
        }).subscribeOn(Schedulers.boundedElastic());
        
        Mono single = Flux.merge(m8, m5, m10).handle(((s, synchronousSink) -> {
            if ("8".equals(s)) {
                synchronousSink.next("卢本伟牛逼");
                synchronousSink.complete();
            }
        })).defaultIfEmpty("null").single();
        return single;
    }

 

subscribeOn(Schedulers.boundedElastic())是让这些mono进行异步处理

上述定义了3个Mono进行请求 使用Sleep进行模拟请求中业务处理耗时
m5是需要执行3s m8是需要执行8s m10是需要执行15s
如果我们有多个耗时处理的请求 需要同时请求 然后又需要合并结果 并且返回我们需要的结果 进行提前返回。
例如:我需要同时请求这些接口 当8这个接口有结果返回的时候 由于其他请求时间太长了 需要提前返回一个值 放弃其他结果处理 所以我们可以进行判断.

//这个s代表前面传入的元素 执行完会传入这个handle  
// synchronousSink.next() 代表我执行完这个流返回给下一个进行处理  next(Object)  
// synchronousSink.complete(); 代表我们执行完取消其他的流
Flux.merge(m8, m5, m10).handle(((s, synchronousSink) -> {

            if ("8".equals(s)) {
                synchronousSink.next("卢本伟牛逼");
                synchronousSink.complete();
            }
        })).subscribe(s -> {
                    System.err.println(s);
                });

由于上面当s为8的时候 将卢本伟牛逼传入下一个流 然后执行完成 所以还有个m10不会执行完 就会直接丢弃
我们进行测试
发现控制台打印
我们如果使用传统的mvc执行m5我们会耗时5s m8会耗时8s 加起来就是13s
我们使用响应式编程则只会耗时8s 当如果多个耗时 *** 作拼接在一起 我们需要多个返回结果的时候我们可以使用handle进行提前返回

返回结果为卢本伟牛逼 也就是我们之前当s为8时执行的 ,synchronousSink.next(“卢本伟牛逼”);,将这个字符串传给最终结果的流,当然我们也可以根据自己的逻辑 发放多个synchronousSink.next。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5677588.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)