一、服务端与客户端设计
1.服务启动类2.服务端管道处理器3.客户端启动类4.客户端管道处理器5.启动验证
·服务端·客户端 二、示例分析
1.ServerBootstrap的Group方法2.ServerBootstrap的Channel方法
1.反射实现类ReflectiveChannelFactory2.非空校验并赋值channelFactory 3.ServerBootstrap的Option、ChildOption方法
1.ChannelConfig实现类DefaultServerSocketChannelConfig 4.ServerBootstrap的ChildHandler方法,实现类
一、服务端与客户端设计 1.服务启动类package com.example.netty.server; import com.example.netty.server.handler.NettyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) throws InterruptedException { //创建BossGroup 和 WorkerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务端的启动对象 配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程进行设置 bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true) .childHandler(new ChannelInitializer2.服务端管道处理器() { //给 PipLine 设置处理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("Server is Ready"); //绑定一个端口并且同步 生成了一个 ChannelFuture 对象 启动服务 ChannelFuture cf = bootstrap.bind(6668).sync(); //对关闭通道进行侦听 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.example.netty.server.handler; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.nio.ByteBuffer; import java.nio.charset.Charset; public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { System.out.println("Server Read Thread :" + Thread.currentThread().getName()); System.out.println("Server ctx=" + ctx); //将message转成一个ByteBuf ByteBuf buf = (ByteBuf) message; System.out.println("Client Send Message is:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("Client Ip Address is:" + ctx.channel().remoteAddress()); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //将数据写入到缓冲区并刷新 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Client",CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }3.客户端启动类
package com.example.netty.client; import com.example.netty.client.handler.NettyClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class NettyClient { public static void main(String[] args) throws InterruptedException { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer4.客户端管道处理器() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("Client OK ..."); //启动客户端 连接服务端 ChannelFuture cf = bootstrap.connect("127.0.0.1",6668).sync(); //关闭通道增加监听 cf.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }
package com.example.netty.client.handler; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client :" + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server ", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("Server SendBack Message:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("Server Ip Address :" + ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }5.启动验证 ·服务端
Connected to the target VM, address: '127.0.0.1:60934', transport: 'socket' Java HotSpot(TM) 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended Server is Ready Server Read Thread :nioEventLoopGroup-3-1 Server ctx=ChannelHandlerContext(NettyServerHandler#0, [id: 0x7748b291, L:/127.0.0.1:6668 - R:/127.0.0.1:60940]) Client Send Message is:Hello Server Client Ip Address is:/127.0.0.1:60940·客户端
Connected to the target VM, address: '127.0.0.1:60937', transport: 'socket' Java HotSpot(TM) 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended Client OK ... Client :ChannelHandlerContext(NettyClientHandler#0, [id: 0xb9a0d3d1, L:/127.0.0.1:60940 - R:/127.0.0.1:6668]) Server SendBack Message:Hello Client Server Ip Address :/127.0.0.1:6668二、示例分析 1.ServerBootstrap的Group方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this; }2.ServerBootstrap的Channel方法
public B channel(Class extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory1.反射实现类ReflectiveChannelFactory( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }
package io.netty.channel; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import java.lang.reflect.Constructor; public class ReflectiveChannelFactory2.非空校验并赋值channelFactoryimplements ChannelFactory { private final Constructor extends T> constructor; public ReflectiveChannelFactory(Class extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } } @Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } } @Override public String toString() { return StringUtil.simpleClassName(ReflectiveChannelFactory.class) + '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)"; } }
public B channelFactory(io.netty.channel.ChannelFactory extends C> channelFactory) { return channelFactory((ChannelFactory3.ServerBootstrap的Option、ChildOption方法) channelFactory); } @Deprecated public B channelFactory(ChannelFactory extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
多个客户端如果请求同时到达,则需要一个队列来进行放置,以便服务端依次处理 Option ---> bossGroup ChildOption ---> workGroup1.ChannelConfig实现类DefaultServerSocketChannelConfig
@Override public4.ServerBootstrap的ChildHandler方法,实现类boolean setOption(ChannelOption option, T value) { validate(option, value); if (option == SO_RCVBUF) { setReceiveBufferSize((Integer) value); } else if (option == SO_REUSEADDR) { setReuseAddress((Boolean) value); } else if (option == SO_BACKLOG) { setBacklog((Integer) value); } else { return super.setOption(option, value); } return true; }
DefaultChannelPipeline implements ChannelPipeline
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)