入门netty之前nio的那些事(不容错过的知识点加代码实战讲解)

入门netty之前nio的那些事(不容错过的知识点加代码实战讲解),第1张

入门netty之前nio的那些事(不容错过的知识点加代码实战讲解) 核心知识点总结

    NIO基于Channel(通道)和Buffer(缓冲区)进行 *** 作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

    selector、channel、buffer关系图


    关系说明:
    1.每个channel都会对应一个buffer
    2.selector对应一个线程,一个selector可以监听多个channel(连接)
    3.程序根据selector监听切换到哪个channel是由事件决定的
    4.selector会根据监听到的不同的事件在各个通道上切换
    5.buffer就是一个内存块,底层有一个数组
    6.NIO中的buffer是可以读也可以写,但需要flip等方法进行切换
    7.channel是双向的,可以同时进行读写,可以实现异步读写数据,可以从缓冲读数据,也可以写数据到缓冲

    ServerSocketChannel和SocketChannel用于TCP的数据读写。

    ServerSocketChannel和SocketChannel的作用:简单理解ServerSocketChannel是当客户端向服务器端发起连接请求时,服务器端通过ServerSocketChannel分配给此客户端一个SocketChannel,然后客户端与服务器端的通讯就依赖这个SocketChannel。无论是客户端还是服务器端,channel都需要通过buffer进行连接。


    每一个客户端都有一个SocketChannel与服务器端进行通讯,channel.read(buffer),将channel里的内容写入buffer(从通道里面读取内容),channel.write(buffer),将buffer的内容写入channel(将内容写入通道)。

    selector: Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。

    OP_READ 事件不仅仅只有可读时才触发,以下情况都会触发:channel 中数据读取完毕、连接管道的另一端被关闭、有一个错误的 pending、对方发送消息过来。

案例一:

简单的服务器端、客户端连接(控制台显示)
服务器端代码:

package com.haust.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class server {
    public static void main(String[] args) throws IOException {
        //创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //得到一个selector对象
        Selector selector = Selector.open();
        //绑定一个本地6666端口,在服务器进行监听
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        //将channel设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        //把serverSocketChannel注册到selector,监听的事件为OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true){
            //这里我们等待1秒,如果没有事件发生,返回
            if (selector.select(1000) == 0){//在1秒内没有事件发生
                System.out.println("服务器等待1秒,无连接");
                continue;
            }
            //如果返回的>0,就获取到相关的selectionKey集合
            //selector.keys()表示注册到selector上channel的数量
            //1.如果返回的>0,表示已经获取到关注的事件
            //2.selector.selectedKeys()返回监听到的事件集合
            //通过selectionKeys反向获取通道
            Set selectionKeys = selector.selectedKeys();
            //遍历Set,在这里我们使用迭代器遍历
            Iterator keyIterator = selectionKeys.iterator();

            while (keyIterator.hasNext()){
                //获取到Selectionkey
                SelectionKey key = keyIterator.next();
                //根据key对应的通道发生的事件做相应的处理
                if (key.isAcceptable()){//如果是accept事件表示有新的客户端连接
                    //为该客户端生成一个socketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    //设置成非阻塞
                    socketChannel.configureBlocking(false);
                    //将socketChannel注册到selector,监听事件为OP_READ,同时给socketChannel关联一个buffer
                    socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                if (key.isReadable()){
                    //通过key反向获取到channel
                    SocketChannel channel = (SocketChannel)key.channel();
                    //获取到该channel关联的buffer
                    ByteBuffer buffer = (ByteBuffer)key.attachment();
                    //读取数据
                    channel.read(buffer);
                    System.out.println("from 客户端"+new String(buffer.array()));
                }
                //手动从集合中移动当前的selectionkey,防止重复 *** 作
                keyIterator.remove();
            }


        }
    }
}

客户端代码

package com.haust.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class client {
    public static void main(String[] args) throws IOException {
        //得到一个网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //提供服务器端的IP和端口
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        //连接服务器
        if (!socketChannel.connect(inetSocketAddress)){
            while (!socketChannel.finishConnect()){
                System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作..");
            }
        }
        //如果连接成功,就发送数据
        String str = "hello nio";

        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        //发送数据,将buffer数据写入channel
        socketChannel.write(buffer);
        System.in.read();


    }
}

效果:

案例二:群聊系统

简单的服务器端和客户端的群聊系统(控制台板)
服务器端代码:

package com.haust.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class GroupChatServer {
    //定义属性
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT =6667;
    //构造器 初始化工作
    public GroupChatServer(){
        try {
            //得到选择器
            selector = Selector.open();
            //ServerSocketChannel
            listenChannel = ServerSocketChannel.open();
            //绑定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            //设置非阻塞模式
            listenChannel.configureBlocking(false);
            //将listenChannel注册到selector
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void listen(){
        try {
            while (true){
                int count = selector.select(2000);//select方法本身是阻塞的(不带参数),2000代表2秒后返回结果,变为非阻塞的
                if (count>0){//说明有事件发生
                    //遍历得到selectionKey集合
                    Iterator iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()){
                        //取出selectionkey
                        SelectionKey key = iterator.next();
                        //监听到accept
                        if (key.isAcceptable()){
                            SocketChannel sc = listenChannel.accept();
                            //将该sc注册到selector
                            sc.configureBlocking(false);
                            sc.register(selector,SelectionKey.OP_READ);
                            //提示
                            System.out.println(sc.getRemoteAddress()+"上线");
                        }
                        if (key.isReadable()){
                            readData(key);
                        }
                        //防止重复 *** 作
                        iterator.remove();
                    }
                }else {
//                    System.out.println("等待...");
                }
            }
        }catch (IOException e) {
            e.printStackTrace();
        }finally {
            //发生异常时处理
        }
    }
    //读取客户端消息
    public void readData(SelectionKey key){
        SocketChannel channel = null;
        try {
            //得到channel
            channel = (SocketChannel) key.channel();
            //创建buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            //根据count的值进行相应的处理
            if (count>0){
                //把缓存区的数据转成字符串
                String msg = new String(buffer.array());
                //输出该消息
                System.out.println("from 客户端"+msg);

                //向其他客户端转发消息(要去掉自己)
                sendInfoToOtherClients(msg,channel);
            }

        }catch (IOException e){
            try {
                System.out.println(channel.getRemoteAddress()+"离线了");
                //取消注册
                key.cancel();
                //关闭通道
                channel.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
    //向其他客户端转发消息的方法
    private void sendInfoToOtherClients(String msg,SocketChannel self) throws IOException {
        System.out.println("服务器转发消息中");
        //遍历所有注册到selector上的SocketChannel,并排除self
        for (SelectionKey key : selector.keys()) {
            //通过key 取出对应的SocketChannel
            Channel targetChannel = key.channel();
            //排除自己
            if (targetChannel instanceof SocketChannel && targetChannel != self){
                //转型
                SocketChannel dest = (SocketChannel)targetChannel;
                //将msg 存储到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                //将buffer的数据写入通道
                dest.write(buffer);
            }
        }
    }


    public static void main(String[] args) {
        //创建服务器对象
        GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    }

}

客户端代码

package com.haust.groupchat;



import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

public class GroupChatClient {
    //定义相关属性
    private final String HOST = "127.0.0.1";//服务器的IP
    private final int PORT = 6667;//服务器端口
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;

    //构造器,完成初始化工作
    public GroupChatClient() throws IOException {
        selector = Selector.open();
        //连接服务器
        socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1",PORT ));
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //将channel注册到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        //得到username
        username = socketChannel.getLocalAddress().toString().substring(1);

        System.out.println(username+"is OK");
    }
    //向服务器发送消息
    public void sendInfo(String info){
        info = username + " 说:" +info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void readInfo(){
        try {
            int readChannels = selector.select(2000);
            if (readChannels > 0){//表明有可以用的通道
                Iterator iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    if (key.isReadable()){
                        //得到相关的通道
                        SocketChannel sc = (SocketChannel)key.channel();
                        //得到一个buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //读取
                        sc.read(buffer);
                        //把读到的缓冲区的数据转换成字符串
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                        //删除当前的selectionkey防止重复 *** 作
                        iterator.remove();

                    } else{
//                        System.out.println("暂时没有通道");
                    }

                }
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        //启动客户端
        final GroupChatClient chatClient = new GroupChatClient();
        //启动一个线程,每隔三秒,读取从服务器发送数据
        new Thread(){
            public void run(){
                while (true){
                    chatClient.readInfo();
                    try{
                        Thread.currentThread().sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        //发送数据给服务器端端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        }

    }
}

特别注意:注册到selector的channel是非阻塞的!在监听事件处理结束后需要将此监听事件对应的key移除,防止重复执行。(如果不移除下次还会在集合中)
同一个程序,多次运行的话(应用不同的线程)需要在idea中设置


效果:




将三个客户端断开

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716626.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存