目录
1、概述
1.1、Netty是什么
1.2、Netty的地位
1.3、Netty的优势
2、Hello World
2.1、目标
2.2、服务器端
2.3、客户端
2.4、注意的问题
3、组件
3.1、EventLoop
3.2、Channel
3.2.1、ChannelFuture
3.2.2、为什么要异步
3.3、Future & Promise
3.3.1、JDK Future
3.3.2、Netty Future
3.3.3、netty Promise
3.4、Handler & Pipeline
3.5、EmbeddedChannel测试
3.6、ByteBuf
3.6.1、堆内存 vs 直接内存
3.6.2、池化 vs 非池化
3.6.3、组成
3.6.4、写入
3.6.5、扩容
3.6.6、读取
3.6.7、retain & release
3.6.8、slice & composite
3.6.9、duplicate
3.6.10、copy
3.6.11、Unpooled
3.6.12、ByteBuf的优势
1、概述 1.1、Netty是什么
Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
1.2、Netty的地位Netty在Java网络应用框架中的地位就好比Spring框架在JavaEE开发中的地位。以下的框架都使用了Netty,因为它们有网络通信要求:
- Cassandra - nosql数据库
- Spark - 大数据分布式计算框架
- Hadoop - 大数据分布式存储框架
- RocketMQ - ali开源的消息队列
- ElasticSearch - 搜索引擎
- gRPC - rpc框架
- Dubbo - rpc框架
- Spring 5.x - flux api完全抛弃了tomcat,使用netty作为服务器端
- Zookeeper - 分布式协调框架
Netty vs NIO ,基于NIO开发工作量大,bug多:
- 需要自己构建协议;
- 解决TCP传输问题,如粘包、半包;
- epoll空轮询导致CPU 100%;
- 对API进行增强,使之更易用,如FastThreadLocal => ThreadLocal,ByteBuf => ByteBuf。
Netty vs 其他网络应用框架:
- Mina由apache维护,将来3.x版本可能会有较大重构,破坏API向下兼容性,Netty的开发迭代更迅速,API更简洁、文档更优秀。
- 久经考验,16年,Netty版本:2.x 2004, 3.x 2008, 4.x 2013, 5.x 已废弃(没用明显的性能提升,维护成本高)。
开发一个简单的服务器端和客户端:
- 客户端向服务器端发送hello,world;
- 服务器仅接收,不返回。
加入依赖:
pom.xml:
...
io.netty
netty-all
...
2.2、服务器端
package com.clp.netty3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HelloServer {
public static void main(String[] args) {
//1、ServerBootstrap:服务器端启动器,负责组装netty组件,启动服务器
new ServerBootstrap()
//2、NioEventLoopGroup(selector, thread),EventLoop中包含了线程和选择器
//16、由某个EventLoop处理read事件,接收到ByteBuf,然后交给后续的handler
.group(new NioEventLoopGroup()) //监听 accept read .. 事件
//3、选择服务器的ServerSocketChannel实现
.channel(NioServerSocketChannel.class)
//4、boss:负责处理连接;worker:处理读写,决定了worker(child)能执行哪些 *** 作(handler)
.childHandler(
//5、Channel代表和客户端进行数据读写的通道,Initializer初始化器,负责添加别的handler
new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//添加具体的handler
ch.pipeline().addLast(new StringDecoder()); //17、将ByteBuf转换为字符串,然后给下一个处理器
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //自定义handler
@Override //读事件
//18、执行channelRead()方法,打印hello
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//打印上一步转换好的字符串
System.out.println(msg);
}
});
}
})
//6、绑定监听端口
.bind(8080);
}
}
2.3、客户端
package com.clp.netty3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
//7、创建启动器类
new Bootstrap()
//8、添加EventLoop
.group(new NioEventLoopGroup())
//9、选择客户端channel实现
.channel(NioSocketChannel.class)
//10、添加处理器
.handler(new ChannelInitializer() {
@Override //12、在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //15、将字符串转为ByteBuf类型,然后发送给服务器端
}
})
//11、连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync() //13、阻塞方法,直到连接建立
.channel() //代表连接对象
//14、向服务器发送数据
.writeAndFlush("hello world"); //发送数据
}
}
2.4、注意的问题
一开始要树立正确的观念。
- 把channel理解为数据的通道;
- 把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeline的加工,会变成其它类型对象,最后输出又变成ByteBuf;
- 把handler理解为数据的处理工序。工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法);handler分为Inbound(入站)和Outbound(出站)两类。
- 把EventLoop理解为处理数据的工人。工人可以管理多个channel的IO *** 作,并且一旦工人负责了某个channel,就要负责到底(绑定);工人既可以执行IO *** 作,也可以进行任务处理,每位工人有任务队列,队列里可以有多个channel的待处理任务,任务分为普通任务、定时任务;工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人。
1、事件循环对象
EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run()方法处理Channel上源源不断的IO事件。
它的关系比较复杂:
- 一条线是继承j.u.c.ScheduledExecutorService,因此包含了线程池中所有的方法;
- 另一条线是继承自netty自己的OrderedEventExecutor。提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop;提供了parent()方法来看看自己属于哪个EventLoopGroup。
2、事件循环组
EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register()方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理(保证了IO事件处理时的线程安全)。
继承自netty自己的EventExecutorGroup:
- 实现了iterable接口提供的EventLoop的能力;
- 另有next()方法获取集合中下一个EventLoop。
package com.clp.netty3;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
//1、创建事件循环
EventLoopGroup group = new NioEventLoopGroup(2); //处理 io 事件、普通任务、定时任务
// EventLoopGroup group = new DefaultEventLoop(); //处理 普通任务、定时任务
//2、获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
//3、执行普通任务
group.next().submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("ok");
});
//4、执行定时任务
group.next().scheduleAtFixedRate(() -> {
log.debug("ok");
}, 0, 1, TimeUnit.SECONDS); //初始时间为0,间隔时间为1,单位为秒
log.debug("main");
}
}
package com.clp.netty3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
//创建一个独立的EventGroup
EventLoopGroup group = new DefaultEventLoopGroup(); //只能处理普通任务和定时任务
new ServerBootstrap()
//boss 和 worker
//细分1:第一个参数:boss 只负责ServerSocketChannel上的accept事件;第二个参数:2个worker 只负责socketChannel上的读写
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override //ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); //将消息传递给下一个handler
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override //ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080); //绑定端口
}
}
package com.clp.netty3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//创建启动器类
Channel channel = new Bootstrap()
//添加EventLoop
.group(new NioEventLoopGroup())
//选择客户端channel实现
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer() {
@Override //12、在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //15、将字符串转为ByteBuf类型,然后发送给服务器端
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync() //阻塞方法,直到连接建立
.channel();//代表连接对象
System.out.println(channel);
System.out.println("");
}
}
3、handler执行中如何换人?
3.2、Channelchannel的主要作用:
- close()可以用来关闭channel;
- closeFuture()用来处理用来处理channel的关闭。① sync()方法作用是同步等待channel关闭;② 而addListener()方法是异步等待channel关闭。
- pipeline()方法添加处理器;
- write()方法将数据写入;
- writeAndFlush()方法将数据写入并刷出。
1、ChannelFuture处理结果
package com.clp.netty3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//创建启动器类
//带有Future、Promise的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
//添加EventLoop
.group(new NioEventLoopGroup())
//选择客户端channel实现
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //将字符串转为ByteBuf类型,然后发送给服务器端
}
})
//连接到服务器
//异步非阻塞,主线程发起了调用,但是真正执行connect的是nio线程。
.connect(new InetSocketAddress("localhost", 8080)); //假设1s 后连接成功
// //方法1:使用sync()方法同步处理结果
// channelFuture.sync(); //阻塞住当前线程,直到nio线程连接建立完毕
//若不调用sync()方法,则会无阻塞向下执行,获取channel
// Channel channel = channelFuture.channel();//代表连接对象
// System.out.println(channel);
//方法2:使用addListener(回调对象)方法,异步处理结果(交给nio线程处理)
channelFuture.addListener(new ChannelFutureListener() {
//在nio线程连接建立好之后,会调用这个方法
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
System.out.println(channel);
}
});
}
}
2、ChannelFuture关闭问题
package com.clp.netty3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync(); //等待连接建立
Channel channel = channelFuture.channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while(true) {
String line = scanner.nextLine();
if("q".equals(line)) {
channel.close(); //异步 *** 作,交给其他线程执行关闭 *** 作
// log.debug("处理关闭之后的 *** 作"); //不能在这里善后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
//获取ClosedFuture对象:1)同步处理关闭;2)异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
// //1、同步处理关闭
// log.debug("waiting close...");
// closeFuture.sync(); //当channel调用close()方法之后,sync()才会继续往下执行
// log.debug("处理关闭之后的 *** 作");
//2、异步处理关闭
closeFuture.addListener(new ChannelFutureListener() {
@Override
//谁(线程)关闭谁调用
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("处理关闭之后的 *** 作"); //通道channel关闭后...
group.shutdownGracefully(); //关闭正在运行的线程
}
});
}
}
3.2.2、为什么要异步
要点:
- 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势;
- 异步并没有缩短响应时间,反而有所增加;
- 合理进行任务拆分,也是利用异步的关键。
在异步处理时,经常用到这两个接口。
首先要说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise又对netty Future进行了扩展。
jdk Future <- netty Future <- promise。
- jdk Future只能同步等待任务结束(或成功、或失败)才能得到结果;
- netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
- netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过isSuccess判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addListener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
package com.clp.netty3;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1、创建线程池
ExecutorService service = Executors.newFixedThreadPool(2);
//2、提交任务
//future相当于线程间传递数据的容器(被动等待其他线程执行完并接收其结果)
Future future = service.submit(new Callable() {
@Override
public Integer call() throws Exception {
log.debug("执行计算...");
Thread.sleep(1000);
return 50;
}
});
//3、主线程通过future获取结果
log.debug("等待结果...");
//future.get():将主线程阻塞住,同步等待直到子线程执行结束
log.debug("结果是{}", future.get());
}
}
结果:
08:50:48.500 [pool-1-thread-1] DEBUG com.clp.netty3.TestJdkFuture - 执行计算...
08:50:48.500 [main] DEBUG com.clp.netty3.TestJdkFuture - 等待结果...
08:50:49.517 [main] DEBUG com.clp.netty3.TestJdkFuture - 结果是50
3.3.2、Netty Future
package com.clp.netty3;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//一个EventLoopGroup里面有多个EventLoop,一个EventLoop里只有一个线程
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future future = eventLoop.submit(new Callable() {
@Override
public Integer call() throws Exception {
log.debug("执行计算..");
Thread.sleep(1000);
return 70;
}
});
// log.debug("等待结果..");
// log.debug("结果是:{}", future.get()); //1、同步方式获取结果
//2、异步方式获取结果
future.addListener(new GenericFutureListener>() {
@Override
public void operationComplete(Future super Integer> future) throws Exception {
log.debug("接收结果:{}", future.getNow());
}
});
}
}
结果:
..
09:04:42.098 [nioEventLoopGroup-2-1] DEBUG com.clp.netty3.TestNettyFuture - 执行计算..
09:04:43.099 [nioEventLoopGroup-2-1] DEBUG com.clp.netty3.TestNettyFuture - 接收结果:70
3.3.3、netty Promise
package com.clp.netty3;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1、准备一个EventLoop对象
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
//2、可以手动创建Promise对象,作为结果容器
DefaultPromise promise = new DefaultPromise(eventLoop);
new Thread(() -> {
//3、任意一个线程执行计算,计算完毕后向promise对象填充结果
try {
int i = 1/0; //手动产生一个异常
log.debug("开始计算..");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
promise.setFailure(e); //如果执行失败(如1/0),可以装填异常信息
}
promise.setSuccess(80); //装填正确结果
}).start();
//4、接收结果的线程
log.debug("等待结果..");
log.debug("结果是:{}", promise.get());
}
}
结果;
...
09:16:19.049 [main] DEBUG com.clp.netty3.TestNettyPromise - 等待结果..
Exception in thread "Thread-0" java.lang.ArithmeticException: / by zero
at com.clp.netty3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:23)
at java.lang.Thread.run(Thread.java:748)
3.4、Handler & Pipeline
ChannelHandler用来处理Channel上的各种事件,分为入站、出站两种。所有ChannelHandler被连成一串,就是Pipeline。
- 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果。
- 出站处理器通常是ChannelOuntBoundHandlerAdapter的子类,主要对写回结果进行加工。
打个比喻,每个Channel是一个产品的加工车间,Pipeline是车间中的流水线,ChannelHandler就是流水线上的各道工序,而后面要讲的ByteBuf是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。
package com.clp.netty4;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//1、通过channel拿到pipeline
ChannelPipeline pipeline = ch.pipeline();
//2、添加入站处理器
//添加处理器 head -> ... -> tail ,netty会自动在头和尾添加这两个handler(基于双向链表)
//流水线pipeline:head -> handler01 -> handler02 -> handler03 ->
// handler04 -> handler05 -> handler06 -> tail
pipeline.addLast("handler01",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(Charset.defaultCharset());
super.channelRead(ctx, name); //唤醒下一个入站handler,并传递执行的结果,如果不调用,调用链会断开
//另一个唤醒下一个入站handler并传递执行结果的方式:
// ctx.fireChannelRead(name); //将数据传递给下个handler,如果不调用,调用链会断开
}
});
pipeline.addLast("handler02",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("2");
Student student = new Student(name.toString());
super.channelRead(ctx, student); //唤醒下一个入站handler,并传递执行的结果
}
});
pipeline.addLast("handler03",new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3,结果:{}, class:{}", msg, msg.getClass());
//向channel写入数据,会触发出站处理器,会从tail往前找出站处理器
ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));
//下面这句,会从当前handler(handler03)往前找出站处理器
// ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));
// super.channelRead(ctx, msg); //不需要继续传递了,因为是最后一个入站处理器
}
});
//3、添加出站处理器(只有向channel里写数据才会触发)
//出站handler的执行顺序是倒过来的,即 6 -> 5 -> 4
pipeline.addLast("handler04",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("handler05",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("handler06",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("6");
super.write(ctx, msg, promise);
}
});
//执行顺序: 1 -> 2 -> 3 -> 6 -> 5 -> 4
}
})
.bind(8080); //绑定监听端口
}
@Data
@AllArgsConstructor
static class Student {
private String name;
}
}
可以看到,ChannelInboundHandlerAdapter是按照addLast的顺序执行的,而ChannelOutboundAdapter是按照addLast的逆序执行的。ChannelPipeline的实现是一个ChannelHandlerContext(包装了ChannelHandler)组成的双向链表。
- 入站处理器中,ctx.fireChannelRead(msg)是调用下一个入站处理器。
- ctx.channel().write(msg)会从尾部开始触发后续出站处理器的执行。
- 类似地,出站处理器中,ctx.write(msg, promise)的调用也会触发上一个出站处理器。
- ctx.channel().write(msg) VS ctx.write(msg):① 都是触发出站处理器的执行;② ctx.channel.write(msg)从尾部开始查找出站处理器;③ ctx.write(msg)是从当前结点找上一个出站处理器。
下图是服务端pipeline触发的原始流程,图中数字代表了处理步骤的先后次序。
3.5、EmbeddedChannel测试package com.clp.netty4;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
//模拟入站 *** 作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes(
"hello".getBytes()
));
//模拟出站 *** 作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes(
"world".getBytes()
));
}
}
结果:
10:25:44.619 [main] DEBUG com.clp.netty4.TestEmbeddedChannel - 1
10:25:44.620 [main] DEBUG com.clp.netty4.TestEmbeddedChannel - 2
10:25:44.621 [main] DEBUG com.clp.netty4.TestEmbeddedChannel - 4
10:25:44.621 [main] DEBUG com.clp.netty4.TestEmbeddedChannel - 3
3.6、ByteBuf
package com.clp.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class TestByteBuf {
public static void main(String[] args) {
//默认实现创建一个ByteBuf,默认容量256,可动态扩容。(ByteBuffer不能动态扩容)
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
log(buf);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300; i++) {
sb.append("a"); //一共占300B
}
//将字符串转为字节数组,然后写入buf
buf.writeBytes(sb.toString().getBytes()); //300 > 256, 导致buf容量翻倍成 512
log(buf);
}
//查看ByteBuf的使用情况
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder bufSb = new StringBuilder(rows * 80 *2)
.append("read index:").append(buffer.readerIndex())
.append(" write index").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(bufSb, buffer);
System.out.println(bufSb.toString());
}
}
结果:
...
read index:0 write index0 capacity:256
read index:0 write index300 capacity:512
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000020| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000030| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000040| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000050| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000060| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000070| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000080| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000090| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000a0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000b0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000c0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000d0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000e0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|000000f0| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000100| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000110| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000120| 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaa |
+--------+-------------------------------------------------+----------------+
3.6.1、堆内存 vs 直接内存
堆内存分配效率高,但读写效率低。可以使用下面的代码来创建池化基于堆的ByteBuf:
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
直接内存分配效率低,但读写效率高。可以使用下面的代码来创建基于直接内存的ByteBuf:
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。
- 直接内存对GC压力小,因为这部分内存不受JVM垃圾回收的管理,但也要注意及时主动释放。
池化的最大意义在于可以重用ByteBuf,优点有:
- 没有池化,则每次都得创建新的ByteBuf实例,这个 *** 作对直接内存代价昂贵,就算是堆内存,也会增加GC压力;
- 有了池化,则可以重用池中ByteBuf实例,并且采用了与jemalloc类似地内存分配算法提升分配效率;
- 高并发时,池化功能更节约内存,减少内存溢出的可能。
池化功能是否开启,可以通过下面的系统环境变量来设置。
-Dio.netty.allocator.type={unpooled|pooled}
- 4.1以后,非Android平台默认启用池化实现,Android平台启用非池化实现;
- 4.1之前,池化功能还不成熟,默认是非池化实现。
ByteBuf由四部分组成:
最开始读写指针都在0位置。容量和最大容量之间的区域称为可扩容部分。
3.6.4、写入方法列表,省略一些不必要的方法。
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入boolean值 | 用1字节 01 | 00 代表 true | false |
writeByte(int value) | 写入byte值 | |
writeShort(int value) | 写入short值 | |
writeInt(int value) | 写入int值 | Big Endian(大端写入),即0x250,写入后00 00 02 50 |
writeIntLE(int value) | 写入int值 | Little Endian(小端写入),即0x250,写入后50 02 00 00 |
writeLong(long value) | 写入long值 | |
writeChar(int value) | 写入char值 | |
writeFloat(float value) | 写入float值 | |
writeDouble(double value) | 写入double值 | |
writeBytes(ByteBuf src) | 写入netty的ByteBuf | |
writeBytes(byte[] src) | 写入byte[] | |
writeBytes(ByteBuffer src) | 写入nio的ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
注意:
- 这些方法的未指明返回值的,其返回值都是ByteBuf,意味着可以链式调用。
- 网络传输,默认习惯是Big Endian。
- 还有一类方法是set开头的一系列方法,也可以写入数据,但不会改变指针位置。
若写入一些数据,导致容量不够了,这时会引发扩容。
扩容规则是:
- 如果写入后数据大小未超过512,则选择下一个16的整数倍,例如写入后大小为12,则扩容后capacity是16;
- 如果写入后数据大小超过512,则选择下一个2^n,例如写入后大小为513,则扩容后capacity是2^10=1024(2^9=512已经不够了);
- 扩容不能超过max capacity,否则会报错。
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分。如果需要重复读取int整数5,怎么办?
//可以在read前先做个标记mark:
buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);
//如果要重复读写的话,可以重置到标记位置 reset
buffer.resetReaderIndex();
log(buffer);
还有种办法是采用get开头的一系列方法,这些方法不会改变read index。
3.6.7、retain & release由于Netty有堆外内存的ByteBuf实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。
- UnpooledHeapByteBuf使用的是JVM内存,只需等GC回收内存即可;
- UnpooledDirectByteBuf使用的就是直接内存了,需要特殊的方法来回收内存;
- PooledByteBuf和它的子类使用了池化机制,需要更复杂的规则来回收内存。
回收内存的源码实现,可以关注下面方法的不同实现:
protected abstract void deallocate();
Netty这里采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口。
- 每个ByteBuf对象的初始计数为1;
- 调用release()方法计数减1,如果计数为0,ByteBuf内存被回收;
- 调用retain()方法计数加1,表示调用者没用完之前,其它handler即使调用了release()也不会造成回收;
- 当计数为0时,底层内存会被回收,这时即使ByteBuf对象还在,其各个方法均无法正常使用。
谁来负责release呢?
因为pipeline的存在,一般需要将ByteBuf传递给下一个ChannelHandler,如果在finally中release了,就失去了传递性(当然,如果在这个ChannelHandler内这个ByteBuf已完成了它的使命,那么便无需再传递)。
基本规则是,谁是最后的使用者,谁负责release,详细分析如下:
起点,对于NIO实现来讲,在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read()方法中首次创建ByteBuf放入pipeline(line 163 pipeline.fireChannelRead(byteBuf));
3.6.8、slice & composite【零拷贝】的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用ByteBuf的内存,切片后的ByteBuf维护独立的read,write指针。
package com.clp.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class TestSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
log(buf);
//切片。在切片过程中,没有发生数据复制
ByteBuf f1 = buf.slice(0, 5); //索引0-4
f1.retain(); //引用计数+1
ByteBuf f2 = buf.slice(5, 5); //索引5-9
f1.retain(); //引用计数+1
log(f1);
log(f2);
buf.release(); //引用计数-1
f1.release(); //引用计数-1
f2.release(); //最后由f2释放内存
System.out.println("*****************************************************************");
f1.setByte(0,'b');
log(f1);
log(buf);
}
//查看ByteBuf的使用情况
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder bufSb = new StringBuilder(rows * 80 *2)
.append("read index:").append(buffer.readerIndex())
.append(" write index").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(bufSb, buffer);
System.out.println(bufSb.toString());
}
}
结果:
read index:0 write index10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 66 67 68 69 6a |abcdefghij |
+--------+-------------------------------------------------+----------------+
read index:0 write index5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 62 63 64 65 |abcde |
+--------+-------------------------------------------------+----------------+
read index:0 write index5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 66 67 68 69 6a |fghij |
+--------+-------------------------------------------------+----------------+
*****************************************************************
read index:0 write index5 capacity:5
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 63 64 65 |bbcde |
+--------+-------------------------------------------------+----------------+
read index:0 write index10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 63 64 65 66 67 68 69 6a |bbcdefghij |
+--------+-------------------------------------------------+----------------+
package com.clp.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class TestCompositeByteBuf {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6,7,8,9,10});
// //会发生真正的数据复制
// ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// buffer.writeBytes(buf1).writeBytes(buf2);
// log(buffer);
//避免了内存的复制,但带来了复杂的维护
CompositeByteBuf cBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();
cBuffer.addComponents(true, buf1, buf2);
cBuffer.retain(); //引用计数+1
log(cBuffer);
}
//查看ByteBuf的使用情况
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder bufSb = new StringBuilder(rows * 80 *2)
.append("read index:").append(buffer.readerIndex())
.append(" write index").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(bufSb, buffer);
System.out.println(bufSb.toString());
}
}
结果:
read index:0 write index10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
3.6.9、duplicate
【零拷贝】的体现之一,就好比截取了原始ByteBuf所有内容,并且没有max capacity的限制,也是与原始ByteBuf使用同一块底层内存,只是读写指针是独立的。
3.6.10、copy会将底层内存数据进行深拷贝,因此无论读写,都与原始ByteBuf无关。
3.6.11、UnpooledUnpooled是一个工具类,类如其名,提供了非池化的ByteBuf创建、组合、复制等 *** 作。
这里仅介绍其跟【零拷贝】相关的wrappedBuffer方法,可以用来包装ByteBuf。
package com.clp.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class TestUnpooled {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
//当包装ByteBuf个数超过一个时,底层使用了CompositeByteBuf避免数据的发生
ByteBuf buffer = Unpooled.wrappedBuffer(buf1, buf2);
log(buffer);
}
//查看ByteBuf的使用情况
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder bufSb = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(bufSb, buffer);
System.out.println(bufSb.toString());
}
}
结果:
..
read index:0 write index10 capacity:10
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a |.......... |
+--------+-------------------------------------------------+----------------+
3.6.12、ByteBuf的优势
- 池化 - 可以重用池中ByteBuf实例,更节约内存,减少内存溢出的问题;
- 读写指针分离,不需要像ByteBuffer一样切换读写模式;
- 可以自动扩容;
- 支持链式调用,使用更流畅;
- 很多地方体现零拷贝,例如slice、duplicate、CompositeByteBuf。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)