Java代码
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
……
} else if (e instanceof MessageEvent) {
MessageEvent event = (MessageEvent) e
NioSocketChannel channel = (NioSocketChannel) event.getChannel()
boolean offered = channel.writeBufferQueue.offer(event)//写到channel的writeBufferQueue
assert offered
channel.worker.writeFromUserCode(channel)
}
}
WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
Java代码
public boolean offer(MessageEvent e) {
boolean success = queue.offer(e)
assert success
int messageSize = getMessageSize(e)
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize)
int highWaterMark = getConfig().getWriteBufferHighWaterMark()
if (newWriteBufferSize >= highWaterMark) {
if (newWriteBufferSize - messageSize <highWaterMark) {
highWaterMarkCounter.incrementAndGet()
if (!notifying.get()) {
notifying.set(Boolean.TRUE)
fireChannelInterestChanged(AbstractNioChannel.this)
notifying.set(Boolean.FALSE)
}
}
}
return true
}
fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
Java代码
public boolean setOption(String key, Object value) {
if (super.setOption(key, value)) {
return true
}
if ("writeBufferHighWaterMark".equals(key)) {
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value))
} else if ("writeBufferLowWaterMark".equals(key)) {
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value))
} else if ("writeSpinCount".equals(key)) {
setWriteSpinCount(ConversionUtil.toInt(value))
} else if ("receiveBufferSizePredictorFactory".equals(key)) {
setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value)
} else if ("receiveBufferSizePredictor".equals(key)) {
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value)
} else {
return false
}
return true
}
然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
Java代码
public MessageEvent poll() {
MessageEvent e = queue.poll()
if (e != null) {
int messageSize = getMessageSize(e)
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize)
int lowWaterMark = getConfig().getWriteBufferLowWaterMark()
if (newWriteBufferSize == 0 || newWriteBufferSize <lowWaterMark) {
if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下
highWaterMarkCounter.decrementAndGet()
if (isConnected() &&!notifying.get()) {
notifying.set(Boolean.TRUE)
fireChannelInterestChanged(AbstractNioChannel.this)
notifying.set(Boolean.FALSE)
}
}
}
}
return e
}
超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
Java代码
public boolean isWritable() {
return (getInterestOps() &OP_WRITE) == 0
}
public int getInterestOps() {
if (!isOpen()) {
return Channel.OP_WRITE
}
int interestOps = getRawInterestOps()
int writeBufferSize = this.writeBufferSize.get()
if (writeBufferSize != 0) {
if (highWaterMarkCounter.get() >0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
int lowWaterMark = getConfig().getWriteBufferLowWaterMark()
if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写 *** 作
interestOps |= Channel.OP_WRITE
} else {
interestOps &= ~Channel.OP_WRITE//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写 *** 作
}
} else {//超过高水位counter<=0,意味着当前数据量小于高水位
int highWaterMark = getConfig().getWriteBufferHighWaterMark()
if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写 *** 作
interestOps |= Channel.OP_WRITE
} else {
interestOps &= ~Channel.OP_WRITE
}
}
} else {
interestOps &= ~Channel.OP_WRITE//写队列没数据,没有注册写 *** 作
}
return interestOps
}
即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写
如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
最近使用 netty 过程中发现了几个比较细节的 Connection reset by peer 异常,做个笔记。
这个场景出现在用 Jedis ping 检测的场景,用完直接 close,服务端稳定出现 Connection reset by peer。
tcpdump 一下就很容易定位到问题所在,客户端收到 PONG 响应后直接发了一个 RST 包给服务端:
查看 Jedis 的源码发现 socket 有个比较特殊的配置 socket.setSoLinger(true, 0) 。
先看一下 man7/socket.7 的解释:
坦白说不是很明白啥意思。。。
最终在 stackoverflow 上找到一个比较容易理解的解释:
简而言之,设置 SO_LINGER(0) 可以不进行四次挥手直接关闭 TCP 连接,在协议交互上就是直接发 RST 包,这样的好处是可以避免长时间处于 TIME_WAIT 状态,当然 TIME_WAIT 存在也是有原因的,大部分评论都不建议这样配置。
这个场景有点儿微妙,首先得理解一下 tcp 的两个队列。
这篇文章讲得比较清楚: SYN packet handling in the wild
accept 队列满通常是由于 netty boss 线程处理慢,特别是在容器化之后,服务刚启动的时候很容易出现 CPU 受限。
为了模拟这个现象,我写了个示例程序 shichaoyuan/netty-backlog-test ,设置 SO_BACKLOG 为 1,并且在 accept 第一个连接后设置 autoRead 为 false,也就是让 boss 线程不再继续 accept 连接。
启动第一个 Client,可以正常连接,发送 PING,接收 PONG。
启动第二个 Client,也可以正常连接,但是没有收到 PONG:
可见这个连接创建成功了,已经在 Accept Queue 里了,但是进程没有 accept,所以没有与进程绑定。
启动第三个 Client,也可以正常连接,也没有收到 PONG:
与第二个连接一样。
启动第四个 Client,也可以正常连接,但是在发送 PING 后出现 Connection reset by peer:
这个连接在服务端并没有进入 accept queue,处于 SYN_RECV 状态,并且很快就消失了(因为 accept queue 已经满了,无法转入 ESTABLISHED 状态)。
抓包看一下:
从客户端视角来看连接确实是建成功了,有一个比较特殊的地方在三次握手之后,服务端又向客户端发送了一个 [S.],客户端回复了一个 [.],这个交互看起来不影响连接。
服务端后来销毁了连接,而客户端还认为连接是 ESTABLISHED 的,发送 PING 消息,服务端自然得回复一个 RST。
PS:我在 Windows 10 的 WSL2 中实验这种场景是建连接超时,可能不同的 *** 作系统或 linux 版本对这个交互的过程处理不同,在此不进行进一步测试了。
以上,这个故事告诉我们判断连接是否可用,建成功之后应该发个心跳包测试一下。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)