心跳机制广泛运用在我们的应用平台中。对于连接到应用服务的客户端,服务端有必要对长时间没有请求的客户端连接进行清理,以避免连接过多。这就需要服务端有空闲连接检测机制。
而针对客户端而言,如果长时间未请求数据,为避免被服务端清理连接,就需要间歇性的发送心跳请求。
在Netty中,针对以上需求,已经有现成的Handler可供使用,这就是本文要介绍的IdleStateHandler。
1.IdleStateHandler的使用 1.1 服务端检测长时间未请求的客户端我们在服务端可以使用IdleStateHandler来检测长时间未发送请求的客户端,对其进行清理 *** 作,简单示例如下:
// 还是使用HelloServer的示例,我们在ChannelInitializer中添加IdleStateHandler .childHandler(new ChannelInitializer() { protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 在这里添加IdleStateHandler,设置空闲检测时间为10秒 pipeline.addLast("idle", new IdleStateHandler(10, 10, 10)); // 针对空闲事件的处理(自定义),具体内容如下 pipeline.addLast("idledeal", new IdleEventHandler()); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new HelloServerHandler()); } }); // IdleEventHandler public class IdleEventHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { evt = (IdleStateEvent) evt; // 若检测到长时间未读到请求事件,则清理客户端连接 if (evt.equals(IdleStateEvent.READER_IDLE_STATE_EVENT)) { System.out.println("idle..."); ctx.channel().close(); }else { // 其他事件 // TODO } } } }
在本例中,我们设置了IdleStateHandler的读空闲检测时间为10s,则客户端连接10s没有发送任何请求过来时,则发送一个IdleStateEvent.READER_IDLE_STATE_EVENT事件到下游,IdleEventHandler处理该事件,直接关闭客户端连接。
1.2 客户端发送心跳请求若客户端本身检测到长时间未发送请求,为避免被服务端清理,则可以主动发送一个心跳请求。简单示例如下
// 同样的使用HelloClient的代码,我们改造下ChannelInitializer .handler(new ChannelInitializer() { protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ... // 在这里添加IdleStateHandler检测 pipeline.addLast("idle", new IdleStateHandler(5, 5, 5)); // 空闲事件处理Handler(自定义),具体内容如下 pipeline.addLast("idledeal", new ClientIdleEventHandler()); pipeline.addLast("handler", new HelloClientHandler()); } }); // ClientIdleEventHandler public class ClientIdleEventHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { evt = (IdleStateEvent) evt; // 若检测到长时间未发送请求事件,则主动发送心跳信息 if (evt.equals(IdleStateEvent.WRITER_IDLE_STATE_EVENT)) { ctx.writeAndFlush("ping"); }else { // 其他事件 // TODO } } } }
这样,当客户端发现已经5s没有发送过请求时,则主动发送一个ping心跳信息到服务端,避免被清理
2.IdleStateHandler的构造我们首先来看下IdleStateHandler的相关构造方法和基本属性
public class IdleStateHandler extends ChannelDuplexHandler { // Not create a new ChannelFutureListener per write operation to reduce GC pressure. // write监听器 private final ChannelFutureListener writeListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { lastWriteTime = ticksInNanos(); firstWriterIdleEvent = firstAllIdleEvent = true; } }; private final boolean observeOutput; // 三种类型的空闲时间设置 private final long readerIdleTimeNanos; private final long writerIdleTimeNanos; private final long allIdleTimeNanos; // 读空闲检测定时任务 private ScheduledFuture> readerIdleTimeout; // 最近一次读事件 private long lastReadTime; // 是否第一次读idleEvent触发 private boolean firstReaderIdleEvent = true; // 以下与读设置类似 private ScheduledFuture> writerIdleTimeout; private long lastWriteTime; private boolean firstWriterIdleEvent = true; private ScheduledFuture> allIdleTimeout; private boolean firstAllIdleEvent = true; // IdleStateHandler的状态,避免多次初始化 private byte state; // 0 - none, 1 - initialized, 2 - destroyed private boolean reading; private long lastChangeCheckTimeStamp; private int lastMessageHashCode; private long lastPendingWriteBytes; private long lastFlushProgress; // 默认使用的构造器 public IdleStateHandler( int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { // 默认单位为秒 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS); } public IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); } public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { ObjectUtil.checkNotNull(unit, "unit"); this.observeOutput = observeOutput; // 以纳秒为单位重新设置超时时间 if (readerIdleTime <= 0) { readerIdleTimeNanos = 0; } else { readerIdleTimeNanos = Math.max(unit.tonanos(readerIdleTime), MIN_TIMEOUT_NANOS); } if (writerIdleTime <= 0) { writerIdleTimeNanos = 0; } else { writerIdleTimeNanos = Math.max(unit.tonanos(writerIdleTime), MIN_TIMEOUT_NANOS); } if (allIdleTime <= 0) { allIdleTimeNanos = 0; } else { allIdleTimeNanos = Math.max(unit.tonanos(allIdleTime), MIN_TIMEOUT_NANOS); } } }
IdleStateHandler继承了ChannelDuplexHandler,说明其可以处理inbound、outbound事件;
构造方法比较简单,我们比较常用的就是第一个构造方法,以秒为单位来设置读、写、all的空闲检测;
属性的话,我们通过具体方法来学习。
3.IdleStateHandler空闲检测在handlerAdded、channelActive等方法中,都有一个initialize()方法,这个方法用来初始化检测器,我们先来看下
3.1 initialize() 初始化IdleStateHandlerpublic class IdleStateHandler extends ChannelDuplexHandler { private void initialize(ChannelHandlerContext ctx) { // 已经初始化过则不再重复初始化 switch (state) { case 1: case 2: return; } // 设置状态为initialized state = 1; // TODO initOutputChanged(ctx); // 设置最新的readTime和writeTime为当前时间 lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { // 创建一个读空闲检测定时任务,延后readerIdleTimeNanos执行,具体schedule方法见下面 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { // 创建一个写空闲检测定时任务,延后readerIdleTimeNanos执行 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { // 创建一个读、写空闲检测定时任务,延后readerIdleTimeNanos执行 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } } // 创建一个定时任务 ScheduledFuture> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) { return ctx.executor().schedule(task, delay, unit); } }
初始化方法,主要用于初始化三个定时任务,那么这三个定时任务ReaderIdleTimeoutTask、WriterIdleTimeoutTask、AllIdleTimeoutTask具体是怎么用的呢?我们先来看下读空闲检测是如何做的。
3.2 读空闲检测public class IdleStateHandler extends ChannelDuplexHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 当设置的readerIdleTimeNanos或allIdleTimeNanos大于0时,说明需要进行读空闲检测 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { // 设置reading 正在读数据的状态为true reading = true; // 设置两个状态为为true firstReaderIdleEvent = firstAllIdleEvent = true; } ctx.fireChannelRead(msg); } // 重点在这来,如果本次读已经结束,则需要重置时间和状态位 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 如果数据正在读状态 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) { // 则重新设置lastReadTime为当前时间 lastReadTime = ticksInNanos(); // 将正在读标志设置为false reading = false; } ctx.fireChannelReadComplete(); } }
通过上述两个方法可以看出,当发生读事件时,设置reading=true,当本次读结束时,则设置reading=false,lastReadTime(最近一次读时间)为当前时间。
那么这个是如何被检测到读超时的呢?我们可以回到initialize()方法,其中有一个ReaderIdleTimeoutTask的定时任务,延迟readerIdleTimeNanos执行,一起来看下这个task的具体内容
3.2.1 ReaderIdleTimeoutTask 读空闲检测定时任务
private final class ReaderIdleTimeoutTask extends AbstractIdleTask { // 将ChannelHandlerContext传入当前task ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { super(ctx); } @Override protected void run(ChannelHandlerContext ctx) { long nextDelay = readerIdleTimeNanos; // 如果当前没有发生读事件,则reading为false if (!reading) { nextDelay -= ticksInNanos() - lastReadTime; } // 读空闲超时,需要发送READER_IDLE事件 if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. // 重启一个定时检测任务,延迟readerIdleTimeNanos执行 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent; firstReaderIdleEvent = false; try { // 往下游发送一个READER_IDLE Event IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // 注意这里虽然也是重启一个定时任务,但是延迟时间与上面有所不同,这里的具体延迟时间为readerIdleTimeNanos - (ticksInNanos() - lastReadTime) readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { ctx.fireUserEventTriggered(evt); } }
这里比较有意思的是nextDelay值的设置,当读事件正在进行时(reading=true),则直接进行下一次循环;
当读事件未执行,若ticksInNanos(当前时间) - lastReadTime(最后一次读完成时间) > readerIdleTimeNanos(读空闲检测时间),说明读空闲超时,往下游发送一个READER_IDLE Event
reading=false的情况有两种:没有发生过读、读已经结束;当读数据正在进行时,则reading=true
总结:通过这种对lastReadTime的定时任务检测,就可以发现是否已经长时间未读,若是,则发送下游READER_IDLE事件,下游检测到该事件进行相应处理即可。
3.3 写空闲检测分析过程与3.2 读空闲检测类似,我们先来看下重写后的write方法
public class IdleStateHandler extends ChannelDuplexHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 如果writerIdleTimeNanos或allIdleTimeNanos大于0,说明需要进行写空闲检测 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) { // 对write方法执行后的ChannelFuture添加监听器,writeListener内容如下 ctx.write(msg, promise.unvoid()).addListener(writeListener); } else { ctx.write(msg, promise); } } private final ChannelFutureListener writeListener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // 当写方法完成时,设置最新一次写时间为当前时间 lastWriteTime = ticksInNanos(); firstWriterIdleEvent = firstAllIdleEvent = true; } }; }
方法并不复杂,主要就是对write方法添加一个监听器,用于监听wirte方法完成,完成后重置下lastWriteTime。下面来看下WriteTask所做的事情
3.3.1 WriterIdleTimeoutTask 写空闲检测
private final class WriterIdleTimeoutTask extends AbstractIdleTask { // 将ChannelHandlerContext传入当前task WriterIdleTimeoutTask(ChannelHandlerContext ctx) { super(ctx); } @Override protected void run(ChannelHandlerContext ctx) { long lastWriteTime = IdleStateHandler.this.lastWriteTime; // 同样的方式来计算nextDelay long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime); // 已超时 if (nextDelay <= 0) { // 先生成一个定时任务,用于下次写超时检测 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstWriterIdleEvent; firstWriterIdleEvent = false; try { // 这里比较有意思,我们具体在3.3.2 来看下 if (hasOutputChanged(ctx, first)) { return; } // 直接向下游传递一个WRITER_IDLE事件 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first); channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } } else { // 说明写未超时,重新生成一个定时任务,延迟nextDelay执行 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS); } } }
3.3.2 hasOutputChanged() 判断Channeloutboundbuffer是否发生过变化
public class IdleStateHandler extends ChannelDuplexHandler { private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) { // 默认observeOutput=false,不会进行下面的检测,需要主动开启 if (observeOutput) { // 如果 lastChangeCheckTimeStamp 和 lastWriteTime 不一样,说明写 *** 作进行过了,需要更新此值 if (lastChangeCheckTimeStamp != lastWriteTime) { lastChangeCheckTimeStamp = lastWriteTime; // 非首次,则直接返回true // 这里的first,即firstWriterIdleEvent参数,默认为true,当写 *** 作完成时也被置为true if (!first) { return true; } } Channel channel = ctx.channel(); Unsafe unsafe = channel.unsafe(); ChannelOutboundBuffer buf = unsafe.outboundBuffer(); if (buf != null) { int messageHashCode = System.identityHashCode(buf.current()); long pendingWriteBytes = buf.totalPendingWriteBytes(); // 这来主要判断ChannelOutboundBuffer中的值是否发生了变化 // 如果有数据添加进来,则前后肯定不一致 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) { lastMessageHashCode = messageHashCode; lastPendingWriteBytes = pendingWriteBytes; if (!first) { return true; } } long flushProgress = buf.currentProgress(); if (flushProgress != lastFlushProgress) { lastFlushProgress = flushProgress; if (!first) { return true; } } } } return false; } }
关于检测Channeloutboundbuffer变化的逻辑,在正常使用IdleStateHandler中是不会触发的,具体的细节分析可以参考下下文
Netty 心跳服务之 IdleStateHandler 源码分析 - 简书
总结:写空闲检测与读空闲检测基本类似,笔者不再赘述。
3.4 AllIdle检测这个的检测与上述读写检测基本是一样的,大家可以自行阅读AllIdleTimeoutTask.java,笔者不再赘述
总结:IdleStateHandler的代码不算复杂,在我们的应用探活中可以很好的发挥作用。
主要还是我们监听到Idle event后的自定义处理方案,这个才是关键。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)