一:介绍:
IO:阻塞IO网络模型:服务器启动后会进入阻塞状态,等待client连接,每一个client端连接上服务器后,服务器会为每一个客户端起一个线程来处理客户端的需求。服务器的accept()方法、服务器新起的thread中,Socket的read()和write()方法都是阻塞的。
NIO-Single Thread模型:NIO单线程模型:采用selector管理的轮询查询模式,selector每隔一段时间都去看一下client端有没有产生需要处理的消息(客户端连接请求、客户端发送数据请求、客户端下载数据请求),也就是说selector会同时管理client端的连接和通道内client端的读、写请求。NIO-reactor模式:在NIO单线程模式中,只有一个线程负责处理client端的连接和通道内client端的读、写请求。在reactor模式中,将server端的单线程换成线程池(单线程管家+线程池工人模式)。
- NIO在单线程下可以同时为多个客户端服务
- NIO技术直接将 read buffer 拷贝到 socket buffer. java 的 FileChannel.transferTo() 方法就是这样的实现, 这个实现是依赖于 *** 作系统底层的sendFile()实现的.
二:实例:
1.新建两个java项目 ,一个作为client端,一个作为serve端
serve端代码:
package com.company; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; public class Main { public static void main(String[] args) throws IOException { //创建客户端的Socket对象(SevereSocket) //ServerSocket (int port)创建绑定到指定端口的服务器套接字 ServerSocket ss=new ServerSocket(50000); //Socket accept()侦听要连接到此套接字并接受他 Socket s=ss.accept(); //获取输入流,读数据,并把数据显示在控制台 InputStream is=s.getInputStream(); byte[] bys=new byte[1024]; int len=is.read(bys); String data=new String(bys,0,len); System.out.println("client:"+data); //释放资源 s.close(); ss.close(); } }
client端代码展示:
package com.company; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; public class Main{ public static void main(String[] args) throws IOException{ //创建客户端的Socket对象 //Socket (InetAddress adress,int port)创建流套接字并将其连接到指定IP地址的指定端口号 // Socket s=new Socket(InetAddress.getByName("192.168.224.1"), 10000); //Socket (String host,int port)创建流套接字并将其连接到指定主机的指定端口号 Socket s=new Socket("127.0.0.1", 50000); //获取输出流,写数据 //OutputStream getOutputStream();返回此套接字的输出流 OutputStream os=s.getOutputStream(); os.write("hello wxc 6319xxxx119,tcp".getBytes()); //释放资源 s.close(); } }
运行结果:
2. NIO端同样新建JAVA项目:
Serve端代码:
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class server { //网络通信IO *** 作,TCP协议,针对面向流的监听套接字的可选择通道(一般用于服务端) private ServerSocketChannel serverSocketChannel; private Selector selector; public void start(Integer port) throws Exception { serverSocketChannel = ServerSocketChannel.open(); selector = Selector.open(); //绑定监听端口 serverSocketChannel.socket().bind(new InetSocketAddress(port)); //设置为非阻塞模式 serverSocketChannel.configureBlocking(false); //注册到Selector上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); startListener(); } private void startListener() throws Exception { while (true) { // 如果客户端有请求select的方法返回值将不为零 if (selector.select(1000) == 0) { System.out.println("current not exists task"); continue; } // 如果有事件集合中就存在对应通道的key SetselectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); // 遍历所有的key找到其中事件类型为Accept的key while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) handleConnection(); if (key.isReadable()) handleMsg(key); iterator.remove(); } } } private void handleConnection() throws Exception { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } private void handleMsg(SelectionKey key) throws Exception { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer attachment = (ByteBuffer) key.attachment(); channel.read(attachment); System.out.println("current msg: " + new String(attachment.array())); } public static void main(String[] args) throws Exception { server myServer = new server(); myServer.start(8888); } }
client端代码;
package com.company; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class Main { public static void main(String[] args) throws Exception { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); // 连接服务器 if (!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888))) { while (!socketChannel.finishConnect()) { System.out.println("connecting..."); } } //发送数据 String str = "hello wxc 6319xxxx119"; ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes()); socketChannel.write(byteBuffer); System.in.read(); } }
3.netty端:
同理新建项目,导入相应的包:
serve端:
package com.company; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.net.InetSocketAddress; public class Main { private int port; public static void main(String[] args){ new Main(18080).start(); } public Main(int port) { this.port = port; } public void start() { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap server = new ServerBootstrap() .group(boss, work).channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("decoder", new StringDecoder()) .addLast("encoder", new StringEncoder()) .addLast(new HelloWorldServerHandler()); } }); //绑定端口 ChannelFuture future = server.bind().sync(); System.out.println("server started and listen " + port); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } public static class HelloWorldServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("HelloWorldServerHandler active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server channelRead.."); System.out.println(ctx.channel().remoteAddress()+"->Server :"+ msg.toString()); ctx.write("server write"+msg); ctx.flush(); } } }
client端:
package com.company; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class Main { private static final String HOST = "localhost"; private static final int PORT= 18080; public static void main(String[] args){ new Main().start(HOST, PORT); } public void start(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap client = new Bootstrap().group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast("decoder", new StringDecoder()) .addLast("encoder", new StringEncoder()) .addLast(new HelloWorldClientHandler()); } }); ChannelFuture future = client.connect(host, port).sync(); future.channel().writeAndFlush("Hello Netty Server ,I am a netty client"); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static class HelloWorldClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("HelloWorldClientHandler Active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("HelloWorldClientHandler read Message:"+msg); } } }
运行结果:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)