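In a previous article, "Flink反压机制剖析" (an analysis of Flink's backpressure mechanism), we examined Flink's early TCP-based backpressure and its current credit-based backpressure. This article walks through the source code to show how the Credit & BackLog mechanism is actually implemented.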
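Problems with TCP-based backpressure
- TaskManagers exchange data over a single TCP channel, and all Tasks on a TaskManager multiplex that one channel. When a single downstream Task cannot keep up and triggers backpressure, the whole TCP channel blocks, so other Tasks cannot receive data even if they still have free Buffers.
- The upstream ResultPartition can only infer downstream capacity passively from the state of the TCP channel. It cannot adjust its sending rate in advance, and the downstream cannot scale its processing capacity to match the amount of data backed up in the ResultPartition.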
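How credit-based backpressure works: it introduces an upstream BackLog, which measures how much data is backed up upstream (the number of BufferConsumers queued in the ResultSubPartition), and a downstream Credit, which measures downstream capacity (the number of available Buffers in the InputChannel's Buffer queue). Together they dynamically regulate how fast the upstream produces data and how fast the downstream consumes it.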
The process is as follows:
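- While starting up, the RemoteInputChannel requests its ExclusiveBuffers from the NetworkBufferPool; the number is set by configuration.
- Once started, the RemoteInputChannel sends a PartitionRequest to the ResultPartition. The request carries the InitCredit value, which equals the size of the ExclusiveBuffers queue.
- The Credit is stored in the credit-based ViewReader (CreditBasedSequenceNumberingViewReader) of the corresponding upstream ResultSubPartition. The ViewReader consumes Credit to read Buffers.
- Whenever a new BufferConsumer is added to the BufferConsumer queue of the upstream ResultSubPartition, the BackLog value is increased at the same time. The BackLog value is later packed together with the Buffer into a BufferResponse and sent to the RemoteInputChannel.
- The RemoteInputChannel uses the BackLog value to decide whether its ExclusiveBuffers are sufficient. If not, it requests FloatingBuffers from the input gate's buffer pool (a LocalBufferPool backed by the NetworkBufferPool) and updates its UnannouncedCredit.
- Once it has obtained enough Buffers, the RemoteInputChannel sends the UnannouncedCredit to the ResultPartition, which raises the credit of that RemoteInputChannel on the upstream side.
- When the ViewReader has enough Credit, it is added to the AvailableReader queue and then reads Buffer data from the ResultSubPartition.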
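The steps above can be pictured with a toy model. The sketch below is not Flink code: `CreditFlowSketch`, `Sender`, `trySend`, and `drain` are hypothetical names chosen for illustration. It only shows the core rule that the upstream may send a buffer when it holds both data and credit, and that each send consumes one credit.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of credit-based flow control. Hypothetical names, not Flink classes. */
final class CreditFlowSketch {

    /** Upstream side: a queue of pending buffers plus the credit granted by the receiver. */
    static final class Sender {
        private final Queue<String> backlog = new ArrayDeque<>();
        private int credit;

        /** Called when the receiver announces more free buffers. */
        void addCredit(int delta) {
            credit += delta;
        }

        /** Called when the producer writes a new buffer. */
        void enqueue(String buffer) {
            backlog.add(buffer);
        }

        int backlogSize() {
            return backlog.size();
        }

        /** Sends one buffer if both data and credit are available; otherwise returns null. */
        String trySend() {
            if (credit == 0 || backlog.isEmpty()) {
                return null;
            }
            credit--;
            return backlog.poll();
        }
    }

    /** Sends as many buffers as the current credit allows and returns how many were sent. */
    static int drain(Sender sender) {
        int sent = 0;
        while (sender.trySend() != null) {
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        Sender sender = new Sender();
        sender.addCredit(2); // assumed initial credit: two exclusive buffers
        for (int i = 0; i < 5; i++) {
            sender.enqueue("buffer-" + i);
        }
        System.out.println("sent with initial credit: " + drain(sender));
        System.out.println("remaining backlog: " + sender.backlogSize());
        sender.addCredit(3); // receiver announces three more buffers
        System.out.println("sent after extra credit: " + drain(sender));
    }
}
```

With an initial credit of 2 and five queued buffers, the sender stalls after two sends and resumes only when more credit arrives, which is the backpressure effect in miniature.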
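After an operator finishes processing data, it writes the result through the RecordWriter into the Buffers queue of the ResultSubPartition and updates the BackLog counter.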
class PipelinedSubpartition extends ResultSubpartition {

    private boolean add(BufferConsumer bufferConsumer, boolean finish) {
        checkNotNull(bufferConsumer);

        final boolean notifyDataAvailable;
        synchronized (buffers) {
            if (isFinished || isReleased) {
                bufferConsumer.close();
                return false;
            }

            // Add the BufferConsumer to the queue
            buffers.add(bufferConsumer);
            updateStatistics(bufferConsumer);
            // Increase the BackLog counter
            increaseBuffersInBacklog(bufferConsumer);
            notifyDataAvailable = shouldNotifyDataAvailable() || finish;

            isFinished |= finish;
        }

        if (notifyDataAvailable) {
            notifyDataAvailable();
        }

        return true;
    }

    // Increase the BackLog counter; the caller must hold the lock on buffers
    private void increaseBuffersInBacklog(BufferConsumer buffer) {
        assert Thread.holdsLock(buffers);

        if (buffer != null && buffer.isBuffer()) {
            buffersInBacklog++;
        }
    }

    // After the ViewReader has consumed a Buffer, decrease the BackLog counter accordingly
    private void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
        assert Thread.holdsLock(buffers);

        if (isBuffer) {
            buffersInBacklog--;
        }
    }
}
The BackLog value and the Buffer are then read out together, wrapped in a BufferResponse object, and sent to the downstream RemoteInputChannel.
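How the InputChannel handles the BackLog
On the downstream side, the NettyClient handler calls InputChannel.onBuffer, which writes the data into the InputChannel's Buffer queue and, along the way, calls onSenderBacklog to process the BackLog value.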
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

    void onSenderBacklog(int backlog) throws IOException {
        int numRequestedBuffers = 0;

        synchronized (bufferQueue) {
            if (isReleased.get()) {
                return;
            }

            // Required number of Buffers = BackLog + initialCredit
            numRequiredBuffers = backlog + initialCredit;
            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
                // Not enough available Buffers in the InputChannel: request a FloatingBuffer from the LocalBufferPool
                Buffer buffer = inputGate.getBufferPool().requestBuffer();
                if (buffer != null) {
                    bufferQueue.addFloatingBuffer(buffer);
                    numRequestedBuffers++;
                } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                    // No Buffer obtained: register this InputChannel as a listener and wait for more FloatingBuffers
                    isWaitingForFloatingBuffers = true;
                    break;
                }
            }
        }

        // Add the number of FloatingBuffers obtained to unannouncedCredit so that the credit can be raised upstream
        if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
            notifyCreditAvailable();
        }
    }
}
The size of the BackLog determines how many FloatingBuffers the RemoteInputChannel requests, so the BackLog effectively tunes how much data the RemoteInputChannel can accept and process.
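The sizing rule in onSenderBacklog comes down to simple arithmetic: the channel wants backlog + initialCredit available buffers and requests only the shortfall. The following sketch isolates that calculation. `BacklogSizingSketch`, `floatingBuffersToRequest`, and the `poolFree` parameter are hypothetical and stand in for whatever the buffer pool can actually hand out at that moment.

```java
/** Isolated arithmetic behind the backlog-driven buffer request. Hypothetical helper, not Flink code. */
final class BacklogSizingSketch {

    /**
     * Returns how many floating buffers would be requested so that the number of
     * available buffers reaches backlog + initialCredit, capped by what the pool
     * can currently provide (poolFree is an assumed stand-in for the pool state).
     */
    static int floatingBuffersToRequest(int backlog, int initialCredit, int availableBuffers, int poolFree) {
        int required = backlog + initialCredit;
        int missing = Math.max(0, required - availableBuffers);
        return Math.min(missing, poolFree);
    }

    public static void main(String[] args) {
        // Backlog of 5 with 2 exclusive buffers, all of them free: 5 more buffers are needed.
        System.out.println(floatingBuffersToRequest(5, 2, 2, 8));
        // Same demand, but the pool has only 3 free buffers left.
        System.out.println(floatingBuffersToRequest(5, 2, 2, 3));
        // No backlog: the exclusive buffers are enough.
        System.out.println(floatingBuffersToRequest(0, 2, 2, 8));
    }
}
```

In the second call the channel gets fewer buffers than it wants; in Flink this is the situation in which the channel registers itself as a listener and waits for more FloatingBuffers.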
How the InputChannel sends Credit to the ResultPartition
The RemoteInputChannel holds two kinds of credit:
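- InitCredit (initialCredit in the code): the initial Credit of the RemoteInputChannel. It equals the number of ExclusiveBuffers in the RemoteInputChannel.
- UnannouncedCredit (unannouncedCredit in the code): the Credit backed by FloatingBuffers. It has to be reported to the ResultPartition dynamically so that the upstream knows the RemoteInputChannel has more credit available for receiving data.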
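The bookkeeping for the unannounced credit can be sketched with an AtomicInteger. The class below is a simplified illustration with hypothetical names (`UnannouncedCreditSketch`, `onBuffersAdded`, `getAndReset`). It mirrors the `getAndAdd(...) == 0` check in onSenderBacklog, so a notification is raised only when the counter moves from zero to a positive value, and it assumes the announcement drains the counter with `getAndSet(0)`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified unannounced-credit counter. Hypothetical class, not Flink code. */
final class UnannouncedCreditSketch {

    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    private int notifications;

    /**
     * Records newly obtained buffers. A notification is counted only when the
     * counter was zero before, because a pending notification already covers
     * any credit that is added later but not yet announced.
     */
    void onBuffersAdded(int numBuffers) {
        if (numBuffers > 0 && unannouncedCredit.getAndAdd(numBuffers) == 0) {
            notifications++;
        }
    }

    /** Returns the accumulated credit and resets it to zero, as an announcement would. */
    int getAndReset() {
        return unannouncedCredit.getAndSet(0);
    }

    int notifications() {
        return notifications;
    }

    public static void main(String[] args) {
        UnannouncedCreditSketch channel = new UnannouncedCreditSketch();
        channel.onBuffersAdded(2); // 0 -> 2: triggers a notification
        channel.onBuffersAdded(3); // 2 -> 5: covered by the pending notification
        System.out.println("notifications: " + channel.notifications());
        System.out.println("announced credit: " + channel.getAndReset());
    }
}
```

Batching credit this way keeps the number of AddCredit messages low: several buffer gains between two announcements collapse into one message.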
When the Credit of a RemoteInputChannel changes because its number of available Buffers has changed, the RemoteInputChannel calls notifyCreditAvailable() to notify the upstream ResultPartition.
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

    private void notifyCreditAvailable() {
        checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");

        // Hand this InputChannel to the NettyClient's credit-based handler through the partitionRequestClient
        partitionRequestClient.notifyCreditAvailable(this);
    }
}
The InputChannel is injected into the handler pipeline as a user event. The pipeline then calls back userEventTriggered, which puts the InputChannel into a queue.
class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {

    // Step 1
    @Override
    public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
        // Inject the InputChannel into the pipeline's event loop as a user event
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
    }

    // Step 2: the handler calls this method when it sees a user event
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RemoteInputChannel) {
            boolean triggerWrite = inputChannelsWithCredit.isEmpty();

            // Add the InputChannel to the inputChannelsWithCredit queue
            inputChannelsWithCredit.add((RemoteInputChannel) msg);

            // If the queue was empty before this add, trigger a write
            if (triggerWrite) {
                writeAndFlushNextMessageIfPossible(ctx.channel());
            }
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }

    // Step 3: send the InputChannel's new Credit to the upstream ResultSubPartition
    private void writeAndFlushNextMessageIfPossible(Channel channel) {
        // Check that the channel is usable
        if (channelError.get() != null || !channel.isWritable()) {
            return;
        }

        // Loop over the InputChannels in the inputChannelsWithCredit queue
        while (true) {
            RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();

            if (inputChannel == null) {
                return;
            }

            // A released InputChannel does not need to send Credit
            if (!inputChannel.isReleased()) {
                // Build the AddCredit request
                AddCredit msg = new AddCredit(
                    inputChannel.getPartitionId(),
                    inputChannel.getAndResetUnannouncedCredit(), // read the unannounced credit and reset it to zero
                    inputChannel.getInputChannelId());

                // Write the AddCredit message to the TCP channel
                channel.writeAndFlush(msg).addListener(writeListener);

                return;
            }
        }
    }
}

How the ResultPartition handles credit
When the upstream NettyServer receives a message, PartitionRequestServerHandler first dispatches it by type. If the NettyMessage is an AddCredit, it calls the addCredit method of the PartitionRequestQueue handler for further processing.
// Step 1
void addCredit(InputChannelID receiverId, int credit) throws Exception {
    if (fatalError) {
        return;
    }

    // Look up the ViewReader that belongs to this InputChannel
    NetworkSequenceViewReader reader = allReaders.get(receiverId);
    if (reader != null) {
        // Increase the ViewReader's credit
        reader.addCredit(credit);
        // Add the ViewReader to the available queue; queued readers are later used to read Buffers from the ResultSubPartition
        enqueueAvailableReader(reader);
    } else {
        throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
    }
}

// Step 2
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
    if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
        return;
    }
    // Register the ViewReader in the available queue
    boolean triggerWrite = availableReaders.isEmpty();
    registerAvailableReader(reader);

    // If the queue was empty before, start reading Buffers with the queued ViewReaders again
    if (triggerWrite) {
        writeAndFlushNextMessageIfPossible(ctx.channel());
    }
}

// Step 3: loop over the ViewReaders in the available queue and read Buffer data
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
    if (fatalError || !channel.isWritable()) {
        return;
    }

    BufferAndAvailability next = null;
    try {
        while (true) {
            NetworkSequenceViewReader reader = pollAvailableReader();

            // No available ViewReader: return and wait until data is written to the
            // ResultSubPartition or Credit is updated, which triggers this method again
            if (reader == null) {
                return;
            }
            // Read a Buffer through the ViewReader
            next = reader.getNextBuffer();
            if (next == null) {
                if (!reader.isReleased()) {
                    continue;
                }

                Throwable cause = reader.getFailureCause();
                if (cause != null) {
                    ErrorResponse msg = new ErrorResponse(
                        new ProducerFailedException(cause),
                        reader.getReceiverId());

                    ctx.writeAndFlush(msg);
                }
            } else {
                // If the ResultSubPartition still has readable Buffers, register the ViewReader again
                if (next.moreAvailable()) {
                    registerAvailableReader(reader);
                }
                // Build the BufferResponse message
                BufferResponse msg = new BufferResponse(
                    next.buffer(),
                    reader.getSequenceNumber(), // sequence number of this response
                    reader.getReceiverId(),
                    next.buffersInBacklog()); // current BackLog value

                // Send the BufferResponse to the InputChannel over the TCP channel
                channel.writeAndFlush(msg).addListener(writeListener);

                return;
            }
        }
    } catch (Throwable t) {
        if (next != null) {
            next.buffer().recycleBuffer();
        }

        throw new IOException(t.getMessage(), t);
    }
}

Summary
Credit-based backpressure is driven mainly by two counters exchanged between upstream and downstream: BackLog and Credit.
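- BackLog indicates how much data is backed up upstream. It influences how many FloatingBuffers the downstream RemoteInputChannel holds and thereby raises the InputChannel's processing capacity.
- Credit reflects downstream capacity. When the number of available Buffers in the downstream RemoteInputChannel changes, the newly added Buffers are sent upstream as Credit, signalling that the downstream has more Buffers to receive data.
- If the downstream cannot process data fast enough, the upstream sends its BackLog, which prompts the RemoteInputChannel to request more FloatingBuffers.
- If the RemoteInputChannel cannot obtain more FloatingBuffers, it stops sending Credit upstream. The upstream handler then removes the ViewReader from the AvailableReader queue and no longer pushes Buffers from the ResultSubPartition downstream. Reading by the ViewReader resumes only once the downstream has provided enough Credit again.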
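The last point, a reader dropping out of the available queue when its credit runs out and returning when credit arrives, can be sketched as follows. This is a simplified model with hypothetical names (`ReaderQueueSketch`, `Reader`, `enqueueIfAvailable`, `drain`). Unlike the real handler, which writes one message and waits for the write listener before continuing, the sketch drains the queue in a plain loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified model of an available-reader queue gated by credit. Hypothetical, not Flink code. */
final class ReaderQueueSketch {

    static final class Reader {
        final String name;
        int credit;
        int pendingBuffers;
        boolean registered;

        Reader(String name, int credit, int pendingBuffers) {
            this.name = name;
            this.credit = credit;
            this.pendingBuffers = pendingBuffers;
        }

        /** A reader may send only when it has both credit and data. */
        boolean isAvailable() {
            return credit > 0 && pendingBuffers > 0;
        }
    }

    private final Queue<Reader> availableReaders = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>();

    /** Registers the reader unless it is already queued or cannot send. */
    void enqueueIfAvailable(Reader reader) {
        if (reader.registered || !reader.isAvailable()) {
            return;
        }
        reader.registered = true;
        availableReaders.add(reader);
    }

    /** New credit from downstream may make a stalled reader available again. */
    void addCredit(Reader reader, int credit) {
        reader.credit += credit;
        enqueueIfAvailable(reader);
    }

    /** Sends one buffer per polled reader and re-registers readers that can still send. */
    void drain() {
        Reader reader;
        while ((reader = availableReaders.poll()) != null) {
            reader.registered = false;
            reader.credit--;
            reader.pendingBuffers--;
            sent.add(reader.name);
            enqueueIfAvailable(reader);
        }
    }

    List<String> sent() {
        return sent;
    }

    public static void main(String[] args) {
        ReaderQueueSketch queue = new ReaderQueueSketch();
        Reader a = new Reader("a", 1, 3);
        Reader b = new Reader("b", 2, 2);
        queue.enqueueIfAvailable(a);
        queue.enqueueIfAvailable(b);
        queue.drain();
        System.out.println("after first drain: " + queue.sent());
        queue.addCredit(a, 2); // downstream frees two buffers for reader a
        queue.drain();
        System.out.println("after extra credit: " + queue.sent());
    }
}
```

Reader a stops after one buffer because its credit is exhausted, even though it still has data. Only the later addCredit call puts it back into the queue, which is how a slow consumer throttles its own producer without blocking other channels.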