2021SC@SDUSC
- 前言
- 一、代码
- 二、常见问题
- 三、@Sharable的检查
- 四、编解码器相关
- 五、总结
在继续阅读netty的编解码器相关的源码之前,先补充一下netty框架的@Sharable注解,通过该注解,可以在多个ChannelPipeline中共享同一个ChannelHandler。
一、代码首先,修改部分之前写的示例代码,来体验Sharable注解的使用。
新建TestSharableHandler类,采用单例模式。
package com.homework.server_client.server.handler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; @ChannelHandler.Sharable public class TestSharableHandler extends ChannelInboundHandlerAdapter { private final AtomicInteger COUNT = new AtomicInteger(0); private static final String SOURCE = "来自TestSharableHandler,"; private TestSharableHandler() {} private static class HandlerHolder { public final static TestSharableHandler TEST_SHARABLE_HANDLER = new TestSharableHandler(); } public static TestSharableHandler getInstance() { return HandlerHolder.TEST_SHARABLE_HANDLER; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { int temp = COUNT.incrementAndGet(); System.out.println(SOURCE + "当前人数是:" + temp); SocketAddress address = ctx.channel().remoteAddress(); System.out.println(SOURCE + "新加入的地址是:" + address); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { int temp = COUNT.decrementAndGet(); System.out.println(SOURCE + "当前人数是:" + temp); SocketAddress address = ctx.channel().remoteAddress(); System.out.println(SOURCE + "离开的地址是:" + address); } }
修改ServerInitializer类,在pipeline中添加处理器。
package com.homework.server_client.server.handler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class ServerInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //添加编解码器 pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); //自定义handler pipeline.addLast(new ServerHandler()); pipeline.addLast(TestSharableHandler.getInstance()); } }
具体的实验效果就不在这里做展示了,可以看出,添加了@Sharable注解的处理器由于能够共享数据,适合于做一些信息统计的处理。
二、常见问题首先是关于不添加注解会产生的结果。
在TestSharableHandler类中,将注解注释掉。
// @ChannelHandler.Sharable public class TestSharableHandler extends ChannelInboundHandlerAdapter
而后,运行两个Client程序。
来自TestSharableHandler,当前人数是:1 来自TestSharableHandler,新加入的地址是:/127.0.0.1:58876 十一月 20, 2021 4:06:58 下午 io.netty.channel.ChannelInitializer exceptionCaught 警告: Failed to initialize a channel. Closing: [id: 0x2bd10b48, L:/127.0.0.1:8080 - R:/127.0.0.1:60320] io.netty.channel.ChannelPipelineException: com.homework.server_client.server.handler.TestSharableHandler is not a @Sharable handler, so can't be added or removed multiple times. at io.netty.channel.DefaultChannelPipeline.checkMultiplicity(DefaultChannelPipeline.java:600) at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:202) at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:381) at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:370) at com.homework.server_client.server.handler.ServerInitializer.initChannel(ServerInitializer.java:20) at com.homework.server_client.server.handler.ServerInitializer.initChannel(ServerInitializer.java:10) at io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129) at io.netty.channel.ChannelInitializer.handlerAdded(ChannelInitializer.java:112) at io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:938) at io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609) at io.netty.channel.DefaultChannelPipeline.access0(DefaultChannelPipeline.java:46) at io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute(DefaultChannelPipeline.java:1463) at io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers(DefaultChannelPipeline.java:1115) at io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded(DefaultChannelPipeline.java:650) at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:514) at io.netty.channel.AbstractChannel$AbstractUnsafe.access0(AbstractChannel.java:429) at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:486) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986) at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748)
可以看到如果没有添加@Sharable注解,在第二个Client连接并且初始化时,会抛出异常,所以,如果希望某一个ChannelHandler可以被共享,必须添加@Sharable注解。相关的检查将在本篇博客的第三部分分析。
第二个问题,首先还是修改一下TestSharableHandler类的代码。
package com.homework.server_client.server.handler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; @ChannelHandler.Sharable public class TestSharableHandler extends ChannelInboundHandlerAdapter { private final AtomicInteger COUNT = new AtomicInteger(0); private static final String SOURCE = "来自TestSharableHandler,"; private TestSharableHandler() {} private static class HandlerHolder { public final static TestSharableHandler TEST_SHARABLE_HANDLER = new TestSharableHandler(); } public static TestSharableHandler getInstance() { // return HandlerHolder.TEST_SHARABLE_HANDLER; return new TestSharableHandler(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { int temp = COUNT.incrementAndGet(); System.out.println(SOURCE + "当前人数是:" + temp); SocketAddress address = ctx.channel().remoteAddress(); System.out.println(SOURCE + "新加入的地址是:" + address); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { int temp = COUNT.decrementAndGet(); System.out.println(SOURCE + "当前人数是:" + temp); SocketAddress address = ctx.channel().remoteAddress(); System.out.println(SOURCE + "离开的地址是:" + address); } }
可以看到,这里只是没有使用原本的单例模式的设计,即在每个pipeline中添加的都会是一个新的TestSharableHadler类的对象。
来自TestSharableHandler,当前人数是:1 来自TestSharableHandler,新加入的地址是:/127.0.0.1:65140 来自TestSharableHandler,当前人数是:1 来自TestSharableHandler,新加入的地址是:/127.0.0.1:65162
依然是运行两个Client类,可以看到,当前人数的输出一直是1,与预期的效果不同。结论是,@Sharable注解仅仅只是提供一个标记,如果没有添加该注解,则一个handler对象不能被添加到复数的pipeline中,而如果添加了该注解,并不保证所有的pipeline中的该类的handler对象是同一个,单例模式需要自己实现。
三、@Sharable的检查在这一部分,会分析一下当handler被加入pipeline中,对重复添加的检查。
首先是在DefaultChannelPipeline类中的addlast方法。
@Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this; }
首先,会获得锁,之后调用checkMultiplicity方法,而对是否可以添加到复数的pipeline中的检查就是在这个方法完成的。
private static void checkMultiplicity(ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times."); } h.added = true; } }
在该方法中,首先判断handler是否是ChannelHandlerAdapter类的子类的对象,之后,调用isSharable方法以及added属性。
public boolean isSharable() { Class> clazz = getClass(); Map, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isAnnotationPresent(Sharable.class); cache.put(clazz, sharable); } return sharable; }
而在isSharable方法中,首先获得了该类的class对象,注意,在测试中,调用该方法的handler是TestSharableHandler类的对象,因此,获得的class对象也是TestSharableHandler类的class对象,之后,从cache中获得对应的信息,如果不存在,则去判断它是否有@Sharable注解,返回sharable。在之前的测试中,由于将注解注释了,因此这里的sharable会变成false,而当第二个Client连接并且初始化对应的pipeline时,返回值为false,因此报错。
四、编解码器相关之所以在分析编解码器时提到@Sharable注解,是因为在网上有看到netty的解码器不允许添加该注解,最开始的时候,我以为编解码器的工作相对简单重读,因此如果能够被共享,可以节省内存空间,后来,才知道,由于粘包拆包的问题,解码器往往需要保存一些中减状态,因此不能够共享,所以,不能够添加@Sharable注解。
五、总结在本篇博客中,分析了一些@Sharable注解相关的问题,在原先的测试中添加了被@Sharable注解的ChannelHandler,用于做一些信息的统计等问题,之后,分析了使用该注解时常见的问题,最后,分析了netty框架中在添加handler时,对于重复添加的逻辑处理。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)