HTTP 作为一种无状态的协议采用的是请求-应答的模式,每当客户端发起的请求到达服务器,Servlet 容器通常会为每个请求使用一个线程来处理。为了避免线程创建和销毁的资源消耗,一般会采用线程池,而线程池中的线程数量是有限的,当线程池中的线程被全部使用,客户端只能等待有空闲线程处理请求。
实际场景中,部分线程可能因为等待数据库查询结果或远程 Web 资源被阻塞,如果阻塞时间过长,线程池中的线程很快就被耗尽,从而导致无法处理其他请求。
为了提高系统的吞吐量,我们需要尽量使处理请求的线程处于非空闲状态。如果能够将那些长时间阻塞的线程利用起来处理新请求,由其他线程等资源满足时再继续处理前面的请求,这样对吞吐量的提升就会有很大的帮助。
Java EE 自 Servlet 3.0 开始对 Servlet 和 Filter 提供了异步支持,如果 Servlet 和 Filter 在处理请求时可能会发生阻塞,可以将阻塞请求线程的 *** 作分配到异步线程,然后将处理请求的线程归还到 Servlet 容器中的线程池,而不产生响应,当异步线程中的 *** 作完成,异步线程可以直接产生响应或将请求重新分派到容器中的 Servlet 处理。
如果你已经对 Servlet 异步处理有所熟悉,可跳过下面的部分直接看 Spring MVC 异步处理。
先通过一个案例了解如何使用 Servlet 中的异步处理。
默认情况下 Servlet 和 Filter 都不支持异步,需要在部署描述符或注解中开启异步支持。
部署描述符开启异步支持示例如下。
asyncA com.zzuhkp.mvc.AsyncServlet trueasyncA /async/a asyncFilter com.zzuhkp.mvc.AsyncFilter trueasyncFilter asyncA
部署描述符开启异步支持的重点是设置 servlet 或 filter 标签下的 async-supported 值为 true。
注解开启异步支持的示例如下。
@WebFilter(value = "/async/a", asyncSupported = true) public class AsyncFilter implements Filter { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { chain.doFilter(request, response); } } @WebServlet(urlPatterns = "/async/a", asyncSupported = true) public class AsyncServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { // 1. 开启异步处理 AsyncContext asyncContext = req.startAsync(req, resp); Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { try { // 2. 使用新线程执行耗时 *** 作 Thread.sleep(10000L); // 3. 耗时 *** 作完成后进行响应 asyncContext.getResponse().getWriter().write("this is a async servlet"); // 4. 通知容器异步 *** 作完成 asyncContext.complete(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } }); } }
通过注解开启异步支持的重点是设置 @WebFilter 或 @WebServlet 中的 asyncSupported 为 true。
注意上述 Servlet 还列出了进行异步 *** 作的常用步骤。
- 先使用 ServletRequest#startAsync(ServletRequest, ServletResponse) 开启异步。
- 开启异步后使用新线程进行异步处理,执行耗时 *** 作。
- 新线程耗时 *** 作完成后可以使用取到的资源信息发起响应。
- 最后调用第一步开启异步支持返回的异步上下文 AsyncContext#complete 方法通知容器异步处理已经结束。
下面介绍异步处理常见的 *** 作及对应 API,这些 API 将在 Spring MVC 异步处理中使用。
开启异步支持:
开启异步支持有两个方法,分别如下。
- ServletRequest#startAsync(ServletRequest,ServletResponse)
- ServletRequest#startAsync()
这两个参数都将返回一个异步处理的上下文 AsyncContext,不同的是如果使用了无参的 #startAsync 方法,AsyncContext 内部持有的 request、response 将是原始的,无论 Filter 是否对 request、response 进行了包装。
结束异步处理:
异步处理完成后有两种结束的方式。一种如上面的示例通知容器返回响应到客户端,另一种是通知容器使用其他 Servlet 继续处理请求。关联的方法有4个。
- AsyncContext#complete
- AsyncContext#dispatch()
- AsyncContext#dispatch(String)
- AsyncContext#dispatch(ServletContext, String)
AsyncContext 中的 #complete 用于在异步线程中通知容器向客户端发出响应,此后异步线程不可再产生响应。
AsyncContext 中的 #dispatch 用于通知容器重新派发请求。无参数的重载方法重新派发请求到当前请求路径,有参数的重载方法可以指定派发请求的路径。
派发类型判断:
由于异步处理后可以重新派发请求到当前 URL,因此需要判断派发类型,知道当前请求是从哪里产生的,从而使用不同处理逻辑,这可以通过 ServletRequest#getDispatcherType 方法来实现,这个方法返回的是一个 DispatcherType 枚举类型,每个枚举值的含义如下。
public enum DispatcherType { // request.getRequestDispatcher("/path").forward(request,response) 产生的请求 FORWARD, // request.getRequestDispatcher("/path").include(request,response) 产生的请求 INCLUDE, // 客户端正常发起请求 REQUEST, // 异步处理 AsyncContext#dispatch 分派的请求 ASYNC, // Servlet 产生错误,转发请求到错误页面 ERROR }
异步处理监听:
异步处理开始和结束之间,容器还会产生一些事件,可以通过 AsyncContext#addListener(AsyncListener) 方法添加对异步事件的监听,具体可以监听的事件如下。
public interface AsyncListener extends EventListener { // 异步处理完成 public void onComplete(AsyncEvent event) throws IOException; // 异步处理超时 public void onTimeout(AsyncEvent event) throws IOException; // 异步处理发生异常 public void onError(AsyncEvent event) throws IOException; // ServletRequest#startAsync 重新开启异步 public void onStartAsync(AsyncEvent event) throws IOException; }
异步处理默认的超时时间是 30 秒,可以通过 AsyncContext#setTimeout 设置超时时间,以设置时间重新计算。
Spring MVC 异步处理Spring MVC 结合自身特性,对 Servlet 中的异步处理进行了封装,使异步处理更为简便。
快速体验 Spring MVC 异步处理Spring MVC 手动配置 DispatcherServlet 需要指定 async-supported 为 true,Spring Boot 环境下已经默认开启了异步处理的支持。
在 Spring MVC 中使用异步处理最简单的方式是在 controller 方法中直接返回 Callable 类型,示例代码如下。
@RestController public class AsyncController { @GetMapping("/test") public Callabletest() { Callable callable = new Callable () { @Override public String call() throws Exception { return "this is a test"; } }; return callable; } }
controller 方法返回 Callable 类型之后,Spring 会自动使用异步线程池调用 Callable#call 方法,然后对 #call 方法返回值重新解析,解析方式和普通的 controller 方法一致,上述示例代码将向浏览器输出一段文字。
Spring MVC 异步处理常用的两种方式 CallableCallable 作为 controller 方法返回值是最常用的一种方式,这种方式会使用 Spring 默认的线程池进行异步处理。具体可以参见上面的示例。
DeferredResult如果需要指定异步处理的线程池,将 DeferredResult 作为 controller 方法的返回值是更好的选择,DeferredResult 不仅可以手动指定线程池,还可以配置异步处理的回调,如超时、完成、错误。示例代码如下。
@RestController public class AsyncController { @GetMapping("/test") public DeferredResulttest() { DeferredResult deferredResult = new DeferredResult<>(); Executors.newSingleThreadExecutor().submit(new Runnable() { @SneakyThrows @Override public void run() { // 模拟耗时的 *** 作 Thread.sleep(5000L); // 设置异步处理结果 deferredResult.setResult("this is a test"); } }); // 设置异步处理回调 deferredResult.onTimeout(() -> System.out.println("异步处理超时")); deferredResult.onCompletion(() -> System.out.println("异步处理完成")); deferredResult.onError((throwable) -> System.out.println("异步处理错误:" + throwable.getMessage())); return deferredResult; } }
上述代码将 DeferredResult 作为 controller 返回值,然后在线程池中手动设置了返回的结果,相对来说更为灵活。
Spring MVC 异步处理的其他方式除了上述 Callable 和 DeferredResult 两种类型作为 controller 方法返回值,还有其他几种使用相对没那么频繁的类型可以作为 controller 方法的返回值类型,这几种类型与 Callable 或 DeferredResult 相互适配。
StreamingResponseBody、ResponseEntityStreamingResponseBody 可以使用原始的方式输出响应,Spring 内部将这个类适配为 Callable,在异步处理的时候回调这个接口然后输出响应。
ResponseEntity
StreamingResponseBody 示例代码如下。
@RestController public class AsyncController { @GetMapping("/test") public StreamingResponseBody test() { StreamingResponseBody body = new StreamingResponseBody() { @Override public void writeTo(OutputStream outputStream) throws IOException { BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream)); writer.write("this is a test"); } }; return body; } }WebAsyncTask
WebAsyncTask 是 Callable 最底层的实现,Callable 最终将适配为 WebAsyncTask,这个类和 DeferredResult 功能类似,可以指定异步执行线程池、异步执行回调,由于底层使用了 Callable ,因此不能手动指定何时产生响应。示例代码如下。
@RestController public class AsyncController { @GetMapping("/test") public WebAsyncTaskListenableFuturetest() { // 设置超时时间、线程池、异步任务 WebAsyncTask task = new WebAsyncTask<>(5000L, new SimpleAsyncTaskExecutor(), new Callable () { @Override public String call() throws Exception { // 模拟耗时的 *** 作 Thread.sleep(5000L); // 返回异步处理结果 return "this ia a test"; } }); // 设置异步处理回调 task.onTimeout(() -> "异步处理超时"); task.onCompletion(() -> System.out.println("异步处理完成")); task.onError(() -> "异步处理错误"); return task; } }
ListenableFuture 是 Spring 对 Future 扩展提出的接口,可以在任务执行成功或者失败时回调给定的接口方法。在异步处理中,如果 controller 方法返回这个类型,Spring 会将其适配为 DeferredResult,异步任务执行成功后设置异步处理的结果。从功能上来说弱于 DeferredResult,不能设置超时时间及超时回调。 示例代码如下。
@RestController public class AsyncController { @GetMapping("/test") public ListenableFutureCompletionStagetest() { ListenableFutureTask task = new ListenableFutureTask<>(new Callable () { @Override public String call() throws Exception { // 模拟耗时的 *** 作 Thread.sleep(5000L); // 返回异步处理结果 return "this is a test"; } }); task.addCallback(new ListenableFutureCallback () { @Override public void onFailure(Throwable ex) { System.out.println("异步任务异常:" + ex.getMessage()); } @Override public void onSuccess(String result) { System.out.println("异步任务执行完成"); } }); // 提交异步任务 Executors.newSingleThreadExecutor().submit(task); return task; } }
CompletionStage 是 JDK 1.8 提供的表示异步执行的其中一个阶段,可以在当前阶段完成后进入下一个阶段,典型的实现是 CompletableFuture。
使用 CompletableFuture 作为 controller 作为返回值,Spring 会将其适配为 DeferredResult,在当前阶段完成后设置异步处理的结果,从功能上来说强于 Callable,可以设置线程池,但不能设置回调和设置超时时间。示例代码如下。
@RestController public class AsyncController { @GetMapping("/test") public CompletionStageResponseBodyEmitter、ResponseEntitytest() { CompletionStage future = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { return "this is a test"; } }, Executors.newSingleThreadExecutor()); return future; } }
ResponseBodyEmitter 类型的作用类似于 Servlet 异步处理原生的 API,支持用户多次发出响应,这个类型作为 controller 方法返回类型后,Spring 同样会将这个类型适配为 DeferredResult。这个类型支持异步处理回调、设置超时时间,指定线程池等。
ResponseEntity
ResponseBodyEmitter 示例代码如下。
@RestController public class AsyncController { @GetMapping("/test") public ResponseBodyEmitter test() { ResponseBodyEmitter emitter = new ResponseBodyEmitter(5000L); // 异步线程池中执行耗时任务 Executors.newSingleThreadExecutor().submit(new Runnable() { @SneakyThrows @Override public void run() { // 设置异步处理回调 emitter.onCompletion(() -> System.out.println("异步处理完成")); emitter.onTimeout(() -> System.out.println("异步处理")); emitter.onError((throwable) -> System.out.println("异步处理异常:" + throwable.getMessage())); // 模拟耗时 *** 作 Thread.sleep(3000L); // 发送响应 emitter.send("this is "); emitter.send("a test"); // 通知容器异步处理完成 emitter.complete(); } }); return emitter; } }
需要注意的是由于 Spring 需要等待 controller 方法返回后才能真正设置回调,因此如果异步任务如果在 controller 方法返回前就已经执行结束,回调将无法生效。
Spring MVC 异步处理方式总结这里总结几种 controller 方法返回类型的异同,上述中的几种类型的适配关系可以如下图所示。
图中下面的类型可以适配到上面的类型,最终由 WebAsyncManager 使用来开启异步处理。
各类型功能异同如下表,可根据需求选择合适的类型进行异步处理。
到了这里文章的内容已经很长了,但为了文章的完整性还是简单介绍下 Spring 在内部如何实现异步处理的吧。
首先 Spring 将按照正常的流程执行 controller 方法,方法返回后 Spring 处理和异步有关的几个类型值,然后开始异步处理。以 Callable 类型为例,处理这个返回值类型的代码如下。
public class CallableMethodReturnValueHandler implements HandlerMethodReturnValueHandler { @Override public boolean supportsReturnType(MethodParameter returnType) { return Callable.class.isAssignableFrom(returnType.getParameterType()); } @Override public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { if (returnValue == null) { mavContainer.setRequestHandled(true); return; } Callable> callable = (Callable>) returnValue; // 开启异步处理 WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer); } }
Spring 先调用 WebAsyncUtils.getAsyncManager 方法获取异步管理器 WebAsyncManager,WebAsyncManager 是异步处理的核心类,WebAsyncManager 获取之后会 Spring 会将实例存储到 request 的属性中。代码如下。
public abstract class WebAsyncUtils { public static WebAsyncManager getAsyncManager(WebRequest webRequest) { int scope = RequestAttributes.SCOPE_REQUEST; WebAsyncManager asyncManager = null; Object asyncManagerAttr = webRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, scope); if (asyncManagerAttr instanceof WebAsyncManager) { asyncManager = (WebAsyncManager) asyncManagerAttr; } if (asyncManager == null) { asyncManager = new WebAsyncManager(); // 将实例存储至 request 属性 webRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager, scope); } return asyncManager; } }
然后 Spring 调用 WebAsyncManager#startCallableProcessing(Callable>, Object...) 开始异步处理,包括设置回调、开启异步处理、执行异步任务等等,这里将用到 Servlet 原生的 API,由于代码较多,不再展示。执行异步任务后 Spring 会调用 AsyncContext#dispatch() 将请求重新派发到当前 controller。
当请求转发到当前 controller 时,RequestMappingHandlerAdapter 会再次执行 controller 方法,此时从 request 属性中取出 WebAsyncManager,发现已经产生异步处理的结果,然后对表示 controller 方法的 ServletInvocableHandlerMethod 加以包装,使其直接返回异步处理结果,后面和正常流程一样,最终将结果输出到客户端。这块代码可参考 RequestMappingHandlerAdapter#invokeHandlerMethod,不再具体展示。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)