Netty21——Netty中处理耗时任务的方式

Netty21——Netty中处理耗时任务的方式,第1张

 如果在 Netty 中做耗时的、不可预料的 *** 作,比如数据库、网络请求等,会严重影响 Netty 对 Socket 的处理速度。解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步 *** 作来讲,有2种方式:
 在Handler 中加入线程池;
 在Context 中添加线程池;

一、在Handler中将任务加入线程池

 将EchoServerHandler的channelRead()方法进行修改,该为异步的方式:

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter{
	static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

	@Override
	public void channelRead(ChannelHandlerContext ctx,Object msg) throws UnsupportedEncodingException,InterruptedException{
		final Object msgCop = msg;
		//缓存上下文对象
		final ChannelHandlerContext ctxCop = ctx;
		//将耗时任务加入到异步线程池
		group.submit(new Callable<Object>(){
			@Override
			public Object call() throws Exception{
				ByteBuf buf = (ByteBuf)msgCop;
				byte[] req = new byte[buf.readableBytes()];
				buf.readBytes(req);
				String body = new String(req,"UTF-8");
				Thread.sleep(10000);// 模拟慢 *** 作
				System.err.println(body + "" + Thread.currrentThread().getName());
				String reqString = "Hello i am server...";
				ByteBuf resp = UnpooledcopiedBuffer(reqString.getBytes());
				cxtCop.writeAndFlush(resp);
				return null;
			}
		});
		System.out.println("go on ...");
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx){
		ctx.flush();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
		//Close the connection when an exeception is raised
		cause.printStackTrace();
		ctx.close();
	}
}

 在channelRead()中,我们将耗时的任务提交到了一个自定义的业务线程池中,这样就不会阻塞Netty自身的IO线程了。

 当IO线程轮询到一个socket事件,然后IO线程开始处理,当处理到耗时handler的时候,handler将耗时任务交给业务线程池处理。当耗时任务执行完毕再执行pipeline 的write方法的时候(代码中使用的是context的writeAndFlush方法——最终会调用下面的write(),上图画的是pipeline方法,是一个意思)会将这个任务交给IO线程。

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {//没有将任务放入自定义的线程池时该处的值就会是true
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {//将任务放入自定义的线程池时会走下面的逻辑
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

 当判定下个outbound的executor线程不是当前线程的时候,会将当前的工作封装成task,然后放入mpsc队列中,等待IO任务执行完毕后执行队列中的任务。若我们在上面的Handler中将任务加入到了自定义的线程池中,就会走safeExecute(executor, task, promise, m),将任务放入mpsc队列中。

二、在Context中将任务加入线程池

 在向pipeline中添加handler的时候,添加一个线程池,将指定handler的业务处理交给这个线程池完成:

static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,worderGroup)
	.channel(NioServerSocketChannel.class)
	.option(ChannelOption.SO_BACKLOG,100)
	.handler(new LoggingHandler(LogLevel.INFO))
	.childHandler(new ChannelInitializer<SocketChannel>(){
		@Override
		public void initChannel(SocketChannel ch) throws Exception{
			ChannelPipeline p = ch.pipeline();
			if(sslCtx != null){
				p.addLast(sslCtx.newHandler(ch.alloc()));
			}
			// 将EchoServerHandler的执行交给group线程池处理
			p.addLast(group,new EchoServerHandler());
		}
	});

 这样,handler中(上例中的EchoServerHandler)的代码就可以使用普通的方式来处理耗时业务,用户就不必在handler中将耗时任务加入到线程池中了。在调用addLast()方法添加线程池后,handler将优先使用这个线程池,如果这里没有添加这个线程池,将会使用IO线程。
 当代码执行到AbstractChannelHandlerContext的invokeChannelRead()方法的时候,executor.inEventLoop()是不会通过的,因为当前线程是IO线程,Context(或者说Handler)的executor是业务线程(另外一个线程),所以会异步执行。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

 如果我们将p.addLast(group,new EchoServerHandler())改为p.addLast(new EchoServerHandler()),会发现上述代码不会异步执行,后面的整个流程就和第一种方式一样了。

三、两种方式的比较

 1、第一种方式在Handler中添加异步,可能更加的灵活。比如如果需要访问数据库,那我就异步,如果不需要,就不异步。异步会拖长接口响应时间,因为需要将任务放进mpscTask中。如果IO时间很短,task很多,可能一个循环下来,都没有时间执行整个task,导致响应时间达不到要求。
 2、第二种方式是Netty的标准方式,但这会将整个Handler都交给业务线程池,不论耗不耗时,不够灵活。
 使用时,需要自己根据业务斟酌。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/992523.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存