浅谈Java-Nio

浅谈Java-Nio,第1张

浅谈Java-Nio 浅谈Java-Nio

Nio于jdk1.4引入,不同于Bio,Nio(No-Blocking)意为非阻塞IO,Nio是基于通道的,面向缓冲的io。其非阻塞和多路复用的特性使其能够支持高负载,高并发的网络应用netty,mina等底层都是使用的Nio

阻塞和非阻塞

我会用几段Nio的代码来展示下阻塞和非阻塞的区别

阻塞

服务端:

public class Bserver {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8080));
        ByteBuffer buffer = ByteBuffer.allocate(16);
        List list = new ArrayList<>();
        while (true){
            System.out.println("--connecting---");
            SocketChannel sc = serverSocketChannel.accept();
            System.out.println("--connected--- sc: " + sc);
            list.add(sc);
            for (SocketChannel socketChannel : list) {
                System.out.println("before read --- sc: " + socketChannel);
                socketChannel.read(buffer);
                buffer.flip();
                System.out.println(buffer.toString());
                buffer.clear();
                System.out.println("after read --- sc: " + socketChannel);
            }
        }
    }
}

客户端:

public class Bclient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",7777));
        System.out.println("waiting---");
    }
}

代码很简单,Nio默认是阻塞的,启动server端,发现server端阻塞在accept,等待客户端链接

然后以debug的方式启动客户端,发现此时阻塞在来read方法来,等待读取数据

然后此时我们在启动一个新的客户端的时候就会发现,server端一直没有反应,这是因为其阻塞在了read方法,没法去处理新的链接,只有读取了数据,循环走到了accept才会处理新的链接。

通过上面的现象我们可以看出,在单线程中,阻塞的模式下,没法正常的工作,读取的时候没法处理另一个链接,处理链接的时候就没法读取另一个客户端数据。解决的办法就是使用多线程,一个线程处理一个客户端。但是多线程中也有一些问题

32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接 非阻塞

实现非阻塞的方式也很简单,server端修改几行代码即可,客户端不变

public class NBserver {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(7777));
        //设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.allocate(16);
        List list = new ArrayList<>();
        while (true) {
            System.out.println("--connecting---");
            SocketChannel sc = serverSocketChannel.accept();
            if (sc != null){
                System.out.println("--connected--- sc: " + sc);
                sc.configureBlocking(false);
                list.add(sc);
            }

            for (SocketChannel socketChannel : list) {
                System.out.println("before read --- sc: " + socketChannel);
                int read = socketChannel.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    System.out.println(buffer.toString());
                    buffer.clear();
                    System.out.println("after read --- sc: " + socketChannel);
                }
            }
        }
    }
}

还是启动server端,可以看见一直打印connecting,这是因为在非阻塞模式下,aceept不会被阻塞,如果没有连接,则会返回null,所以非阻塞模式不会阻塞,但是会一直循环去检查是否有新的连接

当有一个新连接进来时,read的也是不会阻塞,如果没有数据则会返回0,但是也是会一直循环检查是否有数据可以读

非阻塞模式下,accept和read就不会互相干扰来,即使在单线程的情况下也能够处理多个连接。

但是非阻塞模式下有个不足,即使没有连接建立,和可读数据,线程仍然在不断运行,一直在空转,白白浪费了cpu。解决办法就是待会会介绍的Nio组件之一的selector(多路复用)

Nio三大组件 Channel

channel(通道),channel跟Bio中的stream比较相似,但是channel是全双工的,读写双向,而且channel是不存储数据的,只是个通道,数据是存储在buffer(缓冲区)中,channel从buffer中读取数据和将数据写入buffer

常见的 Channel 有

FileChannel (文件)DatagramChannel (UDP)SocketChannel (TCP)ServerSocketChannel (TCP)

本次只介绍ServerSocketChannel和SocketChannel

ServerSocketChannel 可以创建一个服务端,监听一个端口,来获取连接

        //通过open创建一个ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //绑定一个端口
        serverSocketChannel.bind(new InetSocketAddress(7777));
        //设置为非阻塞模式,默认为阻塞
        serverSocketChannel.configureBlocking(false);
        //监听连接,返回值是一个SocketChannel
        SocketChannel sc = serverSocketChannel.accept();

SocketChannel 可以创建一个客户端,连接服务端,读写数据

        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",7777));
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //往channel
        sc.write(Charset.defaultCharset().encode("hi!"));
        sc.read(buffer);
Buffer

buffer(缓冲区),用来缓冲读写数据的地方,常见的 buffer 有

ByteBuffer

DirectByteBufferHeapByteBuffer

当然还有其他类型的buffer,但是最常用的还是ByteBuffer,常用的有如下的两种方法

        //HeapByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(16);
        //DirectByteBuffer
        ByteBuffer direct = ByteBuffer.allocateDirect(16);

HeapByteBuffer 和 DirectByteBuffer 的区别是,heap是在jvm堆内创建的,而direct是直接在机器内存中创建的,两者各有特点,但是direct可以少一次数据拷贝的过程。

向buffer写入数据

//channel向buffer写入数据
socketChannel.read(buffer);
buffer.put((byte)'a');
buffer.out(new byte[2]);

从buffer读取数据

socketChannel.write(Charset.defaultCharset().encode("hi"));
buffer.get();
buffer.get(new byte[2]);
buffer.get(index);

Buffer的几个属性

    private int mark = -1;
    private int position = 0;
    private int limit;
    private int capacity;

上面的四个属性决定了Buffer的读取和写入位置,在源码中有一段注释,mark <= position <= limit <= capacity ,说明了各个属性之间的关系,最大不会超过capacity,接着进入源码看看,在创建buffer的时候,每个属性的值都是多少

    public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
    }
    //中间跳过了几步
    Buffer(int mark, int pos, int lim, int cap) {       // package-private
        if (cap < 0)
            throw new IllegalArgumentException("Negative capacity: " + cap);
        this.capacity = cap;
        limit(lim);
        position(pos);
        if (mark >= 0) {
            if (mark > pos)
                throw new IllegalArgumentException("mark > position: ("
                                                   + mark + " > " + pos + ")");
            this.mark = mark;
        }
    }

通过源码可以知道,初始时limit和capacity的大小是一样的,mark为-1,position为0,limit表示能够读取和写入多少数据

初始化时

往buffer中写入三个字节后,position指向的是下一个字节写入的位置

当我们想要读取buffer中的数据时,我们可以使用flip将buffer切换成读模式。

    public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

通过源码可知,调用flip后,limit变成当成的position,position置为0,mark为-1

然后就可以读取数据

,如果position等于limit说明数据已经全部读完,如果此时在get就会报错

    final int nextGetIndex() {                          // package-private
        int p = position;
        if (p >= limit)
            throw new BufferUnderflowException();
        position = p + 1;
        return p;
    }

数据读取完只会,就可以调用clear,让position和limit变成一开始的状态

    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }

虽然buffer里面的数据没有被清除,但是往里面写数据的时候,他们会被新数据给覆盖掉。

还有一个mark属性,这个属性一般是配合mark()和reset()一起使用,表示记录当前写到或者读到的位置,然后下次可以回到记录的位置,继续重复读或者覆盖写

  public final Buffer mark() {
        mark = position;
        return this;
    }
  public final Buffer reset() {
        int m = mark;
        if (m < 0)
            throw new InvalidMarkException();
        position = m;
        return this;
    }
Selector

selector(选择器)也被称为多路复用,它的作用就是让Nio不用一直循环判断,它有一个事件的概念,它可以监听注册到它身上的channel的事件,如果有对应的事件发生,才会将其交给对应的channel去处理,如果没有事件发生,就会阻塞住,避免cpu空转。

事件总共有四种

OP_ACCEPT (接收事件)

OP_ConNECT (连接事件)

OP_WRITE (可写事件)

OP_READ (可读事件)

selector只能监听注册的事件,所以需要先将channel注册到selector,同时告诉selector这个channel关心哪些事件

//创建selector
Selector selector = Selector.open();
//注册selector,0:不关注任何事件
SelectionKey sscKey = serverSocketChannel.register(selector, 0, null);

注册后返回的SelectionKey,表示channel在selector的标识,每个channel向Selector注册时,都将会创建一个SelectionKey,选择键将channel与Selector建立了关系,并维护了channel事件.

所有的事件都是用一个不为0的整形表示,0表示不关心任何事件,可以在注册的时候就表明关心的事件,或者通过如下注册

sscKey.interestOps(SelectionKey.OP_ACCEPT);

SelectionKey里面使用

    private volatile int interestOps;
    private int readyOps;

来分别维护感兴趣事件集合和就绪事件结合,如何用一个整型来维护一个集合呢,就是通过位运算,比如想同时关心accept和read,可以这样

sscKey.interestOps(SelectionKey.OP_ACCEPT| SelectionKey.OP_READ);

如果要继续关心write事件,可以这样

sscKey.interestOps(sscKey.interestOps()| SelectionKey.OP_WRITE)

将当前感兴趣的事件和写事件作位运算。但不能想如下来进行连续的设置感兴趣事件

        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        sscKey.interestOps(SelectionKey.OP_WRITE);
        sscKey.interestOps(SelectionKey.OP_READ);

通过源码可知,上面的连续 *** 作,最后只会关心读事件

Selector通过select()来监听是否有事件发生,如果没有就一直阻塞,当有事件发生时,就可以通过selector.selectedKeys() 来获取当前所有有事件发生的channel的SelectionKey,接着就可以通过迭代器的方式遍历每一个key,判断key触发的事件再来做具体的处理,整个具体的代码如下

public class NBserver {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(7777));
        serverSocketChannel.configureBlocking(false);
        //创建一个selector
        Selector selector = Selector.open();
        //注册accept事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT,null);
        while (true) {
            selector.select();
            Iterator iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                //记得取消事件
                iterator.remove();
                if (key.isAcceptable()){
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    System.out.println("new connect: " + sc);
                    //注册读事件
                    sc.register(selector, SelectionKey.OP_READ,null);
                }else if(key.isReadable()){
                    //当触发的事件是读事件时
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    SocketChannel channel = (SocketChannel) key.channel();
                    channel.read(buffer);
                    ByteBufferUtil.debugAll(buffer);
                }
            }
        }
    }
}

有几个地方是需要注意的,selectedKeys 只会加元素,不会自动删除,所以当我们在处理完一个key的时候,需要把该key给取消掉,否则selectedKeys就会认为有事件没处理,一直通知去处理,从而导致报错的可能,把上面的remove注释掉后运行代码,就会触发一个accept后接着触发一次read就会报错

这是因为在处理来accept后,没有把该事件取消掉,导致selectedKeys以为该事件没有处理,在read到来后,selectedKeys里面就会有两个事件

随后的处理中selectedKeys就继续处理accept,但其实accept早已经被处理了,也没有新的accept,于是就导致了空指针异常了。所以我们在处理完事件后一定要记得清理事件,

可以通过

//使用迭代器来移除
iter.remove();
//通过cancel方法取消key
key.cancel();

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5715449.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存