网络编程与Netty(四) Netty源码-(Ⅰ)

网络编程与Netty(四) Netty源码-(Ⅰ),第1张

网络编程与Netty(四) Netty源码-(Ⅰ)

前面主要是介绍了在Netty中每个组件的使用和作用,在运行的时候,Netty中组件一起基于Netty的网络通信做出了贡献,而且也有说到一些并未给出证据的结论,例如,在每个EventLoop中是否真的是有一个线程和一个队列?在ChannelPipeline中是否真的默认会有一个head和一个tail共两个ChannelHandler?那么接下来我们来一一解析Netty中的组件,包括ChannelPipeline、ChannelHandlerContext、ChannelHandler、EventLoop、Channel、Unsafe、ByteBuf等等,并且还会将这些组件在服务器启动、各种事件发生时Netty内部的运行机制通过源码来进行讲解。

ChannelPipeline

​ ChannelPipeline是用于存储ChannelHandler(ChannelHandlerContext)的组件,handler在pipeline中形成一个双向链表,有点类似于责任链的一种变形,channel中所有的事件都会按顺序流经pipeline中的handler并进行处理。

​ 根据我们在前面对 Netty 的使用,我们知道,我们不需要自己创建 pipeline,因为使用ServerBootstrap 或者 Bootstrap 启动服务端或者客户端时,Netty 会为每个 Channel 连接创建一个独立的 pipeline。对于使用者而言,只需要将自定义的拦截器加入到 pipeline 中即可。

​ ChannelPipeline 支持运行态动态的添加或者删除 ChannelHandler,在某些场景下这个特性非常实用。例如当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护 ChannelHandler 添加到当前的ChannelPipeline 中,当高峰期过去之后,就可以动态删除拥塞保护 ChannelHandler 了。

​ ChannelPipeline 是线程安全的,这意味着 N 个业务线程可以并发地 *** 作 ChannelPipeline而不存在多线程并发问题。但是,ChannelHandler 却不是线程安全的,这意味着尽管ChannelPipeline 是线程安全的,但是用户仍然需要自己保证 ChannelHandler 的线程安全。

​ Netty将Channel的数据管道抽象为ChannelPipeline,消息在ChannelPipeline中流动和传递。ChannelPipeline持有IO事件拦截器ChannelHandler的链表,由 ChannelHandler 对 IO 事件进行拦截和处理,可以方便地通过新增和删除ChannelHandler 来实现不同的业务逻辑定制,不需要对已有的ChannelHandler 进行修改,能够实现对修改封闭和对扩展的支持。接下来先看看ChannelPipeline的源码:(需要注意的是,因为大家的版本可能不一样,源码可能会略有不同,但是整体的思路是肯定一样的)

public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable> {
//add开头的方法
    ChannelPipeline addFirst(String var1, ChannelHandler var2);
    ChannelPipeline addFirst(EventExecutorGroup var1, String var2, ChannelHandler var3);
    ChannelPipeline addLast(String var1, ChannelHandler var2);
    ChannelPipeline addLast(EventExecutorGroup var1, String var2, ChannelHandler var3);
    ChannelPipeline addBefore(String var1, String var2, ChannelHandler var3);
    ChannelPipeline addBefore(EventExecutorGroup var1, String var2, String var3, ChannelHandler var4);
    ChannelPipeline addAfter(String var1, String var2, ChannelHandler var3);
    ChannelPipeline addAfter(EventExecutorGroup var1, String var2, String var3, ChannelHandler var4);
    ChannelPipeline addFirst(ChannelHandler... var1);
    ChannelPipeline addFirst(EventExecutorGroup var1, ChannelHandler... var2);
    ChannelPipeline addLast(ChannelHandler... var1);
    ChannelPipeline addLast(EventExecutorGroup var1, ChannelHandler... var2);
//remove相关的方法
    ChannelPipeline remove(ChannelHandler var1);
    ChannelHandler remove(String var1);
     T remove(Class var1);
    ChannelHandler removeFirst();
    ChannelHandler removeLast();
    ChannelPipeline replace(ChannelHandler var1, String var2, ChannelHandler var3);
    ChannelHandler replace(String var1, String var2, ChannelHandler var3);
     T replace(Class var1, String var2, ChannelHandler var3);
    ChannelHandler first();
    ChannelHandlerContext firstContext();
    ChannelHandler last();
    ChannelHandlerContext lastContext();
    ChannelHandler get(String var1);
     T get(Class var1);
    ChannelHandlerContext context(ChannelHandler var1);
    ChannelHandlerContext context(String var1);
    ChannelHandlerContext context(Class var1);
//获取到当前pipeline相关的channel
    Channel channel();
    List names();
    Map toMap();

//fire开头的方法
    ChannelPipeline fireChannelRegistered();
    ChannelPipeline fireChannelUnregistered();
    ChannelPipeline fireChannelActive();
    ChannelPipeline fireChannelInactive();
    ChannelPipeline fireExceptionCaught(Throwable var1);
    ChannelPipeline fireUserEventTriggered(Object var1);
    ChannelPipeline fireChannelRead(Object var1);
    ChannelPipeline fireChannelReadComplete();
    ChannelPipeline fireChannelWritabilityChanged();
    ChannelPipeline flush();
}

​ 可以看到,ChannelPipeline其实是一个接口,是ChannelHandler的管理容器,里面定义了一些方法,这些方法,主要是添加删除Handler、获取到相关的channel、还有传播不同事件的方法。具体的实现我们需要通过子类来看,然而我们发现他只有一个DefaultChannelPipeline这个子类,所以我们就主要进入这个子类来看一下:


​ 来观察下DefaultChannelPipeline这个类的一些属性和构造函数,在属性中我们可以看到有一个head和一个tail,还有一个Channel,这个就是与当前Pipeline进行绑定的Channel,结合下面的构造函数,我们可以清楚地看到,在创建当前pipeline的时候就会创建这个head和tail,并且形成一个双向链表。接下来选一个add方法来看看:

public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
//synchronized关键字,保证了当前的pipeline *** 作是线程安全的,但并不是说pipeline中的handler是线程安全的
        synchronized(this) {
            checkMultiplicity(handler);
            name = this.filterName(name, handler);
       //将handler封装成ctx(也就是ChannelHandlerContext)
            newCtx = this.newContext(group, name, handler);
       //真正的添加handler的方法
            this.addFirst0(newCtx);
            if (!this.registered) {
                newCtx.setAddPending();
                this.callHandlerCallbackLater(newCtx, true);
                return this;
            }
	//在执行完了添加 *** 作后,还需要发起callHandlerAdded0事件向后传播,而这里会使用到后续十分常用的模板
    //在这里会首先检查当前的线程是否为与当前channel绑定的eventLoop中的线程,如果不是则需要加入eventLoop中
    //的队列来完成callHandlerAdded0的调用,如果是则直接this.callHandlerAdded0(newCtx);即可。
    //这也证实了前面我们说的,对channel中的 *** 作都只能是与它绑定的eventLoop中的线程完成。
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    public void run() {
                        DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }

        this.callHandlerAdded0(newCtx);
        return this;
    }
private void addFirst0(AbstractChannelHandlerContext newCtx) {
    //在这个方法里添加了handler,可以看到head永远是pipeline中的第一个,在addLast方法中也可以看到tail永远是最后一个
        AbstractChannelHandlerContext nextCtx = this.head.next;
        newCtx.prev = this.head;
        newCtx.next = nextCtx;
        this.head.next = newCtx;
        nextCtx.prev = newCtx;
    }

​ 可以看到,在Pipeline中的元素实际上并不是ChannelHandler本身,而是把ChannelHandler包装进ChannelHandlerContext的实例DefaultChannelHandlerContext,然后把ChannelHandlerContext作为元素组成链表。

​ 由于 ChannelPipeline 支持运行期动态修改,因此存在两种潜在的多线程并发访问场景。

  1. IO 线程和用户业务线程的并发访问;
  2. 用户多个线程之间的并发访问。

​ 为了保证 ChannelPipeline 的线程安全性,需要通过线程安全容器或者锁来保证并发访问的安全,此处 Netty 直接使用了 synchronized 关键字,保证同步块内的所有 *** 作的原子性。 在加入链表之前,Netty 还会检查该 Handler 是否已经被添加过及其名字是否有重复,如果 该 Handler 不是共享的,而且被添加过抛出异常,如果名字重复,也会抛出异常。

​ 完成链表 *** 作后,后面的部分基本上做的就是一件事,执行方法 callHandlerAdded0,只是根据条件不同进行同步或者异步执行,callHandlerAdded0 的主要作用是在 handler 添加被加入链表之后做一些额外工作,Netty 本身对这个方法的实现,在 ChannelHandlerAdapter 类中,是个空 *** 作。只有我们自身需要做特别得需求时,才会去重写这个方法。

​ 前面有说到很重要的一点,当我们调用pipeline中的出站方法时(包括write、read等),实际上并不是从将事件传递给下一个出站Handler,而是从pipeline尾部开始向前传递,也就是tail,现在来看看源码是否如此。

​ 上面的方法addFirest0中源码可以发现,pipeline中的出站方法是调用tail中的相对应方法,而tail又永远都是pipeline中的最后一个Handler,也就证明我们前面说的结论是正确的。

​ 在Netty中所有的数据都是以事件来传播的,而事件又分为两种----出站和入站。入站事件,在pipeline中入站事件主要是调用HeadHandler对应的fire开头方法。出站事件包括发起绑定bind,发起读写read和write,刷新数据flush等。

进入到了源码阶段,也许会有些许的不适,但只有坚持下去才能让自己成长,读源码最重要的就是坚持,看别人写的东西,看不懂就会觉得很无聊,看懂了,就会发现很神奇,希望大家能从文章学习到知识,如果有任何的疑问或文章的错误,欢迎提出共同探讨!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5435468.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-11
下一篇 2022-12-11

发表评论

登录后才能评论

评论列表(0条)

保存