reactive stream与spring-webflux初体验

reactive stream与spring-webflux初体验,第1张

目录

写在前面

一、reactive stream(jdk9)

 发布-订阅

二、异步servlet

同步servlet

异步servlet 

三、reactor = jdk8 stream + jdk9 reactive stream

四、webflux初体验

五、SSE(server sent events)


写在前面

Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。反应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版的后期添加的。它是完全非阻塞的,支持反应式流(Reactive Stream)背压,并在Netty,Undertow和Servlet 3.1 +容器等服务器上运行。

Spring WebFlux 是一个异步非阻塞式 IO 模型,通过少量的容器线程就可以支撑大量的并发访问。底层使用的是 Netty 容器,这点也和传统的 SpringMVC 不一样,SpringMVC 是基于 Servlet 的。

在此,咱们也简单体验一下spring-webflux到底是个什么东西。

学习webflux之前一定要将jdk8的stream学习明白了哟~

函数式编程-lambda函数与jdk8自带的函数接口_秃了也弱了。的博客-CSDN博客

jdk8-stream深入详解与运行机制_秃了也弱了。的博客-CSDN博客

一、reactive stream(jdk9)

1.Publisher接口发布者
subscribe(Subscriber):void:保证发布者和订阅者之间通过此方法建立订阅关系


2.Subscriber订阅者
onSubscribe(Subscription):void:第一此签署订阅关系,输入就是Subscription对象
onNext(T):void:接收到一条数据
onError(Throwable):void:数据出错
onComplete():void:数据处理完了


3.Subscription发布者与订阅者之间的关系
request(long):void:告诉发布者需要多少资源
cancel():void


4.Processor接口继承了Publisher和Subscriber,表示既可以做消费者,又可以做发布者,承担中间角色
defaultBufferSize():int
 

 发布-订阅
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowDemo {

    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
        SubmissionPublisher publiser = new SubmissionPublisher();

        // 2. 定义订阅者
        Subscriber subscriber = new Subscriber() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);

                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();


                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }

        };

        // 3. 发布者和订阅者 建立订阅关系
        publiser.subscribe(subscriber);

        // 4. 生产数据, 并发布
        // 这里忽略数据生产过程
        for (int i = 0; i < 1000; i++) {
            System.out.println("生成数据:" + i);
            // submit是个block方法
            publiser.submit(i);
        }

        // 5. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();

        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
        // debug的时候, 下面这行需要有断点
        // 否则主线程结束无法debug
        System.out.println();
    }

}
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
* 带 process 的 flow demo
*/

/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher
        implements Processor {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存订阅关系, 需要用它来给发布者响应
        this.subscription = subscription;

        // 请求一个数据
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一个数据, 处理
        System.out.println("处理器接受到数据: " + item);

        // 过滤掉小于0的, 然后发布出去
        if (item > 0) {
            this.submit("转换后的数据:" + item);
        }

        // 处理完调用request再请求一个数据
        this.subscription.request(1);

        // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出现了异常(例如处理数据的时候产生了异常)
        throwable.printStackTrace();


        // 我们可以告诉发布者, 后面不接受数据了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部数据处理完了(发布者关闭了)
        System.out.println("处理器处理完了!");
        // 关闭发布者
        this.close();
    }
}

public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. 定义发布者, 发布的数据类型是 Integer
        // 直接使用jdk自带的SubmissionPublisher
        SubmissionPublisher publiser = new SubmissionPublisher();

        // 2. 定义处理器, 对数据进行过滤, 并转换为String类型
        MyProcessor processor = new MyProcessor();

        // 3. 发布者 和 处理器 建立订阅关系
        publiser.subscribe(processor);

        // 4. 定义最终订阅者, 消费 String 类型数据
        Subscriber subscriber = new Subscriber() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);

                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();


                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            }

        };

        // 5. 处理器 和 最终订阅者 建立订阅关系
        processor.subscribe(subscriber);

        // 6. 生产数据, 并发布
        // 这里忽略数据生产过程
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 结束后 关闭发布者
        // 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
        publiser.close();

        // 主线程延迟停止, 否则数据没有消费就退出
        Thread.currentThread().join(1000);
    }

}
二、异步servlet

普通的springmvc就是同步servlet,请求处理线程是阻塞的。

webflux就像是异步servlet,请求接收之后,业务处理交给后台的业务处理线程来执行。

同步servlet
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* Servlet implementation class SyncServlet
*/
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public SyncServlet() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 执行业务代码
        doSomeThing(request, response);

        System.out.println("sync use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeThing(HttpServletRequest request,
            HttpServletResponse response) throws IOException {

        // 模拟耗时 *** 作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }

        //
        response.getWriter().append("done");
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }

}
异步servlet 
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* Servlet implementation class AsyncServlet
*/
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public AsyncServlet() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 开启异步
        AsyncContext asyncContext = request.startAsync();

        // 执行业务代码,为了方便使用jdk8的future
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
                asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeThing(AsyncContext asyncContext,
            ServletRequest servletRequest, ServletResponse servletResponse) {

        // 模拟耗时 *** 作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }

        //
        try {
            servletResponse.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 业务代码处理完毕, 通知结束
        asyncContext.complete();
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }

}
三、reactor = jdk8 stream + jdk9 reactive stream
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

public class ReactorDemo {

   public static void main(String[] args) {
      // reactor = jdk8 stream + jdk9 reactive stream
      // Mono 0-1个元素
      // Flux 0-N个元素
      String[] strs = { "1", "2", "3" };

      // 2. 定义订阅者
      Subscriber subscriber = new Subscriber() {

         private Subscription subscription;

         @Override
         public void onSubscribe(Subscription subscription) {
            // 保存订阅关系, 需要用它来给发布者响应
            this.subscription = subscription;

            // 请求一个数据
            this.subscription.request(1);
         }

         @Override
         public void onNext(Integer item) {
            // 接受到一个数据, 处理
            System.out.println("接受到数据: " + item);

            try {
               TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
            
            // 处理完调用request再请求一个数据
            this.subscription.request(1);

            // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
            // this.subscription.cancel();
         }

         @Override
         public void onError(Throwable throwable) {
            // 出现了异常(例如处理数据的时候产生了异常)
            throwable.printStackTrace();


            // 我们可以告诉发布者, 后面不接受数据了
            this.subscription.cancel();
         }

         @Override
         public void onComplete() {
            // 全部数据处理完了(发布者关闭了)
            System.out.println("处理完了!");
         }

      };
      
      // 这里就是jdk8的stream
      Flux.fromArray(strs).map(s -> Integer.parseInt(s))
      // 最终 *** 作
      // 这里就是jdk9的reactive stream
      .subscribe(subscriber);

   }
}
四、webflux初体验

 官方的比对,说是webflux是非阻塞的、支持reactive stream、并且不支持关系型数据库只支持非关系型数据库。

		
			org.springframework.boot
			spring-boot-starter-webflux
		

我们可以看到,get1就是普通的mvc,get2就是webflux模式,对于用户来说这两种模式体验是一样的。但是对服务器来说,get1是阻塞的,get2是非阻塞的(其实无非是将业务处理放到后台线程,我感觉最终消耗的系统资源其实并没有少,也没有网上吹嘘的那么好,只是并发量高了之后可以接收更多的用户请求,但是处理请求不见得会快多少)。

get3的形式就是会逐个输出每一条数据。

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


@RestController
@Slf4j
public class TestController {


   @GetMapping("/1")
   private String get1() {
      log.info("get1 start");
      String result = createStr();
      log.info("get1 end.");
      return result;
   }


   @GetMapping("/2")
   private Mono get2() {
      log.info("get2 start");
      Mono result = Mono.fromSupplier(() -> createStr());
      log.info("get2 end.");
      return result;
   }


   /**
    * Flux : 返回0-n个元素
    *
    * @return
    */
   @GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
   private Flux flux() {
      Flux result = Flux
            .fromStream(IntStream.range(1, 5).mapToObj(i -> {
               try {
                  TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
               }
               return "flux data--" + i;
            }));
      return result;
   }


   private String createStr() {
      try {
         TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
      }
      return "some string";
   }


}
五、SSE(server sent events)

webflux的Flux,就像是SSE一样,可以实现类似向服务器推送资源的功能。

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* Servlet implementation class SSE
*/
@WebServlet("/SSE")
public class SSE extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public SSE() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        for (int i = 0; i < 5; i++) {
            // 指定事件标识
            response.getWriter().write("event:me\n");
            // 格式: data: + 数据 + 2个回车
            response.getWriter().write("data:" + i + "\n\n");
            response.getWriter().flush();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        }

    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }

}




Insert title here


    

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/877585.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存