netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理

netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理,第1张

netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue

Java代码

public void eventSunk(

ChannelPipeline pipeline, ChannelEvent e) throws Exception {

if (e instanceof ChannelStateEvent) {

……

} else if (e instanceof MessageEvent) {

MessageEvent event = (MessageEvent) e

NioSocketChannel channel = (NioSocketChannel) event.getChannel()

boolean offered = channel.writeBufferQueue.offer(event)//写到channel的writeBufferQueue

assert offered

channel.worker.writeFromUserCode(channel)

}

}

WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。

Java代码

public boolean offer(MessageEvent e) {

boolean success = queue.offer(e)

assert success

int messageSize = getMessageSize(e)

int newWriteBufferSize = writeBufferSize.addAndGet(messageSize)

int highWaterMark = getConfig().getWriteBufferHighWaterMark()

if (newWriteBufferSize >= highWaterMark) {

if (newWriteBufferSize - messageSize <highWaterMark) {

highWaterMarkCounter.incrementAndGet()

if (!notifying.get()) {

notifying.set(Boolean.TRUE)

fireChannelInterestChanged(AbstractNioChannel.this)

notifying.set(Boolean.FALSE)

}

}

}

return true

}

fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K

Java代码

public boolean setOption(String key, Object value) {

if (super.setOption(key, value)) {

return true

}

if ("writeBufferHighWaterMark".equals(key)) {

setWriteBufferHighWaterMark0(ConversionUtil.toInt(value))

} else if ("writeBufferLowWaterMark".equals(key)) {

setWriteBufferLowWaterMark0(ConversionUtil.toInt(value))

} else if ("writeSpinCount".equals(key)) {

setWriteSpinCount(ConversionUtil.toInt(value))

} else if ("receiveBufferSizePredictorFactory".equals(key)) {

setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value)

} else if ("receiveBufferSizePredictor".equals(key)) {

setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value)

} else {

return false

}

return true

}

然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K

Java代码

public MessageEvent poll() {

MessageEvent e = queue.poll()

if (e != null) {

int messageSize = getMessageSize(e)

int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize)

int lowWaterMark = getConfig().getWriteBufferLowWaterMark()

if (newWriteBufferSize == 0 || newWriteBufferSize <lowWaterMark) {

if (newWriteBufferSize + messageSize >= lowWaterMark) {//本次拉取,是的缓存数据大小掉到了低水位之下

highWaterMarkCounter.decrementAndGet()

if (isConnected() &&!notifying.get()) {

notifying.set(Boolean.TRUE)

fireChannelInterestChanged(AbstractNioChannel.this)

notifying.set(Boolean.FALSE)

}

}

}

}

return e

}

超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的

Java代码

public boolean isWritable() {

return (getInterestOps() &OP_WRITE) == 0

}

public int getInterestOps() {

if (!isOpen()) {

return Channel.OP_WRITE

}

int interestOps = getRawInterestOps()

int writeBufferSize = this.writeBufferSize.get()

if (writeBufferSize != 0) {

if (highWaterMarkCounter.get() >0) {//还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1

int lowWaterMark = getConfig().getWriteBufferLowWaterMark()

if (writeBufferSize >= lowWaterMark) {//缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写 *** 作

interestOps |= Channel.OP_WRITE

} else {

interestOps &= ~Channel.OP_WRITE//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写 *** 作

}

} else {//超过高水位counter<=0,意味着当前数据量小于高水位

int highWaterMark = getConfig().getWriteBufferHighWaterMark()

if (writeBufferSize >= highWaterMark) {//这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写 *** 作

interestOps |= Channel.OP_WRITE

} else {

interestOps &= ~Channel.OP_WRITE

}

}

} else {

interestOps &= ~Channel.OP_WRITE//写队列没数据,没有注册写 *** 作

}

return interestOps

}

即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写

如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。

最近使用 netty 过程中发现了几个比较细节的 Connection reset by peer 异常,做个笔记。

这个场景出现在用 Jedis ping 检测的场景,用完直接 close,服务端稳定出现 Connection reset by peer。

tcpdump 一下就很容易定位到问题所在,客户端收到 PONG 响应后直接发了一个 RST 包给服务端:

查看 Jedis 的源码发现 socket 有个比较特殊的配置 socket.setSoLinger(true, 0) 。

先看一下 man7/socket.7 的解释:

坦白说不是很明白啥意思。。。

最终在 stackoverflow 上找到一个比较容易理解的解释:

简而言之,设置 SO_LINGER(0) 可以不进行四次挥手直接关闭 TCP 连接,在协议交互上就是直接发 RST 包,这样的好处是可以避免长时间处于 TIME_WAIT 状态,当然 TIME_WAIT 存在也是有原因的,大部分评论都不建议这样配置。

这个场景有点儿微妙,首先得理解一下 tcp 的两个队列。

这篇文章讲得比较清楚: SYN packet handling in the wild

accept 队列满通常是由于 netty boss 线程处理慢,特别是在容器化之后,服务刚启动的时候很容易出现 CPU 受限。

为了模拟这个现象,我写了个示例程序 shichaoyuan/netty-backlog-test ,设置 SO_BACKLOG 为 1,并且在 accept 第一个连接后设置 autoRead 为 false,也就是让 boss 线程不再继续 accept 连接。

启动第一个 Client,可以正常连接,发送 PING,接收 PONG。

启动第二个 Client,也可以正常连接,但是没有收到 PONG:

可见这个连接创建成功了,已经在 Accept Queue 里了,但是进程没有 accept,所以没有与进程绑定。

启动第三个 Client,也可以正常连接,也没有收到 PONG:

与第二个连接一样。

启动第四个 Client,也可以正常连接,但是在发送 PING 后出现 Connection reset by peer:

这个连接在服务端并没有进入 accept queue,处于 SYN_RECV 状态,并且很快就消失了(因为 accept queue 已经满了,无法转入 ESTABLISHED 状态)。

抓包看一下:

从客户端视角来看连接确实是建成功了,有一个比较特殊的地方在三次握手之后,服务端又向客户端发送了一个 [S.],客户端回复了一个 [.],这个交互看起来不影响连接。

服务端后来销毁了连接,而客户端还认为连接是 ESTABLISHED 的,发送 PING 消息,服务端自然得回复一个 RST。

PS:我在 Windows 10 的 WSL2 中实验这种场景是建连接超时,可能不同的 *** 作系统或 linux 版本对这个交互的过程处理不同,在此不进行进一步测试了。

以上,这个故事告诉我们判断连接是否可用,建成功之后应该发个心跳包测试一下。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9408909.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-28
下一篇 2023-04-28

发表评论

登录后才能评论

评论列表(0条)

保存