目录
pom依赖
netty服务端代码
netty客户端
PosttingObject封装netty客户端连接信息
测试客户端发送消息到服务器端
1. 可以实现socket长连接,心跳机制每隔N秒客户端给服务器发送一条消息,代表客户端还存活。
2. 可以实现在随意代码位置按照用户id标识,发送消息给服务端。
pom依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>
netty服务端代码
package com.kc.monitor.core.utils.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class NettyServer { private static int port = 8080; public static void main(String[] args) { // 用于接受客户端连接的请求 (并没有处理请求) NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于处理客户端连接的读写 *** 作 NioEventLoopGroup workGroup = new NioEventLoopGroup(); // 用于创建我们的ServerBootstrap ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializernetty客户端() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // LinebasedframeDecoder解决粘包-解包问题,设置我们分割最大长度为1024 // 原理就是自动帮我们把带有n或rn的数据进行换行 //socketChannel.pipeline().addLast(new LinebasedframeDecoder(1024)); socketChannel.pipeline().addLast(new StringEncoder());// String编码器 socketChannel.pipeline().addLast(new StringDecoder());// String解码器 socketChannel.pipeline().addLast(new ServerHandler());// 管道类-接收发送消息 } }); // 绑定我们的端口号码 try { // 绑定端口号,同步等待成功 ChannelFuture future = serverBootstrap.bind(port).sync(); System.out.println("服务器启动成功:" + port); // 等待服务器监听端口 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 优雅的关闭连接 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } class ServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println("8080-msg:" + msg); // 响应内容: 
channelHandlerContext.writeAndFlush("8080n"); // String类型加上n会自动粘包和拆包了 } }
package com.kc.monitor.core.utils.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LinebasedframeDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; public class NettyClient { public static ConcurrentHashMapPosttingObject封装netty客户端连接信息concurrentHashMap = new ConcurrentHashMap(); public ClientHandler clientHandler = new ClientHandler(); public void initNetty(String userId, String host, int port){ //创建nioEventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .handler(new ChannelInitializer () { @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("正在连接中..."); ch.pipeline().addLast(new LinebasedframeDecoder(1024)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(clientHandler); } }); try { // 发起连接 ChannelFuture sync = bootstrap.connect().sync(); System.out.println("用户:"+userId + "->服务端连接成功..."); // 异步等待关闭连接channel // sync.channel().closeFuture().sync(); // System.out.println("连接已关闭.."); } catch (InterruptedException e) { e.printStackTrace(); } finally { //group.shutdownGracefully(); //System.out.println("优雅关闭连接"); } PosttingObject posttingObject = new PosttingObject(); posttingObject.setNioEventLoopGroup(group); posttingObject.setNettyClient(this); concurrentHashMap.put(userId, posttingObject); } } class ClientHandler extends SimpleChannelInboundHandler { public ChannelHandlerContext channelHandlerContext; @Override public void channelActive(ChannelHandlerContext ctx) throws 
Exception { channelHandlerContext = ctx; //channelHandlerContext.writeAndFlush("11111"); //channelHandlerContext.channel().close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println("接收消息:" + msg); //channelHandlerContext.channel().close(); } }
package com.kc.monitor.core.utils.netty; import io.netty.channel.nio.NioEventLoopGroup; import lombok.Data; @Data public class PosttingObject { private NettyClient nettyClient; private NioEventLoopGroup nioEventLoopGroup; }测试客户端发送消息到服务器端
package com.kc.monitor.core.utils.netty; public class NettyClientSendMsg { private static final String host = "192.168.52.1"; private static final int port = 8080; public static void main(String[] args) { // 初始化Netty连接 String userId = "1"; new NettyClient().initNetty(userId, host, port); // 根据用户id发送消息到客户端 PosttingObject posttingObject = NettyClient.concurrentHashMap.get(userId); try{ // 发送消息 NettyClient nettyClient = posttingObject.getNettyClient(); nettyClient.clientHandler.channelHandlerContext.writeAndFlush("01n"); nettyClient.clientHandler.channelHandlerContext.writeAndFlush("02n"); nettyClient.clientHandler.channelHandlerContext.writeAndFlush("03n"); }finally { // 优雅关闭连接 posttingObject.getNioEventLoopGroup().shutdownGracefully(); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)