Java IO(NIO、AIO)代码演示

Java IO(NIO、AIO)代码演示,第1张

5、Java IO 5.1、NIO

服务端代码

服务端接受客户端发送的消息输出,并给客户端发送一个消息

		//创建多路复用选择器Selector
        Selector selector=Selector.open();
        //创建一个通道对象Channel,监听9001端口
        ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress(9001));
        //设置channel为非阻塞
        channel.configureBlocking(false);
        //
        /**
         * 1.SelectionKey.OP_CONNECT:连接事件
         * 2.SelectionKey.OP_ACCEPT:接收事件
         * 3.SelectionKey.OP_READ:读事件
         * 4.SelectionKey.OP_WRITE:写事件
         *
         * 将channel绑定到selector上并注册OP_ACCEPT事件
         */
        channel.register(selector,SelectionKey.OP_ACCEPT);

        while (true){
            //只有当OP_ACCEPT事件到达时,selector.select()会返回(一个key),如果该事件没到达会一直阻塞
            selector.select();
            //当有事件到达了,select()不在阻塞,然后selector.selectedKeys()会取到已经到达事件的SelectionKey集合
            Set keys = selector.selectedKeys();
            Iterator iterator = keys.iterator();
            while (iterator.hasNext()){
                SelectionKey key = (SelectionKey) iterator.next();
                //删除这个SelectionKey,防止下次select方法返回已处理过的通道
                iterator.remove();
                //根据SelectionKey状态判断
                if (key.isConnectable()){
                    //连接成功
                } else if (key.isAcceptable()){
                    /**
                     * 接受客户端请求
                     *
                     * 因为我们只注册了OP_ACCEPT事件,所以有客户端链接上,只会走到这
                     * 我们要做的就是去读取客户端的数据,所以我们需要根据SelectionKey获取到serverChannel
                     * 根据serverChannel获取到客户端Channel,然后为其再注册一个OP_READ事件
                     */
                    // 1,获取到ServerSocketChannel
                    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                    // 2,因为已经确定有事件到达,所以accept()方法不会阻塞
                    SocketChannel clientChannel = serverChannel.accept();
                    // 3,设置channel为非阻塞
                    clientChannel.configureBlocking(false);
                    // 4,注册OP_READ事件
                    clientChannel.register(key.selector(),SelectionKey.OP_READ);
                } else if (key.isReadable()){
                    // 通道可以读数据
                    /**
                     * 因为客户端连上服务器之后,注册了一个OP_READ事件发送了一些数据
                     * 所以首先还是需要先获取到clientChannel
                     * 然后通过Buffer读取clientChannel的数据
                     */
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
                    long bytesRead = clientChannel.read(byteBuffer);
                    while (bytesRead>0){
                        byteBuffer.flip();
                        System.out.println("client data :"+new String(byteBuffer.array()));
                        byteBuffer.clear();
                        bytesRead = clientChannel.read(byteBuffer);
                    }

                    /**
                     * 我们服务端收到信息之后,我们再给客户端发送一个数据
                     */
                    byteBuffer.clear();
                    byteBuffer.put("客户端你好,我是服务端,你看这NIO多难".getBytes("UTF-8"));
                    byteBuffer.flip();
                    clientChannel.write(byteBuffer);
                } else if (key.isWritable() && key.isValid()){
                    //通道可以写数据
                }

            }
        }

客户端代码

客户端连接上服务端后,先给服务端发送一个消息,并接受服务端发送的消息

Selector selector = Selector.open();
SocketChannel clientChannel = SocketChannel.open();
//将channel设置为非阻塞
clientChannel.configureBlocking(false);
//连接服务器
clientChannel.connect(new InetSocketAddress(9001));
//注册OP_CONNECT事件
clientChannel.register(selector, SelectionKey.OP_CONNECT);
while (true){
    //如果事件没到达就一直阻塞着
    selector.select();
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while (iterator.hasNext()){
        SelectionKey key = iterator.next();
        iterator.remove();
        if (key.isConnectable()){
            /**
             * 连接服务器端成功
             *
             * 首先获取到clientChannel,然后通过Buffer写入数据,然后为clientChannel注册OP_READ时间
             */
            clientChannel = (SocketChannel) key.channel();
            if (clientChannel.isConnectionPending()){
                clientChannel.finishConnect();
            }
            clientChannel.configureBlocking(false);
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            byteBuffer.clear();
            byteBuffer.put("服务端你好,我是客户端,你看这NIO难吗".getBytes("UTF-8"));
            byteBuffer.flip();
            clientChannel.write(byteBuffer);
            clientChannel.register(key.selector(),SelectionKey.OP_READ);
        } else if (key.isReadable()){
            //通道可以读数据
            clientChannel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
            long bytesRead = clientChannel.read(byteBuffer);
            while (bytesRead>0){
                byteBuffer.flip();
                System.out.println("server data :"+new String(byteBuffer.array()));
                byteBuffer.clear();
                bytesRead = clientChannel.read(byteBuffer);
            }
        } else if (key.isWritable() && key.isValid()){
            //通道可以写数据
        }
    }
}

5.2、AIO

服务端

服务端向客户端发送消息,并接受客户端发送的消息

AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9001));
//异步接受请求
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
    //成功时
    @Override
    public void completed(AsynchronousSocketChannel result, Void attachment) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put("我是服务端,客户端你好".getBytes());
            buffer.flip();
            result.write(buffer, null, new CompletionHandler<Integer, Void>(){
                @Override
                public void completed(Integer result, Void attachment) {
                    System.out.println("服务端发送消息成功");
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.out.println("发送失败");
                }
            });

            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            result.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
                //成功时调用
                @Override
                public void completed(Integer result, Void attachment) {
                    System.out.println(new String(readBuffer.array()));
                }
                //失败时调用
                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.out.println("读取失败");
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //失败时
    @Override
    public void failed(Throwable exc, Void attachment) {
        exc.printStackTrace();
    }
});
//防止线程执行完
TimeUnit.SECONDS.sleep(1000L);

客户端

客户端向服务端发送消息,并接受服务端发送的消息

AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
Future<Void> future = client.connect(new InetSocketAddress("127.0.0.1", 9001));
//阻塞,获取连接
future.get();

ByteBuffer buffer = ByteBuffer.allocate(1024);
//读数据
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
    //成功时调用
    @Override
    public void completed(Integer result, Void attachment) {
        System.out.println(new String(buffer.array()));
    }
    //失败时调用
    @Override
    public void failed(Throwable exc, Void attachment) {
        System.out.println("客户端接收消息失败");
    }
});

ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put("我是客户端,服务端你好".getBytes());
writeBuffer.flip();
//阻塞方法
Future<Integer> write = client.write(writeBuffer);
Integer r = write.get();
if(r>0){
    System.out.println("客户端消息发送成功");
}
//休眠线程
TimeUnit.SECONDS.sleep(1000L);

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/919414.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存