一、Netty概述
1.1 简介1.2 原生NIO的问题1.3 Netty的优点 二、线程模型
2.1 传统阻塞IO服务模型
特点:问题: 2.2 Reactor模式
2.2.1 单Reactor单线程
流程说明:优缺点分析: 2.2.2 单Reactor多线程
流程说明:优缺点: 2.2.3 主从Reactor多线程
流程说明:优缺点: 总结 三、Netty模型
原理描述:快速入门demo
一、Netty概述 1.1 简介官网:https://netty.io/
Netty是一个NIO客户端服务器框架,它可以快速简单地开发网络应用程序,如协议服务器和客户端。它极大地简化和简化了网络编程,如TCP和UDP套接字服务器。
“快速和简单”并不意味着最终的应用程序将遭受可维护性或性能问题。Netty经过了精心的设计,使用了许多协议(如FTP、SMTP、HTTP和各种基于二进制和文本的遗留协议)的实现所获得的经验。因此,Netty成功地找到了一种方法,在不妥协的情况下,实现了易于开发、性能、稳定性和灵活性。
1.2 原生NIO的问题NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、 SocketChannel、ByteBuffer 等。需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。 1.3 Netty的优点
设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展 的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制(Zero-copy)。安全:完整的 SSL/TLS 和 StartTLS 支持。社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入 二、线程模型
目前存在的线程模型有:
传统阻塞IO服务模型Reactor模式
单Reactor单线程单Reactor多线程主从Reactor多线程
Netty主要是 基于主从Reactor多线程 做了一些改进
2.1 传统阻塞IO服务模型 特点:采用阻塞式IO获取输入数据每有一个连接就需要建立一个独立的线程进行数据的输入,业务处理,最后返回数据 问题:
- 并发量很大时,会创建大量线程,占用很大的内存资源建立连接后,如果该线程没有要读的数据,就会阻塞在read,造成线程资源浪费
针对上述两个问题,Reactor模式提供了解决方案
- 针对第一个问题,Reactor采用线程池复用线程资源: 将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务针对第二个问题,Reactor采用IO复用模型: 多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等 待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时, *** 作系统通知应 用程序,线程从阻塞状态返回,开始进行业务处理
基于事件驱动:通过一个或多个输入同时传递给服务处理器的模式同步分发:服务端处理器ServiceHandler将多个请求同步分发Dispatch到各个处理线程,所以Reactor模式又叫Dispatch模式IO复用监听事件:Reactor 模式使用IO复用监听事件。 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键
核心组成:
Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。Handlers:处理程序,执行IO事件想要完成的实际任务。Reactor通过调度实现非阻塞 *** 作 2.2.1 单Reactor单线程
NIO的群聊系统(Demo链接)就是单Reactor单线程的
流程说明:
Reactor 通过 select 监听多路的连接请求,监控客户端的请求事件,接收到事件后通过 Dispatch 进行分发
如果是建立连接的事件:Acceptor通过accept处理连接请求,创建一个Handler对象处理连接完成后的业务
如果不是建立连接的事件:Reactor分发给Handler来响应这个业务
注意:Handler需要完成 Read->业务处理->Send 一系列业务
优缺点分析:优点:简单,没有多线程之间进程通信、竞争的问题缺点:
性能问题:只有一个线程,无法完全发挥多核 CPU 的性能。Handler需要完成从读到处理到返回一系列任务,在处理某个连接的业务时,整个进程无法处理其他连接事件,容易导致性能瓶颈**(Handler瓶颈)**可靠性问题:线程意外终止,或者进入死循环,会导致整个系统通信模块不 可用,不能接收和处理外部消息,造成节点故障 使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度 O(1) 的情况 2.2.2 单Reactor多线程
单Reactor单线程模式在Handler处会有瓶颈 ,为解决此瓶颈:
单Reactor多线程模式把Handler开了多个线程,并进行了划分,让Handler专注于read和send,把业务处理交给Worker线程池处理
流程说明:
Reactor 通过 select 监听多路的连接请求,监控客户端的请求事件,接收到事件后通过 Dispatch 进行分发
如果是建立连接的事件:Acceptor通过accept处理连接请求,创建一个Handler对象处理连接完成后的业务
如果不是建立连接的事件:Reactor分发给Handler来响应这个业务
Handler只负责读取和会送 ,把业务处理交给Worker线程池处理
Worker线程池分配独立的线程进行真正的业务 *** 作, *** 作完成后返回给Handler进行回送
优缺点:优点:可以充分的利用多核 cpu 的处理能力缺点:
性能问题:多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈**(Reactor瓶颈)** 2.2.3 主从Reactor多线程
单Reactor多线程解决了Handler瓶颈,但是又会在Reactor处产生瓶颈,为解决:
主从Reactor多线程,多开了Reactor,一个主Reactor专门用来建立连接,其他的从Reactor专门用来处理请求
流程说明:MainReactor 主线程通过 select 监听多路的连接请求,使用Acceptor处理连接事件当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor 从线程SubReactor 从线程将连接放入连接队列中,之后就能用select进行监听了当SubReactor监听到被分配的连接有事件时,就调用对应的Handler进行处理(后续和单Reactor多线程一样) 优缺点:
优点:主从Reactor线程中父线程和子线程数据交互简单,职责明确,父线程只需要接收连接请求,子线程完成后续的业务处理缺点:编程复杂度变高应用场景:这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型, Memcached 主从多线程,Netty 主从多线程模型的支持 总结
Reactor模式具有以下优点:
响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程 的切换开销扩展性好,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源复用性好,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性 三、Netty模型
前面说了:Netty主要是 基于主从Reactor多线程 并做了一些改进
BossGroup:相当于 MainReactor组 ,内部维护 Selector ,这个Selector只关注连接事件
WorkerGroup:相当于 SubReactor组
当BossGroup监听到Accept事件后,获取到对应的SockerChannel,然后封装为NIOSockerChannel并注册到WorkerGroup中去
当Worker线程监听到分发给自己的连接的其他事件后,使用handler进行处理
简版:
完整版:
原理描述:Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroupNioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环NioEventLoop,即循环监听每个NioEventLoop都有一个selector , 用于监听绑定在其上的socket的网络通讯
每个Boss NioEventLoop 循环执行的步骤
- 轮询accept 事件处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到某个worker NIOEventLoop 上的 selector处理任务队列的任务,即 runAllTasks
每个Worker NIOEventLoop 循环执行的步骤
- 论询read, write 事件处理i/o事件,即read , write 事件,在对应NioScocketChannel 处理处理任务队列的任务,即 runAllTasks
每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道)
快速入门demo客户端服务端互发消息
1、导包
io.netty netty-all4.1.73.Final
2、编写服务器端
public class NettyServer { public static void main(String[] args) { //创建两个线程组bossGroup和workerGroup //bossGroup只处理连接请求,workerGroup处理真正的业务 //两个都是无限循环 // bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数,默认实际 cpu核数 * 2 // 有参构造可以设置开几个线程 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务器启动对象,用来配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup)//设置两个线程组 .channel(NioServerSocketChannel.class) //服务器通道NioServerSocketChannel .option(ChannelOption.SO_BACKLOG,128) //线程队列得到连接的个数 .childOption(ChannelOption.SO_KEEPALIVE,true) //设置保持活动连接状态 .childHandler(new ChannelInitializer() { //创建一个通道测试对象 //给pipeline设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } }); //给workerGroup的Handler管道设置处理器 System.out.println("... 服务器 ok ..."); //绑定一个端口并且同步,生成一个ChannelFuture对象并启动 ChannelFuture channelFuture = bootstrap.bind(6666).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //优雅地关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
Handler管道处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取数据事件,可以读取客户端读取的消息 //ChannelHandlerContext ctx:上下文对象,含有管道pipeline,通道channel,地址 //Object msg:客户端发过来的消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的关系"); //channel和pipeline里面各有一个变量指向对方channel.pipeline(); pipeline.channel(); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站 System.out.println("server ctx="+ctx); //ByteBuf不是NIO的ByteBuffer,是Netty包下的 ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发过来消息:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址为:"+ctx.channel().remoteAddress()); } //读取完毕,可以会一个消息 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush是write + flush,将数据写入缓冲并刷新 ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端",CharsetUtil.UTF_8)); } //处理异常,一般是关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
3、编写客户端
public class NettyClient { public static void main(String[] args) { //客户端需要一个事件循环组 EventLoopGroup eventExecutors = new NioEventLoopGroup(); try { //创建客户端启动对象,用来配置参数 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventExecutors) //设置线程组 .channel(NioSocketChannel.class)//设置客户端通道实现类 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler());//pipeline 的处理器 } }); System.out.println("客户端 ok"); //启动客户端连接 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync(); //关闭通道做监听 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { eventExecutors.shutdownGracefully(); } } }
客户端管道处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪时就会触发,客户端发消息 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client上下文:"+ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello server,我是客户端", CharsetUtil.UTF_8)); } //通道有读事件时触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器地址:"+ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
4、运行:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)