<2021SC@SDUSC>netty@Sharable

<2021SC@SDUSC>netty@Sharable,第1张

<2021SC@SDUSC>netty@Sharable

2021SC@SDUSC

文章目录
  • 前言
  • 一、代码
  • 二、常见问题
  • 三、@Sharable的检查
  • 四、编解码器相关
  • 五、总结

前言

在继续阅读netty的编解码器相关的源码之前,先补充一下netty框架的@Sharable注解,通过该注解,可以在多个ChannelPipeline中共享同一个ChannelHandler。

一、代码

首先,修改部分之前写的示例代码,来体验Sharable注解的使用。
新建TestSharableHandler类,采用单例模式。

package com.homework.server_client.server.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

@ChannelHandler.Sharable
public class TestSharableHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger COUNT = new AtomicInteger(0);
    private static final String SOURCE = "来自TestSharableHandler,";

    private TestSharableHandler() {}

    private static class HandlerHolder {
        public final static TestSharableHandler TEST_SHARABLE_HANDLER
                = new TestSharableHandler();
    }

    public static TestSharableHandler getInstance() {
        return HandlerHolder.TEST_SHARABLE_HANDLER;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int temp = COUNT.incrementAndGet();
        System.out.println(SOURCE + "当前人数是:" + temp);
        SocketAddress address = ctx.channel().remoteAddress();
        System.out.println(SOURCE + "新加入的地址是:" + address);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        int temp = COUNT.decrementAndGet();
        System.out.println(SOURCE + "当前人数是:" + temp);
        SocketAddress address = ctx.channel().remoteAddress();
        System.out.println(SOURCE + "离开的地址是:" + address);
    }

}

修改ServerInitializer类,在pipeline中添加处理器。

package com.homework.server_client.server.handler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class ServerInitializer extends ChannelInitializer {

    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        //添加编解码器
        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        //自定义handler
        pipeline.addLast(new ServerHandler());
        pipeline.addLast(TestSharableHandler.getInstance());
    }

}

具体的实验效果就不在这里做展示了,可以看出,添加了@Sharable注解的处理器由于能够共享数据,适合于做一些信息统计的处理。

二、常见问题

首先是关于不添加注解会产生的结果。
在TestSharableHandler类中,将注解注释掉。

// @ChannelHandler.Sharable
public class TestSharableHandler extends ChannelInboundHandlerAdapter

而后,运行两个Client程序。

来自TestSharableHandler,当前人数是:1
来自TestSharableHandler,新加入的地址是:/127.0.0.1:58876
十一月 20, 2021 4:06:58 下午 io.netty.channel.ChannelInitializer exceptionCaught
警告: Failed to initialize a channel. Closing: [id: 0x2bd10b48, L:/127.0.0.1:8080 - R:/127.0.0.1:60320]
io.netty.channel.ChannelPipelineException: com.homework.server_client.server.handler.TestSharableHandler is not a @Sharable handler, so can't be added or removed multiple times.
	at io.netty.channel.DefaultChannelPipeline.checkMultiplicity(DefaultChannelPipeline.java:600)
	at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:202)
	at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:381)
	at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:370)
	at com.homework.server_client.server.handler.ServerInitializer.initChannel(ServerInitializer.java:20)
	at com.homework.server_client.server.handler.ServerInitializer.initChannel(ServerInitializer.java:10)
	at io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129)
	at io.netty.channel.ChannelInitializer.handlerAdded(ChannelInitializer.java:112)
	at io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:938)
	at io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609)
	at io.netty.channel.DefaultChannelPipeline.access0(DefaultChannelPipeline.java:46)
	at io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute(DefaultChannelPipeline.java:1463)
	at io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers(DefaultChannelPipeline.java:1115)
	at io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded(DefaultChannelPipeline.java:650)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:514)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.access0(AbstractChannel.java:429)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:486)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

可以看到如果没有添加@Sharable注解,在第二个Client连接并且初始化时,会抛出异常,所以,如果希望某一个ChannelHandler可以被共享,必须添加@Sharable注解。相关的检查将在本篇博客的第三部分分析。

第二个问题,首先还是修改一下TestSharableHandler类的代码。

package com.homework.server_client.server.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;

@ChannelHandler.Sharable
public class TestSharableHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger COUNT = new AtomicInteger(0);
    private static final String SOURCE = "来自TestSharableHandler,";

    private TestSharableHandler() {}

    private static class HandlerHolder {
        public final static TestSharableHandler TEST_SHARABLE_HANDLER
                = new TestSharableHandler();
    }

    public static TestSharableHandler getInstance() {
        // return HandlerHolder.TEST_SHARABLE_HANDLER;
        return new TestSharableHandler();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int temp = COUNT.incrementAndGet();
        System.out.println(SOURCE + "当前人数是:" + temp);
        SocketAddress address = ctx.channel().remoteAddress();
        System.out.println(SOURCE + "新加入的地址是:" + address);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        int temp = COUNT.decrementAndGet();
        System.out.println(SOURCE + "当前人数是:" + temp);
        SocketAddress address = ctx.channel().remoteAddress();
        System.out.println(SOURCE + "离开的地址是:" + address);
    }

}

可以看到,这里只是没有使用原本的单例模式的设计,即在每个pipeline中添加的都会是一个新的TestSharableHadler类的对象。

来自TestSharableHandler,当前人数是:1
来自TestSharableHandler,新加入的地址是:/127.0.0.1:65140
来自TestSharableHandler,当前人数是:1
来自TestSharableHandler,新加入的地址是:/127.0.0.1:65162

依然是运行两个Client类,可以看到,当前人数的输出一直是1,与预期的效果不同。结论是,@Sharable注解仅仅只是提供一个标记,如果没有添加该注解,则一个handler对象不能被添加到复数的pipeline中,而如果添加了该注解,并不保证所有的pipeline中的该类的handler对象是同一个,单例模式需要自己实现。

三、@Sharable的检查

在这一部分,会分析一下当handler被加入pipeline中,对重复添加的检查。
首先是在DefaultChannelPipeline类中的addlast方法。

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

首先,会获得锁,之后调用checkMultiplicity方法,而对是否可以添加到复数的pipeline中的检查就是在这个方法完成的。

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

在该方法中,首先判断handler是否是ChannelHandlerAdapter类的子类的对象,之后,调用isSharable方法以及added属性。

    public boolean isSharable() {
        
        Class clazz = getClass();
        Map, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }

而在isSharable方法中,首先获得了该类的class对象,注意,在测试中,调用该方法的handler是TestSharableHandler类的对象,因此,获得的class对象也是TestSharableHandler类的class对象,之后,从cache中获得对应的信息,如果不存在,则去判断它是否有@Sharable注解,返回sharable。在之前的测试中,由于将注解注释了,因此这里的sharable会变成false,而当第二个Client连接并且初始化对应的pipeline时,返回值为false,因此报错。

四、编解码器相关

之所以在分析编解码器时提到@Sharable注解,是因为在网上有看到netty的解码器不允许添加该注解,最开始的时候,我以为编解码器的工作相对简单重读,因此如果能够被共享,可以节省内存空间,后来,才知道,由于粘包拆包的问题,解码器往往需要保存一些中减状态,因此不能够共享,所以,不能够添加@Sharable注解。

五、总结

在本篇博客中,分析了一些@Sharable注解相关的问题,在原先的测试中添加了被@Sharable注解的ChannelHandler,用于做一些信息的统计等问题,之后,分析了使用该注解时常见的问题,最后,分析了netty框架中在添加handler时,对于重复添加的逻辑处理。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5563734.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存