服务端代码
服务端接受客户端发送的消息输出,并给客户端发送一个消息
//创建多路复用选择器Selector
Selector selector=Selector.open();
//创建一个通道对象Channel,监听9001端口
ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress(9001));
//设置channel为非阻塞
channel.configureBlocking(false);
//
/**
* 1.SelectionKey.OP_CONNECT:连接事件
* 2.SelectionKey.OP_ACCEPT:接收事件
* 3.SelectionKey.OP_READ:读事件
* 4.SelectionKey.OP_WRITE:写事件
*
* 将channel绑定到selector上并注册OP_ACCEPT事件
*/
channel.register(selector,SelectionKey.OP_ACCEPT);
while (true){
//只有当OP_ACCEPT事件到达时,selector.select()会返回(一个key),如果该事件没到达会一直阻塞
selector.select();
//当有事件到达了,select()不在阻塞,然后selector.selectedKeys()会取到已经到达事件的SelectionKey集合
Set keys = selector.selectedKeys();
Iterator iterator = keys.iterator();
while (iterator.hasNext()){
SelectionKey key = (SelectionKey) iterator.next();
//删除这个SelectionKey,防止下次select方法返回已处理过的通道
iterator.remove();
//根据SelectionKey状态判断
if (key.isConnectable()){
//连接成功
} else if (key.isAcceptable()){
/**
* 接受客户端请求
*
* 因为我们只注册了OP_ACCEPT事件,所以有客户端链接上,只会走到这
* 我们要做的就是去读取客户端的数据,所以我们需要根据SelectionKey获取到serverChannel
* 根据serverChannel获取到客户端Channel,然后为其再注册一个OP_READ事件
*/
// 1,获取到ServerSocketChannel
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
// 2,因为已经确定有事件到达,所以accept()方法不会阻塞
SocketChannel clientChannel = serverChannel.accept();
// 3,设置channel为非阻塞
clientChannel.configureBlocking(false);
// 4,注册OP_READ事件
clientChannel.register(key.selector(),SelectionKey.OP_READ);
} else if (key.isReadable()){
// 通道可以读数据
/**
* 因为客户端连上服务器之后,注册了一个OP_READ事件发送了一些数据
* 所以首先还是需要先获取到clientChannel
* 然后通过Buffer读取clientChannel的数据
*/
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
long bytesRead = clientChannel.read(byteBuffer);
while (bytesRead>0){
byteBuffer.flip();
System.out.println("client data :"+new String(byteBuffer.array()));
byteBuffer.clear();
bytesRead = clientChannel.read(byteBuffer);
}
/**
* 我们服务端收到信息之后,我们再给客户端发送一个数据
*/
byteBuffer.clear();
byteBuffer.put("客户端你好,我是服务端,你看这NIO多难".getBytes("UTF-8"));
byteBuffer.flip();
clientChannel.write(byteBuffer);
} else if (key.isWritable() && key.isValid()){
//通道可以写数据
}
}
}
客户端代码
客户端连接上服务端后,先给服务端发送一个消息,并接受服务端发送的消息
Selector selector = Selector.open();
SocketChannel clientChannel = SocketChannel.open();
//将channel设置为非阻塞
clientChannel.configureBlocking(false);
//连接服务器
clientChannel.connect(new InetSocketAddress(9001));
//注册OP_CONNECT事件
clientChannel.register(selector, SelectionKey.OP_CONNECT);
while (true){
//如果事件没到达就一直阻塞着
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isConnectable()){
/**
* 连接服务器端成功
*
* 首先获取到clientChannel,然后通过Buffer写入数据,然后为clientChannel注册OP_READ时间
*/
clientChannel = (SocketChannel) key.channel();
if (clientChannel.isConnectionPending()){
clientChannel.finishConnect();
}
clientChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.clear();
byteBuffer.put("服务端你好,我是客户端,你看这NIO难吗".getBytes("UTF-8"));
byteBuffer.flip();
clientChannel.write(byteBuffer);
clientChannel.register(key.selector(),SelectionKey.OP_READ);
} else if (key.isReadable()){
//通道可以读数据
clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
long bytesRead = clientChannel.read(byteBuffer);
while (bytesRead>0){
byteBuffer.flip();
System.out.println("server data :"+new String(byteBuffer.array()));
byteBuffer.clear();
bytesRead = clientChannel.read(byteBuffer);
}
} else if (key.isWritable() && key.isValid()){
//通道可以写数据
}
}
}
5.2、AIO
服务端
服务端向客户端发送消息,并接受客户端发送的消息
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9001));
//异步接受请求
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
//成功时
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("我是服务端,客户端你好".getBytes());
buffer.flip();
result.write(buffer, null, new CompletionHandler<Integer, Void>(){
@Override
public void completed(Integer result, Void attachment) {
System.out.println("服务端发送消息成功");
}
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("发送失败");
}
});
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
result.read(readBuffer, null, new CompletionHandler<Integer, Void>() {
//成功时调用
@Override
public void completed(Integer result, Void attachment) {
System.out.println(new String(readBuffer.array()));
}
//失败时调用
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("读取失败");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
//失败时
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
});
//防止线程执行完
TimeUnit.SECONDS.sleep(1000L);
客户端
客户端向服务端发送消息,并接受服务端发送的消息
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
Future<Void> future = client.connect(new InetSocketAddress("127.0.0.1", 9001));
//阻塞,获取连接
future.get();
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读数据
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
//成功时调用
@Override
public void completed(Integer result, Void attachment) {
System.out.println(new String(buffer.array()));
}
//失败时调用
@Override
public void failed(Throwable exc, Void attachment) {
System.out.println("客户端接收消息失败");
}
});
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put("我是客户端,服务端你好".getBytes());
writeBuffer.flip();
//阻塞方法
Future<Integer> write = client.write(writeBuffer);
Integer r = write.get();
if(r>0){
System.out.println("客户端消息发送成功");
}
//休眠线程
TimeUnit.SECONDS.sleep(1000L);
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)