package com.netty.threadPoolContext; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializerEchoClientHandler() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } p.addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect(HOST, PORT).sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
package com.netty.threadPoolContext; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import java.nio.charset.Charset; public class EchoClientHandler extends ChannelInboundHandlerAdapter { private final ByteBuf firstMessage; public EchoClientHandler() { firstMessage = Unpooled.buffer(EchoClient.SIZE); for (int i = 0; i < firstMessage.capacity(); i++) { firstMessage.writeByte((byte) i); } } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是客户端", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String s = new String(bytes, Charset.forName("utf-8")); System.out.println("客户端接收到:" + s); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }EchoServer
package com.netty.threadPoolContext; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); //创建业务线程池,创建2个子线程 static final EventExecutorGroup group = new DefaultEventExecutorGroup(2); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializerEchoServerHandler() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } p.addLast(group, serverHandler); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.netty.threadPoolContext; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.nio.charset.Charset; import java.text.SimpleDateFormat; import java.util.Date; @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //group就是线程池,此处建立了16个线程 static final EventExecutorGroup group = new DefaultEventExecutorGroup(16); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { System.out.println(date.format(new Date()) +" EchoServerHandler 线程:" + Thread.currentThread().getName()); //普通方式,接收客户端消息 ByteBuf byteBuf = (ByteBuf) msg; byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); String s = new String(bytes, Charset.forName("utf-8")); Thread.sleep(5 * 1000); System.out.println(date.format(new Date()) +"普通方式 线程池:" + Thread.currentThread().getName()); ctx.writeAndFlush(Unpooled.copiedBuffer(date.format(new Date()) +" 你好,我是服务端", CharsetUtil.UTF_8)); System.out.println("继续"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)