如果在 Netty 中做耗时的、不可预料的 *** 作,比如数据库、网络请求等,会严重影响 Netty 对 Socket 的处理速度。解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步 *** 作来讲,有2种方式:
在Handler 中加入线程池;
在Context 中添加线程池;
将EchoServerHandler的channelRead()方法进行修改,该为异步的方式:
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter{
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws UnsupportedEncodingException,InterruptedException{
final Object msgCop = msg;
//缓存上下文对象
final ChannelHandlerContext ctxCop = ctx;
//将耗时任务加入到异步线程池
group.submit(new Callable<Object>(){
@Override
public Object call() throws Exception{
ByteBuf buf = (ByteBuf)msgCop;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req,"UTF-8");
Thread.sleep(10000);// 模拟慢 *** 作
System.err.println(body + "" + Thread.currrentThread().getName());
String reqString = "Hello i am server...";
ByteBuf resp = UnpooledcopiedBuffer(reqString.getBytes());
cxtCop.writeAndFlush(resp);
return null;
}
});
System.out.println("go on ...");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx){
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
//Close the connection when an exeception is raised
cause.printStackTrace();
ctx.close();
}
}
在channelRead()中,我们将耗时的任务提交到了一个自定义的业务线程池中,这样就不会阻塞Netty自身的IO线程了。
当IO线程轮询到一个socket事件,然后IO线程开始处理,当处理到耗时handler的时候,handler将耗时任务交给业务线程池处理。当耗时任务执行完毕再执行pipeline 的write方法的时候(代码中使用的是context的writeAndFlush方法——最终会调用下面的write(),上图画的是pipeline方法,是一个意思)会将这个任务交给IO线程。
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {//没有将任务放入自定义的线程池时该处的值就会是true
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {//将任务放入自定义的线程池时会走下面的逻辑
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
当判定下个outbound的executor线程不是当前线程的时候,会将当前的工作封装成task,然后放入mpsc队列中,等待IO任务执行完毕后执行队列中的任务。若我们在上面的Handler中将任务加入到了自定义的线程池中,就会走safeExecute(executor, task, promise, m),将任务放入mpsc队列中。
二、在Context中将任务加入线程池在向pipeline中添加handler的时候,添加一个线程池,将指定handler的业务处理交给这个线程池完成:
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,worderGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception{
ChannelPipeline p = ch.pipeline();
if(sslCtx != null){
p.addLast(sslCtx.newHandler(ch.alloc()));
}
// 将EchoServerHandler的执行交给group线程池处理
p.addLast(group,new EchoServerHandler());
}
});
这样,handler中(上例中的EchoServerHandler)的代码就可以使用普通的方式来处理耗时业务,用户就不必在handler中将耗时任务加入到线程池中了。在调用addLast()方法添加线程池后,handler将优先使用这个线程池,如果这里没有添加这个线程池,将会使用IO线程。
当代码执行到AbstractChannelHandlerContext的invokeChannelRead()方法的时候,executor.inEventLoop()是不会通过的,因为当前线程是IO线程,Context(或者说Handler)的executor是业务线程(另外一个线程),所以会异步执行。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
如果我们将p.addLast(group,new EchoServerHandler())改为p.addLast(new EchoServerHandler()),会发现上述代码不会异步执行,后面的整个流程就和第一种方式一样了。
三、两种方式的比较 1、第一种方式在Handler中添加异步,可能更加的灵活。比如如果需要访问数据库,那我就异步,如果不需要,就不异步。异步会拖长接口响应时间,因为需要将任务放进mpscTask中。如果IO时间很短,task很多,可能一个循环下来,都没有时间执行整个task,导致响应时间达不到要求。
2、第二种方式是Netty的标准方式,但这会将整个Handler都交给业务线程池,不论耗不耗时,不够灵活。
使用时,需要自己根据业务斟酌。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)