netty框架学习及springboot整合集成

netty框架学习及springboot整合集成,第1张

netty框架学习及springboot整合集成
  • 1. Netty基本概念
  • 2. Netty框架
    • 2.1 Netty框架结构
    • 2.1 Netty NIO
    • 2.2 Reactor线程模型
  • 3. Springboot集成netty
    • 3.1 引入jar包依赖
    • 3.2 服务端
    • 3.3 客户端
  • 4. 参考资料

从很久以前就接触到netty,也在几个项目中使用netty进行网络通讯对接,包括对接车联网设备以及对接安防硬件设备,因此也一直有想法系统地学习netty相关的框架以及实现的原理等,包括前期对零拷贝技术的学习,也是在为学习netty做准备。因此本文,在学习线上相关技术资料、整合以前使用的案例的基础上整理出来,在此进行记录,为以后的深入学习做准备,也为后来者提供参考借鉴,文中不免疏漏之处,望读者予以指正,不胜感激!

1. Netty基本概念

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

特点

(1)一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持
(2)使用更高效的socket底层,对epoll空轮询引起的cpu占用飙升在内部进行了处理,避免了直接使用NIO的陷阱,简化了NIO的处理方式。
(3)采用多种decoder/encoder 支持,对TCP粘包/分包进行自动化处理
(4)可使用接受/处理线程池,提高连接效率,对重连、心跳检测的简单支持
(5)可配置IO线程数、TCP参数, TCP接收和发送缓冲区使用直接内存代替堆内存,通过内存池的方式循环利用ByteBuf
(6)通过引用计数器及时申请释放不再引用的对象,降低了GC频率
(7)使用单线程串行化的方式,高效的Reactor线程模型
(8)大量使用了volitale、使用了CAS和原子类、线程安全类的使用、读写锁的使用

性能

(1)更高的吞吐量,更低的延迟
(2)减少资源消耗
(3)最小化不必要的内存复制

安全

完整的SSL / TLS和StartTLS支持
2. Netty框架 2.1 Netty框架结构


Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据。

2.1 Netty NIO
  1. NIO概念
    NIO:同步非阻塞IO,服务器实现模式是一个线程处理多个请求,客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有IO请求就进行处理。

  2. NIO核心组成部分
    NIO主要有三大核心部分 :Channel(通道),Buffer(缓冲区),Selector(选择器)。
    NIO基于Channel和Buffer进行 *** 作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector用于监听多个通道的事件(比如 :连接打开,数据到达)。因此使用单个线程就可以监听多个数据管道。
    NIO中Selector、Channel、Buffer的关系:
    1)每个channel都会对应一个buffer。
    2)一个selector对应一个线程,一个线程对应多个channel。
    3)程序切换到哪个channel是由Event(事件)决定的。
    4)selector会根据不同的事件,在各个通道上进行切换。
    5)buffer是一个内存块,底层有一个数组。
    6)数据的读取和写入是通过buffer,buffer可以切换读写,通过flip方法,但是BIO是单向输出,要么是输入流,要么是输出流。
    7)channel是双向的,可以返回底层 *** 作系统的情况,比如linux,底层的 *** 作系统通道就是双向的。

  3. Selector底层实现的三种方式
    Linux中支持IO多路复用的有select poll epoll ,最终还是选择了epoll
    epoll好处:
    1)支持一个进程打开的fd不受限制(当然小于OS的最大支持句柄)
    select最大缺陷:单进程打开的进程FD有限制,默认1024,对于要支持万个TCP的服务器来说太少。
    cat /proc/sys/fs/file -max 看最大句柄,1G内存机器约10W句柄
    2)IO不随FD数目增加线性下降
    传统select/poll:socket集合很大,由于网络延时、链路空闲,少部分socket活跃,select/poll 线性扫描,效率线性下降。
    Epoll:只对活跃的socket *** 作,epoll根据fd上的callback函数实现,只有活跃的socket才主动调用callback,idle的socket不会。
    当活跃socket多时,select/poll效率比epoll更高,当活跃socket少时,epoll效率更高
    3)使用mmap加速内核与用户空间的消息。
    epoll通过内核和用户空间mmap同一块内存来实现
    4)epoll Api比较简单


2.2 Reactor线程模型

Netty线程模型是典型的 Reactor 模型结构。

  1. Reactor线程模型
    常用的 Reactor 线程模型有三种,分别为:Reactor 单线程模型、Reactor 多线程模型和主从 Reactor 多线程模型。
    1)Reactor 单线程模型
    Reactor 单线程模型指的是所有的 IO *** 作都在同一个 NIO 线程上面完成。作为 NIO 服务端接收客户端的 TCP 连接,作为 NIO 客户端向服务端发起 TCP 连接,读取通信对端的请求或向通信对端发送消息请求或者应答消息。
    由于 Reactor 模式使用的是异步非阻塞 IO,所有的 IO *** 作都不会导致阻塞,理论上一个线程可以独立处理所有 IO 相关的 *** 作。
    2)Reactor 多线程模型
    对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适,需要对该模型进行改进,演进为 Reactor 多线程模型。
    Rector 多线程模型与单线程模型最大的区别就是有一组 NIO 线程处理 IO *** 作。
    在该模型中有专门一个 NIO 线程 -Acceptor 线程用于监听服务端,接收客户端的 TCP 连接请求;而 1 个 NIO 线程可以同时处理N条链路,但是 1 个链路只对应 1 个 NIO 线程,防止发生并发 *** 作问题。
    网络 IO *** 作-读、写等由一个 NIO 线程池负责,线程池可以采用标准的 JDK 线程池实现,它包含一个任务队列和 N 个可用的线程,由这些 NIO 线程负责消息的读取、解码、编码和发送。
    3)主从 Reactor 多线程模型
    在并发极高的情况单独一个 Acceptor 线程可能会存在性能不足问题,为了解决性能问题,产生主从 Reactor 多线程模型。
    主从 Reactor 线程模型的特点是:服务端用于接收客户端连接的不再是 1 个单独的 NIO 线程,而是一个独立的 NIO 线程池。
    Acceptor 接收到客户端 TCP 连接请求处理完成后,将新创建的 SocketChannel 注册到 IO 线程池(sub reactor 线程池)的某个 IO 线程上,由它负责 SocketChannel 的读写和编解码工作。
    Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池的 IO 线程上,由 IO 线程负责后续的 IO *** 作。

  2. Netty 线程模型
    Netty线程模型是基于主从reactor多线程模式的,并在此基础上做了一定程度上的优化:
    1)BossGroup线程池维护主Selector , 只关注accecpt
    2)当接收到accept事件后,获得对应的SocketChannel,封装成NioSocketChannel 注册到Worker线程循环
    3)worker线程监听到自己感兴趣的时间后,交由handler处理

  3. Netty Reactor线程执行过程

    1、netty抽象出两个线程池:BossGroup负责监听和建立连接 ;WorkerGroup 负责网络IO的读写
    2、BossGroup 和 WorkerGroup 类型都是NioEventLoopGroup , 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环都是NioEventLoop
    3、NioEventLoop表示一个selector , 用户监听绑定在其上的socket网络通讯
    4、每一个Boos NioEventLoop循环执行3步:
    a、轮询accept事件
    b、建立连接,生成NioSocketChannel,并注册到workerGroup上
    c、处理任务队列中的任务,即RunAllTasks
    5、每个Worker NioEventLoop循环执行3步:
    a、轮询读写时间
    b、处理IO时间,在对应的NioSocketChannel上处理
    c、处理任务队列的任务,即RunAllTasks

3. Springboot集成netty 3.1 引入jar包依赖

   io.netty
   netty-all
   4.1.28.Final

3.2 服务端
// netty server类
@Component
public class NettyServer {

    @Value("${netty-port}")
    private int port;

    public void start() throws InterruptedException {
        /**
         * 创建两个线程组 bossGroup 和 workerGroup
         * bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
         *  两个都是无线循环
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组
            bootstrap.group(bossGroup, workerGroup)
                    //使用NioServerSocketChannel 作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    //设置线程队列得到连接个数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //设置保持活动连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //可以给 bossGroup 加个日志处理器
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //给workerGroup 的 EventLoop 对应的管道设置处理器
                    .childHandler(new ChannelInitializer() {
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
                            pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            //启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象
            ChannelFuture cf = bootstrap.bind(port).sync();
            if (cf.isSuccess()) {
                System.out.println("socket server start---------------");
            }
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            //发送异常关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

// handler类
public class NettyServerHandler extends SimpleChannelInboundHandler {
    private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class);

    protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception {
        log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",obj);
        SocketChannel socketChannel = (SocketChannel) context.channel();
        /**
         * 服务器返回客户端消息
         */
        Map map =  new HashMap();
        map.put("msg","我是服务端,收到你的消息了");
        socketChannel.writeAndFlush(JSON.toJSONString(map));
        ReferenceCountUtil.release(obj);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}


// springboot 集成启动 netty server,同时不影响tomcat接口
@Component
public class NettyBoot implements CommandLineRunner {

    @Autowired
    private NettyServer nettyServer;

    public void run(String... args) throws Exception {
        try {
            nettyServer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 
3.3 客户端 
// netty client客户端
@Component
public class NettyClient {

    private int port = 9999;
    private String host = "localhost";
    public Channel channel;

    public void start() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .remoteAddress(host, port)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
                            pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            if (future.isSuccess()) {
                channel = future.channel();
                System.out.println("connect server  成功---------");
            }
//            给关闭通道进行监听
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public void sendMsg(String msg) {
        this.channel.writeAndFlush(msg);
    }
}

// handler处理类
public class NettyClientHandler extends SimpleChannelInboundHandler {
    private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>连接");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>退出");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        log.info(">>>>>>>>>>>>>userEventTriggered:{}", evt);
    }

    /**
     * 客户端接收到服务端发的数据
     * @param channelHandlerContext
     * @param obj
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj)  {
        log.info(">>>>>>>>>>>>>客户端接收到消息:{}", obj);
        ReferenceCountUtil.release(obj);
    }

    /**
     * socket通道处于活动状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>>>socket建立了");
        super.channelActive(ctx);
    }

    /**
     * socket通道不活动了
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>>>socket关闭了");
        super.channelInactive(ctx);
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

// springboot集成,同时不影响tomact接口
@Component
public class NettyBoot implements CommandLineRunner {

    @Autowired
    private NettyClient nettyClient;

    public void run(String... args) throws Exception {
        nettyClient.start();
    }
}
 
4. 参考资料 

https://netty.io/
https://www.cnblogs.com/telwanggs/p/12119697.html
https://blog.csdn.net/lmdsoft/article/details/105618052
https://www.infoq.cn/article/netty-threading-model

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/724573.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)