在阻塞模型下,当channel调用accept()和read()方法时候都会导致当前线程阻塞,accept方法阻塞到有客户端发起连接并且建立好连接后才会执行之后的代码(等待连接、建立连接),read()阻塞到服务器接受完客户端的数据后才会执行之后的代码(等待数据、复制数据),在这样的场景下,若是前面的连接等待时间较长或者数据等待时间较久则可能导致之后的连接超时。
代码
// 使用 nio 来理解阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行
log.debug("connected... {}", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法,线程停止运行
buffer.flip();
debugRead(buffer);
buffer.clear();
log.debug("after read...{}", channel);
}
}
(二)非阻塞
在非阻塞下,若channel调用accept方法发现没有用户的连接就绪则不会阻塞而继续往下执行,read也是同理。这样如果之前的请求数据等待时间很长也不会影响之后的请求接收。但是这种非阻塞模式会一直导致线程空轮转,白白狼粪cpu资源
(三)多路复用采用事件监听机制,当没有事件发生时阻塞线程让出cpu资源,当事件就绪后执行处理事件的代码,这样的好处是可以把等待连接、等待数据的时间节省出来,专心处理已经就绪的请求,可以让一个线程在一定时间内处理完更多的请求。
代码
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }
// 获取所有事件
Set keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
log.debug("{}", sc);
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
(四)对比
注意:当selector监听到事件的发生后,若是既不处理事件又不取消事件则下次事件仍会触发,这会让selector失去阻塞的功能导致线程空轮转
(一) read@Slf4j
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
log.debug("select count: {}", count);
// if(count <= 0) {
// continue;
// }
// 获取所有事件
Set keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("连接已建立: {}", sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1) { //说明用户连接正常释放
key.cancel();
sc.close();
} else {
buffer.flip();
debug(buffer);
}
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) { //用户连接异常中断
e.printStackTrace();
}
}
}
(二)write事件
有同学可能会有疑惑,write不是一个 *** 作嘛,它不需要事件监听呀,别急听我娓娓道来,首先给大家介绍一个概念,叫消息边界
消息边界在网络通信中常常出现客户端两次的请求数据挤压在了一起或者一分请求数据被拆分成两次传入到服务器,也就是粘包和半包问题,对于服务器来说就是要将每次请求的数据区分开来,避免混在一起,这也就是我们说的消息边界
处理消息边界方法:
- 客户端与服务端商量好数据大小,每次按这个大小来(容易浪费带宽)
- 客户端在每条消息后加入特定分隔符,服务端按照分隔符拆分消息(效率低)
- TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
当服务端一次写出的数据量过大导致socket缓存满的时候需要要分多次写出
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
SocketChannel channel = key.channel();
while (buffer.hasRemaining()) {
int num = channel.write(buffer); //返回写出的字节数
log.debug("写出:{}",num);
}
}
}
}
}
在上述代码中我们会发现服务器多次写出0个字节,这是由于数据把socket缓冲区占满后不会立即将数据写出,我们需要等到它将数据写出后才能继续往里写数据,那么这样当数据量很大迟迟没有写出的话该连接会把这个线程的其它连接都阻塞掉。
这时候我们可以监听write事件,当socket缓冲区有空间了再执行写 *** 作,否则让selector去执行其它线程的请求
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Iterator iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有关注事件的基础上,多关注 写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
if (!buffer.hasRemaining()) { // 写完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
三、零拷贝
(一)传统IO
1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用 *** 作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞, *** 作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu
2. 内核缓冲–>用户缓冲,此时从内核态切换成用户态,并且需要cpu参与数据拷贝
3. 用户缓冲–>socket缓冲,cpu拷贝数据,不涉及 *** 作系统状态切换
4. socket缓冲–>网卡,从用户态切换成内核态,由 *** 作系统拷贝数据
总共涉及三次状态切换和四次数据拷贝
1. channel调用transferTo(), *** 作系统从用户态切换到内核态,并将数据拷贝到内核缓冲区
**2. 内核缓冲区只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
3. 使用DMA将数据从内核缓冲区读入网卡
整个过程只涉及一次状态切换,且不利用cpu,使用小文件的传输
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)