前面主要是介绍了在Netty中每个组件的使用和作用,在运行的时候,Netty中组件一起基于Netty的网络通信做出了贡献,而且也有说到一些并未给出证据的结论,例如,在每个EventLoop中是否真的是有一个线程和一个队列?在ChannelPipeline中是否真的默认会有一个head和一个tail共两个ChannelHandler?那么接下来我们来一一解析Netty中的组件,包括ChannelPipeline、ChannelHandlerContext、ChannelHandler、EventLoop、Channel、Unsafe、ByteBuf等等,并且还会将这些组件在服务器启动、各种事件发生时Netty内部的运行机制通过源码来进行讲解。
ChannelPipeline ChannelPipeline是用于存储ChannelHandler(ChannelHandlerContext)的组件,handler在pipeline中形成一个双向链表,有点类似于责任链的一种变形,channel中所有的事件都会按顺序流经pipeline中的handler并进行处理。
根据我们在前面对 Netty 的使用,我们知道,我们不需要自己创建 pipeline,因为使用ServerBootstrap 或者 Bootstrap 启动服务端或者客户端时,Netty 会为每个 Channel 连接创建一个独立的 pipeline。对于使用者而言,只需要将自定义的拦截器加入到 pipeline 中即可。
ChannelPipeline 支持运行态动态的添加或者删除 ChannelHandler,在某些场景下这个特性非常实用。例如当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护 ChannelHandler 添加到当前的ChannelPipeline 中,当高峰期过去之后,就可以动态删除拥塞保护 ChannelHandler 了。
ChannelPipeline 是线程安全的,这意味着 N 个业务线程可以并发地 *** 作 ChannelPipeline而不存在多线程并发问题。但是,ChannelHandler 却不是线程安全的,这意味着尽管ChannelPipeline 是线程安全的,但是用户仍然需要自己保证 ChannelHandler 的线程安全。
Netty将Channel的数据管道抽象为ChannelPipeline,消息在ChannelPipeline中流动和传递。ChannelPipeline持有IO事件拦截器ChannelHandler的链表,由 ChannelHandler 对 IO 事件进行拦截和处理,可以方便地通过新增和删除ChannelHandler 来实现不同的业务逻辑定制,不需要对已有的ChannelHandler 进行修改,能够实现对修改封闭和对扩展的支持。接下来先看看ChannelPipeline的源码:(需要注意的是,因为大家的版本可能不一样,源码可能会略有不同,但是整体的思路是肯定一样的)
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable> { //add开头的方法 ChannelPipeline addFirst(String var1, ChannelHandler var2); ChannelPipeline addFirst(EventExecutorGroup var1, String var2, ChannelHandler var3); ChannelPipeline addLast(String var1, ChannelHandler var2); ChannelPipeline addLast(EventExecutorGroup var1, String var2, ChannelHandler var3); ChannelPipeline addBefore(String var1, String var2, ChannelHandler var3); ChannelPipeline addBefore(EventExecutorGroup var1, String var2, String var3, ChannelHandler var4); ChannelPipeline addAfter(String var1, String var2, ChannelHandler var3); ChannelPipeline addAfter(EventExecutorGroup var1, String var2, String var3, ChannelHandler var4); ChannelPipeline addFirst(ChannelHandler... var1); ChannelPipeline addFirst(EventExecutorGroup var1, ChannelHandler... var2); ChannelPipeline addLast(ChannelHandler... var1); ChannelPipeline addLast(EventExecutorGroup var1, ChannelHandler... var2); //remove相关的方法 ChannelPipeline remove(ChannelHandler var1); ChannelHandler remove(String var1); T remove(Class var1); ChannelHandler removeFirst(); ChannelHandler removeLast(); ChannelPipeline replace(ChannelHandler var1, String var2, ChannelHandler var3); ChannelHandler replace(String var1, String var2, ChannelHandler var3); T replace(Class var1, String var2, ChannelHandler var3); ChannelHandler first(); ChannelHandlerContext firstContext(); ChannelHandler last(); ChannelHandlerContext lastContext(); ChannelHandler get(String var1); T get(Class var1); ChannelHandlerContext context(ChannelHandler var1); ChannelHandlerContext context(String var1); ChannelHandlerContext context(Class extends ChannelHandler> var1); //获取到当前pipeline相关的channel Channel channel(); List names(); Map toMap(); //fire开头的方法 ChannelPipeline fireChannelRegistered(); ChannelPipeline fireChannelUnregistered(); ChannelPipeline fireChannelActive(); ChannelPipeline fireChannelInactive(); ChannelPipeline fireExceptionCaught(Throwable var1); ChannelPipeline fireUserEventTriggered(Object var1); ChannelPipeline fireChannelRead(Object var1); ChannelPipeline fireChannelReadComplete(); ChannelPipeline fireChannelWritabilityChanged(); ChannelPipeline flush(); }
可以看到,ChannelPipeline其实是一个接口,是ChannelHandler的管理容器,里面定义了一些方法,这些方法,主要是添加删除Handler、获取到相关的channel、还有传播不同事件的方法。具体的实现我们需要通过子类来看,然而我们发现他只有一个DefaultChannelPipeline这个子类,所以我们就主要进入这个子类来看一下:
来观察下DefaultChannelPipeline这个类的一些属性和构造函数,在属性中我们可以看到有一个head和一个tail,还有一个Channel,这个就是与当前Pipeline进行绑定的Channel,结合下面的构造函数,我们可以清楚地看到,在创建当前pipeline的时候就会创建这个head和tail,并且形成一个双向链表。接下来选一个add方法来看看:
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; //synchronized关键字,保证了当前的pipeline *** 作是线程安全的,但并不是说pipeline中的handler是线程安全的 synchronized(this) { checkMultiplicity(handler); name = this.filterName(name, handler); //将handler封装成ctx(也就是ChannelHandlerContext) newCtx = this.newContext(group, name, handler); //真正的添加handler的方法 this.addFirst0(newCtx); if (!this.registered) { newCtx.setAddPending(); this.callHandlerCallbackLater(newCtx, true); return this; } //在执行完了添加 *** 作后,还需要发起callHandlerAdded0事件向后传播,而这里会使用到后续十分常用的模板 //在这里会首先检查当前的线程是否为与当前channel绑定的eventLoop中的线程,如果不是则需要加入eventLoop中 //的队列来完成callHandlerAdded0的调用,如果是则直接this.callHandlerAdded0(newCtx);即可。 //这也证实了前面我们说的,对channel中的 *** 作都只能是与它绑定的eventLoop中的线程完成。 EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { public void run() { DefaultChannelPipeline.this.callHandlerAdded0(newCtx); } }); return this; } } this.callHandlerAdded0(newCtx); return this; }
private void addFirst0(AbstractChannelHandlerContext newCtx) { //在这个方法里添加了handler,可以看到head永远是pipeline中的第一个,在addLast方法中也可以看到tail永远是最后一个 AbstractChannelHandlerContext nextCtx = this.head.next; newCtx.prev = this.head; newCtx.next = nextCtx; this.head.next = newCtx; nextCtx.prev = newCtx; }
可以看到,在Pipeline中的元素实际上并不是ChannelHandler本身,而是把ChannelHandler包装进ChannelHandlerContext的实例DefaultChannelHandlerContext,然后把ChannelHandlerContext作为元素组成链表。
由于 ChannelPipeline 支持运行期动态修改,因此存在两种潜在的多线程并发访问场景。
- IO 线程和用户业务线程的并发访问;
- 用户多个线程之间的并发访问。
为了保证 ChannelPipeline 的线程安全性,需要通过线程安全容器或者锁来保证并发访问的安全,此处 Netty 直接使用了 synchronized 关键字,保证同步块内的所有 *** 作的原子性。 在加入链表之前,Netty 还会检查该 Handler 是否已经被添加过及其名字是否有重复,如果 该 Handler 不是共享的,而且被添加过抛出异常,如果名字重复,也会抛出异常。
完成链表 *** 作后,后面的部分基本上做的就是一件事,执行方法 callHandlerAdded0,只是根据条件不同进行同步或者异步执行,callHandlerAdded0 的主要作用是在 handler 添加被加入链表之后做一些额外工作,Netty 本身对这个方法的实现,在 ChannelHandlerAdapter 类中,是个空 *** 作。只有我们自身需要做特别得需求时,才会去重写这个方法。
前面有说到很重要的一点,当我们调用pipeline中的出站方法时(包括write、read等),实际上并不是从将事件传递给下一个出站Handler,而是从pipeline尾部开始向前传递,也就是tail,现在来看看源码是否如此。
上面的方法addFirest0中源码可以发现,pipeline中的出站方法是调用tail中的相对应方法,而tail又永远都是pipeline中的最后一个Handler,也就证明我们前面说的结论是正确的。
在Netty中所有的数据都是以事件来传播的,而事件又分为两种----出站和入站。入站事件,在pipeline中入站事件主要是调用HeadHandler对应的fire开头方法。出站事件包括发起绑定bind,发起读写read和write,刷新数据flush等。
进入到了源码阶段,也许会有些许的不适,但只有坚持下去才能让自己成长,读源码最重要的就是坚持,看别人写的东西,看不懂就会觉得很无聊,看懂了,就会发现很神奇,希望大家能从文章学习到知识,如果有任何的疑问或文章的错误,欢迎提出共同探讨!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)