Java网络编程Netty学习笔记2——Netty入门

Java网络编程Netty学习笔记2——Netty入门,第1张

目录

1、概述

1.1、Netty是什么

1.2、Netty的地位

1.3、Netty的优势

2、Hello World

2.1、目标

2.2、服务器端

2.3、客户端

2.4、注意的问题

3、组件

3.1、EventLoop

3.2、Channel

3.2.1、ChannelFuture

3.2.2、为什么要异步

3.3、Future & Promise

3.3.1、JDK Future

3.3.2、Netty Future

3.3.3、netty Promise

3.4、Handler & Pipeline

3.5、EmbeddedChannel测试

3.6、ByteBuf

3.6.1、堆内存 vs 直接内存

3.6.2、池化 vs 非池化

3.6.3、组成

3.6.4、写入

3.6.5、扩容

3.6.6、读取

3.6.7、retain & release

3.6.8、slice & composite

3.6.9、duplicate

3.6.10、copy

3.6.11、Unpooled

3.6.12、ByteBuf的优势


1、概述 1.1、Netty是什么

Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。

1.2、Netty的地位

Netty在Java网络应用框架中的地位就好比Spring框架在JavaEE开发中的地位。以下的框架都使用了Netty,因为它们有网络通信要求:

  • Cassandra - nosql数据库
  • Spark - 大数据分布式计算框架
  • Hadoop - 大数据分布式存储框架
  • RocketMQ - ali开源的消息队列
  • ElasticSearch - 搜索引擎
  • gRPC - rpc框架
  • Dubbo - rpc框架
  • Spring 5.x - flux api完全抛弃了tomcat,使用netty作为服务器端
  • Zookeeper - 分布式协调框架
1.3、Netty的优势

Netty vs NIO ,基于NIO开发工作量大,bug多:

  • 需要自己构建协议;
  • 解决TCP传输问题,如粘包、半包;
  • epoll空轮询导致CPU 100%;
  • 对API进行增强,使之更易用,如FastThreadLocal => ThreadLocal,ByteBuf => ByteBuf。

Netty vs 其他网络应用框架:

  • Mina由apache维护,将来3.x版本可能会有较大重构,破坏API向下兼容性,Netty的开发迭代更迅速,API更简洁、文档更优秀。
  • 久经考验,16年,Netty版本:2.x 2004, 3.x 2008, 4.x 2013, 5.x 已废弃(没用明显的性能提升,维护成本高)。
2、Hello World 2.1、目标

开发一个简单的服务器端和客户端:

  • 客户端向服务器端发送hello,world;
  • 服务器仅接收,不返回。

加入依赖:

pom.xml:

        ...
        
            io.netty
            netty-all
        
        
        ...
2.2、服务器端
package com.clp.netty3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class HelloServer {
    public static void main(String[] args) {
        //1、ServerBootstrap:服务器端启动器,负责组装netty组件,启动服务器
        new ServerBootstrap()
                //2、NioEventLoopGroup(selector, thread),EventLoop中包含了线程和选择器
                //16、由某个EventLoop处理read事件,接收到ByteBuf,然后交给后续的handler
                .group(new NioEventLoopGroup()) //监听 accept read .. 事件
                //3、选择服务器的ServerSocketChannel实现
                .channel(NioServerSocketChannel.class)
                //4、boss:负责处理连接;worker:处理读写,决定了worker(child)能执行哪些 *** 作(handler)
                .childHandler(
                        //5、Channel代表和客户端进行数据读写的通道,Initializer初始化器,负责添加别的handler
                        new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加具体的handler
                        ch.pipeline().addLast(new StringDecoder()); //17、将ByteBuf转换为字符串,然后给下一个处理器
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //自定义handler
                            @Override //读事件
                            //18、执行channelRead()方法,打印hello
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //打印上一步转换好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                })
                //6、绑定监听端口
                .bind(8080);
    }
}
2.3、客户端
package com.clp.netty3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        //7、创建启动器类
        new Bootstrap()
                //8、添加EventLoop
                .group(new NioEventLoopGroup())
                //9、选择客户端channel实现
                .channel(NioSocketChannel.class)
                //10、添加处理器
                .handler(new ChannelInitializer() {
                    @Override //12、在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); //15、将字符串转为ByteBuf类型,然后发送给服务器端
                    }
                })
                //11、连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync() //13、阻塞方法,直到连接建立
                .channel() //代表连接对象
                //14、向服务器发送数据
                .writeAndFlush("hello world"); //发送数据
    }
}
2.4、注意的问题

一开始要树立正确的观念。

  • 把channel理解为数据的通道;
  • 把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeline的加工,会变成其它类型对象,最后输出又变成ByteBuf;
  • 把handler理解为数据的处理工序。工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法);handler分为Inbound(入站)和Outbound(出站)两类。
  • 把EventLoop理解为处理数据的工人。工人可以管理多个channel的IO *** 作,并且一旦工人负责了某个channel,就要负责到底(绑定);工人既可以执行IO *** 作,也可以进行任务处理,每位工人有任务队列,队列里可以有多个channel的待处理任务,任务分为普通任务、定时任务;工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人。
3、组件 3.1、EventLoop

1、事件循环对象

EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run()方法处理Channel上源源不断的IO事件。

它的关系比较复杂:

  • 一条线是继承j.u.c.ScheduledExecutorService,因此包含了线程池中所有的方法;
  • 另一条线是继承自netty自己的OrderedEventExecutor。提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop;提供了parent()方法来看看自己属于哪个EventLoopGroup。

2、事件循环组

EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register()方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理(保证了IO事件处理时的线程安全)。

继承自netty自己的EventExecutorGroup:

  • 实现了iterable接口提供的EventLoop的能力;
  • 另有next()方法获取集合中下一个EventLoop。
package com.clp.netty3;

import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        //1、创建事件循环
        EventLoopGroup group = new NioEventLoopGroup(2); //处理 io 事件、普通任务、定时任务
//        EventLoopGroup group = new DefaultEventLoop(); //处理 普通任务、定时任务

        //2、获取下一个事件循环对象
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());

        //3、执行普通任务
        group.next().submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("ok");
        });

        //4、执行定时任务
        group.next().scheduleAtFixedRate(() -> {
            log.debug("ok");
        }, 0, 1, TimeUnit.SECONDS); //初始时间为0,间隔时间为1,单位为秒

        log.debug("main");
    }
}
package com.clp.netty3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        //创建一个独立的EventGroup
        EventLoopGroup group = new DefaultEventLoopGroup(); //只能处理普通任务和定时任务
        new ServerBootstrap()
                //boss 和 worker
                //细分1:第一个参数:boss 只负责ServerSocketChannel上的accept事件;第二个参数:2个worker 只负责socketChannel上的读写
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioServerSocketChannel ch) throws Exception {
                        ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
                            @Override //ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                                ctx.fireChannelRead(msg); //将消息传递给下一个handler
                            }
                        }).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                            @Override //ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080); //绑定端口
    }
}
package com.clp.netty3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //创建启动器类
        Channel channel = new Bootstrap()
                //添加EventLoop
                .group(new NioEventLoopGroup())
                //选择客户端channel实现
                .channel(NioSocketChannel.class)
                //添加处理器
                .handler(new ChannelInitializer() {
                    @Override //12、在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); //15、将字符串转为ByteBuf类型,然后发送给服务器端
                    }
                })
                //连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync() //阻塞方法,直到连接建立
                .channel();//代表连接对象
        System.out.println(channel);
        System.out.println("");
    }
}

3、handler执行中如何换人?

3.2、Channel

channel的主要作用:

  • close()可以用来关闭channel;
  • closeFuture()用来处理用来处理channel的关闭。① sync()方法作用是同步等待channel关闭;② 而addListener()方法是异步等待channel关闭。
  • pipeline()方法添加处理器;
  • write()方法将数据写入;
  • writeAndFlush()方法将数据写入并刷出。
3.2.1、ChannelFuture

1、ChannelFuture处理结果

package com.clp.netty3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //创建启动器类
        //带有Future、Promise的类型都是和异步方法配套使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                //添加EventLoop
                .group(new NioEventLoopGroup())
                //选择客户端channel实现
                .channel(NioSocketChannel.class)
                //添加处理器
                .handler(new ChannelInitializer() {
                    @Override //在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); //将字符串转为ByteBuf类型,然后发送给服务器端
                    }
                })
                //连接到服务器
                //异步非阻塞,主线程发起了调用,但是真正执行connect的是nio线程。
                .connect(new InetSocketAddress("localhost", 8080)); //假设1s 后连接成功

//        //方法1:使用sync()方法同步处理结果
//        channelFuture.sync(); //阻塞住当前线程,直到nio线程连接建立完毕
        //若不调用sync()方法,则会无阻塞向下执行,获取channel
//        Channel channel = channelFuture.channel();//代表连接对象
//        System.out.println(channel);

        //方法2:使用addListener(回调对象)方法,异步处理结果(交给nio线程处理)
        channelFuture.addListener(new ChannelFutureListener() {
            //在nio线程连接建立好之后,会调用这个方法
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                System.out.println(channel);
            }
        });
    }
}

2、ChannelFuture关闭问题

package com.clp.netty3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync(); //等待连接建立
        Channel channel = channelFuture.channel();

        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while(true) {
                String line = scanner.nextLine();
                if("q".equals(line)) {
                    channel.close(); //异步 *** 作,交给其他线程执行关闭 *** 作
//                    log.debug("处理关闭之后的 *** 作"); //不能在这里善后
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        //获取ClosedFuture对象:1)同步处理关闭;2)异步处理关闭
        ChannelFuture closeFuture = channel.closeFuture();
//        //1、同步处理关闭
//        log.debug("waiting close...");
//        closeFuture.sync(); //当channel调用close()方法之后,sync()才会继续往下执行
//        log.debug("处理关闭之后的 *** 作");
        //2、异步处理关闭
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            //谁(线程)关闭谁调用
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.debug("处理关闭之后的 *** 作"); //通道channel关闭后...
                group.shutdownGracefully(); //关闭正在运行的线程
            }
        });

    }
}
3.2.2、为什么要异步

要点:

  • 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势;
  • 异步并没有缩短响应时间,反而有所增加;
  • 合理进行任务拆分,也是利用异步的关键。
3.3、Future & Promise

在异步处理时,经常用到这两个接口。

首先要说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise又对netty Future进行了扩展。

jdk Future <- netty Future <- promise。

  • jdk Future只能同步等待任务结束(或成功、或失败)才能得到结果;
  • netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过isSuccess判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addListener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果
3.3.1、JDK Future
package com.clp.netty3;

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;

@Slf4j
public class TestJdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1、创建线程池
        ExecutorService service = Executors.newFixedThreadPool(2);
        //2、提交任务
        //future相当于线程间传递数据的容器(被动等待其他线程执行完并接收其结果)
        Future future = service.submit(new Callable() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算...");
                Thread.sleep(1000);
                return 50;
            }
        });
        //3、主线程通过future获取结果
        log.debug("等待结果...");
        //future.get():将主线程阻塞住,同步等待直到子线程执行结束
        log.debug("结果是{}", future.get());
    }
}


结果:
08:50:48.500 [pool-1-thread-1] DEBUG com.clp.netty3.TestJdkFuture - 执行计算...
08:50:48.500 [main] DEBUG com.clp.netty3.TestJdkFuture - 等待结果...
08:50:49.517 [main] DEBUG com.clp.netty3.TestJdkFuture - 结果是50
3.3.2、Netty Future
package com.clp.netty3;

import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

@Slf4j
public class TestNettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //一个EventLoopGroup里面有多个EventLoop,一个EventLoop里只有一个线程
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();

        Future future = eventLoop.submit(new Callable() {
            @Override
            public Integer call() throws Exception {
                log.debug("执行计算..");
                Thread.sleep(1000);
                return 70;
            }
        });

//        log.debug("等待结果..");
//        log.debug("结果是:{}", future.get()); //1、同步方式获取结果

        //2、异步方式获取结果
        future.addListener(new GenericFutureListener>() {
            @Override
            public void operationComplete(Future future) throws Exception {
                log.debug("接收结果:{}", future.getNow());
            }
        });
    }
}


结果:
..
09:04:42.098 [nioEventLoopGroup-2-1] DEBUG com.clp.netty3.TestNettyFuture - 执行计算..
09:04:43.099 [nioEventLoopGroup-2-1] DEBUG com.clp.netty3.TestNettyFuture - 接收结果:70
3.3.3、netty Promise
package com.clp.netty3;

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;

@Slf4j
public class TestNettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1、准备一个EventLoop对象
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        //2、可以手动创建Promise对象,作为结果容器
        DefaultPromise promise = new DefaultPromise(eventLoop);

        new Thread(() -> {
            //3、任意一个线程执行计算,计算完毕后向promise对象填充结果
            try {
                int i = 1/0; //手动产生一个异常
                log.debug("开始计算..");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                promise.setFailure(e); //如果执行失败(如1/0),可以装填异常信息
            }
            promise.setSuccess(80); //装填正确结果
        }).start();

        //4、接收结果的线程
        log.debug("等待结果..");
        log.debug("结果是:{}", promise.get());
    }
}


结果;
...
09:16:19.049 [main] DEBUG com.clp.netty3.TestNettyPromise - 等待结果..
Exception in thread "Thread-0" java.lang.ArithmeticException: / by zero
	at com.clp.netty3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:23)
	at java.lang.Thread.run(Thread.java:748)
3.4、Handler & Pipeline

ChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被连成一串,就是Pipeline。

  • 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果。
  • 出站处理器通常是ChannelOuntBoundHandlerAdapter的子类,主要对写回结果进行加工。

打个比喻,每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各道工序,而后面要讲的ByteBuf是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。

package com.clp.netty4;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class TestPipeline {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //1、通过channel拿到pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        //2、添加入站处理器
                        //添加处理器 head -> ... -> tail ,netty会自动在头和尾添加这两个handler(基于双向链表)
                        //流水线pipeline:head -> handler01 -> handler02 -> handler03 ->
                        // handler04 -> handler05 -> handler06 -> tail
                        pipeline.addLast("handler01",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("1");
                                ByteBuf buf = (ByteBuf) msg;
                                String name = buf.toString(Charset.defaultCharset());
                                super.channelRead(ctx, name); //唤醒下一个入站handler,并传递执行的结果,如果不调用,调用链会断开
                                //另一个唤醒下一个入站handler并传递执行结果的方式:
                                // ctx.fireChannelRead(name); //将数据传递给下个handler,如果不调用,调用链会断开
                            }
                        });
                        pipeline.addLast("handler02",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
                                log.debug("2");
                                Student student = new Student(name.toString());
                                super.channelRead(ctx, student); //唤醒下一个入站handler,并传递执行的结果
                            }
                        });
                        pipeline.addLast("handler03",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("3,结果:{}, class:{}", msg, msg.getClass());
                                
                                //向channel写入数据,会触发出站处理器,会从tail往前找出站处理器
                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));
                                //下面这句,会从当前handler(handler03)往前找出站处理器
//                                ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));

//                                super.channelRead(ctx, msg); //不需要继续传递了,因为是最后一个入站处理器
                            }
                        });

                        //3、添加出站处理器(只有向channel里写数据才会触发)
                        //出站handler的执行顺序是倒过来的,即 6 -> 5 -> 4
                        pipeline.addLast("handler04",new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("4");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("handler05",new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("5");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("handler06",new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("6");
                                super.write(ctx, msg, promise);
                            }
                        });

                        //执行顺序: 1 -> 2 -> 3 -> 6 -> 5 -> 4
                    }
                })
                .bind(8080); //绑定监听端口
    }

    @Data
    @AllArgsConstructor
    static class Student {
        private String name;
    }
}

可以看到,ChannelInboundHandlerAdapter是按照addLast的顺序执行的,而ChannelOutboundAdapter是按照addLast的逆序执行的。ChannelPipeline的实现是一个ChannelHandlerContext(包装了ChannelHandler)组成的双向链表。

  • 入站处理器中,ctx.fireChannelRead(msg)是调用下一个入站处理器。
  • ctx.channel().write(msg)会从尾部开始触发后续出站处理器的执行。
  • 类似地,出站处理器中,ctx.write(msg, promise)的调用也会触发上一个出站处理器。
  • ctx.channel().write(msg) VS ctx.write(msg):① 都是触发出站处理器的执行;② ctx.channel.write(msg)从尾部开始查找出站处理器;③ ctx.write(msg)是从当前结点找上一个出站处理器。

下图是服务端pipeline触发的原始流程,图中数字代表了处理步骤的先后次序。

3.5、EmbeddedChannel测试
package com.clp.netty4;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };

        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);

        //模拟入站 *** 作
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes(
                "hello".getBytes()
        ));

        //模拟出站 *** 作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes(
                "world".getBytes()
        ));
    }
}


结果:
10:25:44.619 [main] DEBUG com.clp.netty4.TestEmbeddedChannel - 1
10:25:44.620 [main] DEBUG com.clp.netty4.TestEmbeddedChannel - 2
10:25:44.621 [main] DEBUG com.clp.netty4.TestEmbeddedChannel - 4
10:25:44.621 [main] DEBUG com.clp.netty4.TestEmbeddedChannel - 3
3.6、ByteBuf
package com.clp.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class TestByteBuf {
    public static void main(String[] args) {
        //默认实现创建一个ByteBuf,默认容量256,可动态扩容。(ByteBuffer不能动态扩容)
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();

        log(buf);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 300; i++) {
            sb.append("a"); //一共占300B
        }
        //将字符串转为字节数组,然后写入buf
        buf.writeBytes(sb.toString().getBytes()); //300 > 256, 导致buf容量翻倍成 512
        log(buf);
    }

    //查看ByteBuf的使用情况
    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder bufSb = new StringBuilder(rows * 80 *2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(bufSb, buffer);
        System.out.println(bufSb.toString());
    }
}


结果:
...
read index:0 write index0 capacity:256

read index:0 write index300 capacity:512
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000020| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000030| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000040| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000050| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000060| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000070| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000080| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000090| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000a0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000b0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000c0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000d0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000e0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000f0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000100| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000110| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000120| 61 61 61 61 61 61 61 61 61 61 61 61             |aaaaaaaaaaaa    |
+--------+-------------------------------------------------+----------------+
3.6.1、堆内存 vs 直接内存

堆内存分配效率高,但读写效率低。可以使用下面的代码来创建池化基于堆的ByteBuf:

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

直接内存分配效率低,但读写效率高。可以使用下面的代码来创建基于直接内存的ByteBuf:

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);

  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。
  • 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放。
3.6.2、池化 vs 非池化

池化的最大意义在于可以重用ByteBuf,优点有:

  • 没有池化,则每次都得创建新的ByteBuf实例,这个 *** 作对直接内存代价昂贵,就算是堆内存,也会增加GC压力;
  • 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似地内存分配算法提升分配效率;
  • 高并发时,池化功能更节约内存,减少内存溢出的可能。

池化功能是否开启,可以通过下面的系统环境变量来设置。

-Dio.netty.allocator.type={unpooled|pooled}

  • 4.1以后,非Android平台默认启用池化实现,Android平台启用非池化实现;
  • 4.1之前,池化功能还不成熟,默认是非池化实现。
3.6.3、组成

ByteBuf由四部分组成:

最开始读写指针都在0位置。容量和最大容量之间的区域称为可扩容部分。

3.6.4、写入

方法列表,省略一些不必要的方法。

方法签名含义备注
writeBoolean(boolean value)写入boolean值用1字节 01 | 00 代表 true | false
writeByte(int value)写入byte值
writeShort(int value)写入short值
writeInt(int value)写入int值Big Endian(大端写入),即0x250,写入后00 00 02 50
writeIntLE(int value)写入int值Little Endian(小端写入),即0x250,写入后50 02 00 00
writeLong(long value)写入long值
writeChar(int value)写入char值
writeFloat(float value)写入float值
writeDouble(double value)写入double值
writeBytes(ByteBuf src)写入netty的ByteBuf
writeBytes(byte[] src)写入byte[]
writeBytes(ByteBuffer src)写入nio的ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)写入字符串

注意:

  • 这些方法的未指明返回值的,其返回值都是ByteBuf,意味着可以链式调用。
  • 网络传输,默认习惯是Big Endian。
  • 还有一类方法是set开头的一系列方法,也可以写入数据,但不会改变指针位置。
3.6.5、扩容

若写入一些数据,导致容量不够了,这时会引发扩容。

扩容规则是:

  • 如果写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12,则扩容后capacity是16;
  • 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024(2^9=512已经不够了);
  • 扩容不能超过max capacity,否则会报错。
3.6.6、读取

读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分。如果需要重复读取int整数5,怎么办?

//可以在read前先做个标记mark:
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);

//如果要重复读写的话,可以重置到标记位置 reset
buffer.resetReaderIndex();
log(buffer);

还有种办法是采用get开头的一系列方法,这些方法不会改变read index。

3.6.7、retain & release

由于Netty有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。

  • UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可;
  • UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存;
  • PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存。

回收内存的源码实现,可以关注下面方法的不同实现:

protected abstract void deallocate();

Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口。

  • 每个ByteBuf对象的初始计数为1;
  • 调用release()方法计数减1,如果计数为0,ByteBuf内存被回收;
  • 调用retain()方法计数加1,表示调用者没用完之前,其它handler即使调用了release()也不会造成回收;
  • 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用。

谁来负责release呢?

因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果在finally中release了,就失去了传递性(当然,如果在这个ChannelHandler内这个ByteBuf已完成了它的使命,那么便无需再传递)。

基本规则是,谁是最后的使用者,谁负责release,详细分析如下:

起点,对于NIO实现来讲,在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()方法中首次创建ByteBuf放入pipeline(line 163 pipeline.fireChannelRead(byteBuf));

3.6.8、slice & composite

【零拷贝】的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针。

package com.clp.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class TestSlice {
    public static void main(String[] args) {
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
        buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
        log(buf);

        //切片。在切片过程中,没有发生数据复制
        ByteBuf f1 = buf.slice(0, 5); //索引0-4
        f1.retain(); //引用计数+1
        ByteBuf f2 = buf.slice(5, 5); //索引5-9
        f1.retain(); //引用计数+1
        log(f1);
        log(f2);

        buf.release(); //引用计数-1
        f1.release(); //引用计数-1
        f2.release(); //最后由f2释放内存

        System.out.println("*****************************************************************");
        f1.setByte(0,'b');
        log(f1);
        log(buf);
    }

    //查看ByteBuf的使用情况
    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder bufSb = new StringBuilder(rows * 80 *2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(bufSb, buffer);
        System.out.println(bufSb.toString());
    }
}


结果:
read index:0 write index10 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 66 67 68 69 6a                   |abcdefghij      |
+--------+-------------------------------------------------+----------------+
read index:0 write index5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65                                  |abcde           |
+--------+-------------------------------------------------+----------------+
read index:0 write index5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 67 68 69 6a                                  |fghij           |
+--------+-------------------------------------------------+----------------+
*****************************************************************
read index:0 write index5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 63 64 65                                  |bbcde           |
+--------+-------------------------------------------------+----------------+
read index:0 write index10 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 63 64 65 66 67 68 69 6a                   |bbcdefghij      |
+--------+-------------------------------------------------+----------------+
package com.clp.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class TestCompositeByteBuf {
    public static void main(String[] args) {
        ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
        buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});

        ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
        buf2.writeBytes(new byte[]{6,7,8,9,10});

//        //会发生真正的数据复制
//        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
//        buffer.writeBytes(buf1).writeBytes(buf2);
//        log(buffer);

        //避免了内存的复制,但带来了复杂的维护
        CompositeByteBuf cBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();
        cBuffer.addComponents(true, buf1, buf2);
        cBuffer.retain(); //引用计数+1
        log(cBuffer);
    }

    //查看ByteBuf的使用情况
    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder bufSb = new StringBuilder(rows * 80 *2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(bufSb, buffer);
        System.out.println(bufSb.toString());
    }
}


结果:
read index:0 write index10 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+
3.6.9、duplicate

【零拷贝】的体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的。

3.6.10、copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关。

3.6.11、Unpooled

Unpooled是一个工具类,类如其名,提供了非池化的ByteBuf创建、组合、复制等 *** 作。

这里仅介绍其跟【零拷贝】相关的wrappedBuffer方法,可以用来包装ByteBuf。

package com.clp.netty4;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;

import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;

public class TestUnpooled {
    public static void main(String[] args) {
        ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
        buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});

        ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
        buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

        //当包装ByteBuf个数超过一个时,底层使用了CompositeByteBuf避免数据的发生
        ByteBuf buffer = Unpooled.wrappedBuffer(buf1, buf2);
        log(buffer);
    }

    //查看ByteBuf的使用情况
    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder bufSb = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(bufSb, buffer);
        System.out.println(bufSb.toString());
    }
}


结果:
..
read index:0 write index10 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+
3.6.12、ByteBuf的优势
  • 池化 - 可以重用池中ByteBuf实例,更节约内存,减少内存溢出的问题;
  • 读写指针分离,不需要像ByteBuffer一样切换读写模式;
  • 可以自动扩容;
  • 支持链式调用,使用更流畅;
  • 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/726840.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存