所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性。
心跳检测机制:客户端每隔一段时间发送PING消息给服务端,服务端接受到后回复PONG消息。客户端如果在一定时间内没有收到PONG响应,则认为连接断开,服务端如果在一定时间内没有收到来自客户端的PING请求,则认为连接已经断开。通过这种来回的PING-PONG消息机制侦测连接的活跃性。
在 Netty 中,本身也提供了 IdleStateHandler 用于检测连接闲置,该Handler可以检测连接未发生读写事件而触发相应事件。
看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS); } 复制代码
解释下三个参数的含义:
- eaderIdleTimeSeconds 读超时:当在指定的时间间隔内没有从 Channel 读取到数据时,会触发一个 READER_IDLE 的 IdleStateEvent 事件。
- riterIdleTimeSeconds 写超时:即当在指定的时间间隔内没有数据写入到 Channel 时,会触发一个 WRITER_IDLE 的 IdleStateEvent 事件。
- llIdleTimeSeconds 读/写超时:即当在指定的时间间隔内没有读或写 *** 作时,会触发一个 ALL_IDLE 的 IdleStateEvent 事件。
注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); } 复制代码
要实现 Netty 服务端心跳检测机制需要在服务器端的 ChannelInitializer 中加入如下的代码:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS)); 复制代码
初步地看下 IdleStateHandler 源码,先看下 IdleStateHandler 中的 channelRead 方法:
红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让 channelPipe 中的下一个 handler 处理channelRead 方法
我们再看看 channelActive 方法:
这里有个 initialize 的方法,这是 IdleStateHandler 的精髓,接着探究:
这边会触发一个 Task,ReaderIdleTimeoutTask,这个task里的 run 方法源码是这样的:
第一个红框代码是用当前时间减去最后一次 channelRead 方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead 已经是6s之前的事情了,你设置的是5s,那么 nextDelay 则为-1,说明超时了,那么第二个红框代码则会触发下一个 handler 的 userEventTriggered 方法:
如果没有超时则不触发 userEventTriggered 方法。
服务端代码public class HeartBeatServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline channelPipeline = socketChannel.pipeline(); channelPipeline.addLast("decoder",new StringDecoder()); channelPipeline.addLast("encoder",new StringEncoder()); channelPipeline.addLast(new IdleStateHandler(3,0,0, TimeUnit.SECONDS)); channelPipeline.addLast(new HeartBeatServerHandler()); } }); System.out.println("netty server start。。"); ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } } 复制代码
继承 SimpleChannelInboundHandler 重写 channelRead 方法和 userEventTriggered 方法
public class HeartBeatServerHandler extends SimpleChannelInboundHandler客户端代码{ int readIdleTimes = 0; @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { System.out.println(" ====== > [server] message received : " + s); if ("Heartbeat Packet".equals(s)){ ctx.channel().writeAndFlush("ok"); }else{ System.out.println(" 其他信息处理..."); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()){ case READER_IDLE: eventType = "读空闲"; readIdleTimes++; // 读空闲的计数加1 break; case WRITER_IDLE: eventType = "写空闲"; // 不处理 break; case ALL_IDLE: eventType = "读写空闲"; // 不处理 break; } System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType); if (readIdleTimes > 3){ System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源"); ctx.channel().writeAndFlush("idle close"); ctx.channel().close(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.err.println("=== " + ctx.channel().remoteAddress() + " is active ==="); } } 复制代码
public class HeartBeatClient { public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline channelPipeline = socketChannel.pipeline(); channelPipeline.addLast("decoder",new StringDecoder()); channelPipeline.addLast("encoder",new StringEncoder()); channelPipeline.addLast(new HeartBeatClientHandler()); } }); System.out.println("netty client start。。"); Channel channel = bootstrap.connect("127.0.0.1", 8888).sync().channel(); String text = "Heartbeat Packet"; Random random = new Random(); while(channel.isActive()){ int num = random.nextInt(8); Thread.sleep(num*1000); channel.writeAndFlush(text); } }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } } static class HeartBeatClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(" client received :" + msg); if (msg != null && msg.equals("idle close")) { System.out.println(" 服务端关闭连接,客户端也关闭"); ctx.channel().closeFuture(); } } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)