NIO基于Channel(通道)和Buffer(缓冲区)进行 *** 作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道
selector、channel、buffer关系图
关系说明:
1.每个channel都会对应一个buffer
2.selector对应一个线程,一个selector可以监听多个channel(连接)
3.程序根据selector监听切换到哪个channel是由事件决定的
4.selector会根据监听到的不同的事件在各个通道上切换
5.buffer就是一个内存块,底层有一个数组
6.NIO中的buffer是可以读也可以写,但需要flip等方法进行切换
7.channel是双向的,可以同时进行读写,可以实现异步读写数据,可以从缓冲读数据,也可以写数据到缓冲
ServerSocketChannel和SocketChannel用于TCP的数据读写。
ServerSocketChannel和SocketChannel的作用:简单理解ServerSocketChannel是当客户端向服务器端发起连接请求时,服务器端通过ServerSocketChannel分配给此客户端一个SocketChannel,然后客户端与服务器端的通讯就依赖这个SocketChannel。无论是客户端还是服务器端,channel都需要通过buffer进行连接。
每一个客户端都有一个SocketChannel与服务器端进行通讯,channel.read(buffer),将channel里的内容写入buffer(从通道里面读取内容),channel.write(buffer),将buffer的内容写入channel(将内容写入通道)。
selector: Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
OP_READ 事件不仅仅只有可读时才触发,以下情况都会触发:channel 中数据读取完毕、连接管道的另一端被关闭、有一个错误的 pending、对方发送消息过来。
简单的服务器端、客户端连接(控制台显示)
服务器端代码:
package com.haust.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; public class server { public static void main(String[] args) throws IOException { //创建ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一个selector对象 Selector selector = Selector.open(); //绑定一个本地6666端口,在服务器进行监听 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //将channel设置为非阻塞 serverSocketChannel.configureBlocking(false); //把serverSocketChannel注册到selector,监听的事件为OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true){ //这里我们等待1秒,如果没有事件发生,返回 if (selector.select(1000) == 0){//在1秒内没有事件发生 System.out.println("服务器等待1秒,无连接"); continue; } //如果返回的>0,就获取到相关的selectionKey集合 //selector.keys()表示注册到selector上channel的数量 //1.如果返回的>0,表示已经获取到关注的事件 //2.selector.selectedKeys()返回监听到的事件集合 //通过selectionKeys反向获取通道 SetselectionKeys = selector.selectedKeys(); //遍历Set ,在这里我们使用迭代器遍历 Iterator keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()){ //获取到Selectionkey SelectionKey key = keyIterator.next(); //根据key对应的通道发生的事件做相应的处理 if (key.isAcceptable()){//如果是accept事件表示有新的客户端连接 //为该客户端生成一个socketChannel SocketChannel socketChannel = serverSocketChannel.accept(); //设置成非阻塞 socketChannel.configureBlocking(false); //将socketChannel注册到selector,监听事件为OP_READ,同时给socketChannel关联一个buffer socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } if (key.isReadable()){ //通过key反向获取到channel SocketChannel channel = (SocketChannel)key.channel(); //获取到该channel关联的buffer ByteBuffer buffer = (ByteBuffer)key.attachment(); //读取数据 channel.read(buffer); System.out.println("from 客户端"+new String(buffer.array())); } //手动从集合中移动当前的selectionkey,防止重复 *** 作 keyIterator.remove(); } } } }
客户端代码
package com.haust.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class client { public static void main(String[] args) throws IOException { //得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //提供服务器端的IP和端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); //连接服务器 if (!socketChannel.connect(inetSocketAddress)){ while (!socketChannel.finishConnect()){ System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作.."); } } //如果连接成功,就发送数据 String str = "hello nio"; ByteBuffer buffer = ByteBuffer.wrap(str.getBytes()); //发送数据,将buffer数据写入channel socketChannel.write(buffer); System.in.read(); } }
效果:
简单的服务器端和客户端的群聊系统(控制台板)
服务器端代码:
package com.haust.groupchat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; public class GroupChatServer { //定义属性 private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT =6667; //构造器 初始化工作 public GroupChatServer(){ try { //得到选择器 selector = Selector.open(); //ServerSocketChannel listenChannel = ServerSocketChannel.open(); //绑定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //设置非阻塞模式 listenChannel.configureBlocking(false); //将listenChannel注册到selector listenChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (IOException e) { e.printStackTrace(); } } public void listen(){ try { while (true){ int count = selector.select(2000);//select方法本身是阻塞的(不带参数),2000代表2秒后返回结果,变为非阻塞的 if (count>0){//说明有事件发生 //遍历得到selectionKey集合 Iteratoriterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ //取出selectionkey SelectionKey key = iterator.next(); //监听到accept if (key.isAcceptable()){ SocketChannel sc = listenChannel.accept(); //将该sc注册到selector sc.configureBlocking(false); sc.register(selector,SelectionKey.OP_READ); //提示 System.out.println(sc.getRemoteAddress()+"上线"); } if (key.isReadable()){ readData(key); } //防止重复 *** 作 iterator.remove(); } }else { // System.out.println("等待..."); } } }catch (IOException e) { e.printStackTrace(); }finally { //发生异常时处理 } } //读取客户端消息 public void readData(SelectionKey key){ SocketChannel channel = null; try { //得到channel channel = (SocketChannel) key.channel(); //创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); //根据count的值进行相应的处理 if (count>0){ //把缓存区的数据转成字符串 String msg = new String(buffer.array()); //输出该消息 System.out.println("from 客户端"+msg); //向其他客户端转发消息(要去掉自己) sendInfoToOtherClients(msg,channel); } }catch (IOException e){ try { System.out.println(channel.getRemoteAddress()+"离线了"); //取消注册 key.cancel(); //关闭通道 channel.close(); } catch (IOException ex) { ex.printStackTrace(); } } } //向其他客户端转发消息的方法 private void sendInfoToOtherClients(String msg,SocketChannel self) throws IOException { System.out.println("服务器转发消息中"); //遍历所有注册到selector上的SocketChannel,并排除self for (SelectionKey key : selector.keys()) { //通过key 取出对应的SocketChannel Channel targetChannel = key.channel(); //排除自己 if (targetChannel instanceof SocketChannel && targetChannel != self){ //转型 SocketChannel dest = (SocketChannel)targetChannel; //将msg 存储到buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); //将buffer的数据写入通道 dest.write(buffer); } } } public static void main(String[] args) { //创建服务器对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); } }
客户端代码
package com.haust.groupchat; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; public class GroupChatClient { //定义相关属性 private final String HOST = "127.0.0.1";//服务器的IP private final int PORT = 6667;//服务器端口 private Selector selector; private SocketChannel socketChannel; private String username; //构造器,完成初始化工作 public GroupChatClient() throws IOException { selector = Selector.open(); //连接服务器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1",PORT )); //设置非阻塞 socketChannel.configureBlocking(false); //将channel注册到selector socketChannel.register(selector, SelectionKey.OP_READ); //得到username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username+"is OK"); } //向服务器发送消息 public void sendInfo(String info){ info = username + " 说:" +info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch (IOException e) { e.printStackTrace(); } } public void readInfo(){ try { int readChannels = selector.select(2000); if (readChannels > 0){//表明有可以用的通道 Iteratoriterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); if (key.isReadable()){ //得到相关的通道 SocketChannel sc = (SocketChannel)key.channel(); //得到一个buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //读取 sc.read(buffer); //把读到的缓冲区的数据转换成字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); //删除当前的selectionkey防止重复 *** 作 iterator.remove(); } else{ // System.out.println("暂时没有通道"); } } } }catch (IOException e){ e.printStackTrace(); } } public static void main(String[] args) throws IOException { //启动客户端 final GroupChatClient chatClient = new GroupChatClient(); //启动一个线程,每隔三秒,读取从服务器发送数据 new Thread(){ public void run(){ while (true){ chatClient.readInfo(); try{ Thread.currentThread().sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //发送数据给服务器端端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String s = scanner.nextLine(); chatClient.sendInfo(s); } } }
特别注意:注册到selector的channel是非阻塞的!在监听事件处理结束后需要将此监听事件对应的key移除,防止重复执行。(如果不移除下次还会在集合中)
同一个程序,多次运行的话(应用不同的线程)需要在idea中设置
效果:
将三个客户端断开
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)