Flink's Backpressure Mechanism (Source Code Analysis)

Background

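In a previous article, "Flink反压机制剖析" (An Analysis of Flink's Backpressure Mechanism), we examined Flink's early TCP-based backpressure and its current credit-based backpressure. This article walks through the source code to show how the Credit & Backlog mechanism is actually implemented.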

The Problem with TCP-Based Backpressure

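Two TaskManagers exchange data over a single TCP connection, and all Tasks on those TaskManagers multiplex that one connection. As a result, when a single downstream Task cannot keep up and causes backpressure, the whole TCP connection is blocked, and other Tasks cannot receive data even if they still have free buffers. The upstream ResultPartition can only sense the downstream processing capacity passively, through the state of the TCP connection. It cannot adjust its sending rate in advance, and the downstream node cannot adjust its processing capacity to the ResultPartition's current data backlog.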
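
The following minimal Java sketch makes the head-of-line blocking concrete. It is a toy model, not Flink code. The class MultiplexedConnection and its methods are hypothetical. The shared TCP connection is modeled as one FIFO queue of target Task names, and each receiving Task has a fixed number of free buffers. Delivery stops at the first message whose Task has no free buffer, so messages for other Tasks queued behind it are stuck as well.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model (not Flink code): one TCP connection shared by several receiving Tasks.
class MultiplexedConnection {
    private final Deque<String> wire = new ArrayDeque<>();            // target Task of each in-flight message, strictly FIFO
    private final Map<String, Integer> freeBuffers = new HashMap<>(); // free receive buffers per Task
    private final Map<String, Integer> delivered = new HashMap<>();   // messages delivered per Task

    void registerTask(String task, int buffers) {
        freeBuffers.put(task, buffers);
        delivered.put(task, 0);
    }

    void send(String targetTask) {
        wire.addLast(targetTask);
    }

    // Delivers in FIFO order and stops at the first message whose target has no free buffer.
    void drain() {
        while (!wire.isEmpty()) {
            String target = wire.peekFirst();
            int free = freeBuffers.get(target);
            if (free == 0) {
                return; // head-of-line blocking: messages for other Tasks behind it wait too
            }
            wire.pollFirst();
            freeBuffers.put(target, free - 1);
            delivered.merge(target, 1, Integer::sum);
        }
    }

    int delivered(String task) {
        return delivered.get(task);
    }

    int pending() {
        return wire.size();
    }
}
```

For example, register a "slow" Task with one free buffer and a "fast" Task with ten. Then send slow, slow, fast, fast and call drain(). The slow Task receives one message, the fast Task receives none, and three messages remain stuck on the wire.
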

Credit-Based Backpressure: Overview

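The basic idea of credit-based backpressure involves two signals. The upstream side reports a backlog, which measures how much data is queued upstream: the number of BufferConsumers waiting in the ResultSubPartition's queue. The downstream side grants credit, which measures its processing capacity: the number of free buffers in the InputChannel's buffer queue. Together they dynamically regulate how fast data is produced upstream and consumed downstream.
The process works as follows: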

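1. When a RemoteInputChannel starts, it requests its exclusive buffers from the NetworkBufferPool. Configuration determines how many.
2. Once started, the RemoteInputChannel sends a PartitionRequest to the ResultPartition. The request carries an initial credit equal to the number of exclusive buffers.
3. The credit is recorded in the credit-based ViewReader of the corresponding upstream ResultSubPartition. The ViewReader spends credit to read buffers.
4. Whenever a new BufferConsumer is written into the ResultSubPartition's BufferConsumer queue, the backlog is increased accordingly. The backlog is then packed together with a buffer into a BufferResponse and sent to the RemoteInputChannel.
5. From the backlog, the RemoteInputChannel judges whether its exclusive buffers are sufficient. If they are not, it requests floating buffers from the input gate's LocalBufferPool (which draws its memory from the NetworkBufferPool) and increases its unannouncedCredit.
6. After the RemoteInputChannel has obtained additional buffers, it sends the unannounced credit to the ResultPartition. This increases the credit for that RemoteInputChannel.
7. Once the credit-based ViewReader has credit, it is added to the available-reader queue and reads buffer data from the ResultSubPartition.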
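
The steps above can be traced with a toy model of a single channel. This is a sketch under simplifying assumptions, not Flink's implementation:

- ToySender and ToyReceiver are hypothetical classes.
- Buffers are plain counters, and events are ignored.
- The shared floating pool is a single integer.
- Recycling is also an assumption of this model. A processed buffer becomes new credit. If the channel already holds more than backlog + initialCredit buffers, a surplus floating buffer goes back to the pool instead.

```java
// Toy model (not Flink code) of the credit/backlog exchange for one channel.
class ToySender {
    private int backlog; // data buffers queued in the subpartition
    private int credit;  // buffers the receiver has promised to accept

    void addCredit(int c) { credit += c; }

    void produce(int buffers) { backlog += buffers; }

    int backlog() { return backlog; }

    int credit() { return credit; }

    // Sends one buffer if there is credit and data; returns the backlog carried with it, or -1.
    int trySendOne() {
        if (credit == 0 || backlog == 0) {
            return -1; // out of credit (back pressure) or nothing to send
        }
        credit--;
        backlog--;
        return backlog; // like a BufferResponse: one buffer plus the remaining backlog
    }
}

class ToyReceiver {
    final int initialCredit;       // equals the number of exclusive buffers
    private int available;         // free buffers (exclusive + floating) ready to receive data
    private int floatingHeld;      // floating buffers currently owned by this channel
    private int queued;            // received buffers waiting to be processed
    private int unannouncedCredit; // credit not yet reported upstream
    private int pool;              // free buffers in the shared floating pool
    private int required;          // most recent backlog + initialCredit

    ToyReceiver(int exclusiveBuffers, int floatingPool) {
        initialCredit = exclusiveBuffers;
        available = exclusiveBuffers;
        pool = floatingPool;
        required = exclusiveBuffers;
    }

    // A buffer arrived: occupy one free buffer, then react to the sender's backlog.
    void onBuffer(int senderBacklog) {
        available--;
        queued++;
        required = senderBacklog + initialCredit;
        while (available < required && pool > 0) { // request floating buffers
            pool--;
            floatingHeld++;
            available++;
            unannouncedCredit++;
        }
    }

    // The task finished one buffer and recycles it.
    void processOne() {
        if (queued == 0) {
            return;
        }
        queued--;
        if (available + 1 > required && floatingHeld > 0) {
            floatingHeld--; // surplus: return a floating buffer to the pool, no new credit
            pool++;
        } else {
            available++;
            unannouncedCredit++;
        }
    }

    // Like getAndResetUnannouncedCredit(): the credit to put into an AddCredit message.
    int takeCredit() {
        int c = unannouncedCredit;
        unannouncedCredit = 0;
        return c;
    }

    int queued() { return queued; }

    int available() { return available; }
}
```

Consider two exclusive buffers, a pool of three floating buffers and a backlog of five. The sender stops after two buffers because its credit reaches 0. Meanwhile the receiver has obtained three floating buffers. Passing takeCredit() to the sender lets the remaining three buffers flow.
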
Credit-Based Backpressure in Detail

ResultPartition Sends the Backlog

After an operator processes data, the RecordWriter writes the results into the ResultSubPartition's buffer queue and updates the backlog counter.

class PipelinedSubpartition extends ResultSubpartition {
	private boolean add(BufferConsumer bufferConsumer, boolean finish) {
		checkNotNull(bufferConsumer);

		final boolean notifyDataAvailable;
		synchronized (buffers) {
			if (isFinished || isReleased) {
				bufferConsumer.close();
				return false;
			}

			// Append the BufferConsumer to the queue
			buffers.add(bufferConsumer);
			updateStatistics(bufferConsumer);
			// Increase the backlog counter
			increaseBuffersInBacklog(bufferConsumer);
			notifyDataAvailable = shouldNotifyDataAvailable() || finish;

			isFinished |= finish;
		}

		if (notifyDataAvailable) {
			notifyDataAvailable();
		}

		return true;
	}
	// Increase the backlog counter (the caller must hold the buffers lock)
	private void increaseBuffersInBacklog(BufferConsumer buffer) {
		assert Thread.holdsLock(buffers);

		if (buffer != null && buffer.isBuffer()) {
			buffersInBacklog++;
		}
	}
	// Decrease the backlog counter after the ViewReader has consumed a buffer (the caller must hold the buffers lock)
	private void decreaseBuffersInBacklogUnsafe(boolean isBuffer) {
		assert Thread.holdsLock(buffers);
		if (isBuffer) {
			buffersInBacklog--;
		}
	}
}
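
The bookkeeping in add() and the two backlog helpers can be reduced to a few lines. This sketch uses a hypothetical class BacklogQueue and is not Flink code. Queue entries are plain booleans: true for a data buffer and false for an event. Only data buffers change buffersInBacklog, mirroring the isBuffer() check above. pollAndGetBacklog() returns the backlog left after dequeuing, which is the value a BufferResponse would carry.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch (not Flink code) of the backlog bookkeeping in PipelinedSubpartition.
class BacklogQueue {
    private final Deque<Boolean> buffers = new ArrayDeque<>(); // true = data buffer, false = event
    private int buffersInBacklog;

    // Like add(): enqueue under the lock; only data buffers count towards the backlog.
    void add(boolean isBuffer) {
        synchronized (buffers) {
            buffers.addLast(isBuffer);
            if (isBuffer) {
                buffersInBacklog++;
            }
        }
    }

    // Dequeue the next entry; returns the remaining backlog, or -1 if the queue is empty.
    int pollAndGetBacklog() {
        synchronized (buffers) {
            Boolean isBuffer = buffers.pollFirst();
            if (isBuffer == null) {
                return -1;
            }
            if (isBuffer) {
                buffersInBacklog--;
            }
            return buffersInBacklog;
        }
    }

    int backlog() {
        synchronized (buffers) {
            return buffersInBacklog;
        }
    }
}
```

Adding two buffers and one event gives a backlog of 2. Polling the entries then returns 1, 0 and 0, and finally -1 once the queue is empty.
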

When a buffer is consumed, it is wrapped together with the current backlog in a BufferResponse object and sent downstream to the RemoteInputChannel.

InputChannel Handles the Backlog

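On the downstream side, the NettyClient's handler calls RemoteInputChannel.onBuffer. This method writes the data into the InputChannel's buffer queue and, along the way, calls onSenderBacklog to process the backlog.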

public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
	void onSenderBacklog(int backlog) throws IOException {
		int numRequestedBuffers = 0;

		synchronized (bufferQueue) {
			if (isReleased.get()) {
				return;
			}
			// Required buffers = backlog + initialCredit (numRequiredBuffers is a field of the channel)
			numRequiredBuffers = backlog + initialCredit;
			while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
				// Not enough available buffers in the InputChannel: request a floating buffer from the LocalBufferPool
				Buffer buffer = inputGate.getBufferPool().requestBuffer();
				if (buffer != null) {
					bufferQueue.addFloatingBuffer(buffer);
					numRequestedBuffers++;
				} else if (inputGate.getBufferProvider().addBufferListener(this)) {
					// No buffer available: register this InputChannel as a listener and wait for more floating buffers
					isWaitingForFloatingBuffers = true;
					break;
				}
			}
		}
		// Add the number of new floating buffers to unannouncedCredit; notify upstream only if it was 0 before
		if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
			notifyCreditAvailable();
		}
	}
}

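The size of the backlog determines how many floating buffers the RemoteInputChannel requests. Through the backlog, the RemoteInputChannel can therefore adjust how much data it is able to accept and process.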
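
The effect of the backlog on floating buffers can be isolated in a small sketch of the loop above. FloatingBufferRequester and its fields are hypothetical stand-ins. The LocalBufferPool is reduced to a counter of free buffers, and registering as a BufferListener is reduced to setting a flag. As in the original code, a credit notification is only triggered when unannouncedCredit goes from 0 to a positive value.

```java
// Sketch (not Flink code) of the onSenderBacklog() loop; all names are hypothetical stand-ins.
class FloatingBufferRequester {
    private final int initialCredit;        // equals the number of exclusive buffers
    private int availableBuffers;           // free exclusive + floating buffers in the channel
    private int freeInPool;                 // free buffers in the gate's pool (stand-in for LocalBufferPool)
    private boolean waitingForFloatingBuffers;
    private int unannouncedCredit;
    private int creditNotifications;        // times notifyCreditAvailable() would be called

    FloatingBufferRequester(int initialCredit, int availableBuffers, int freeInPool) {
        this.initialCredit = initialCredit;
        this.availableBuffers = availableBuffers;
        this.freeInPool = freeInPool;
    }

    void onSenderBacklog(int backlog) {
        int numRequiredBuffers = backlog + initialCredit;
        int numRequestedBuffers = 0;
        while (availableBuffers < numRequiredBuffers && !waitingForFloatingBuffers) {
            if (freeInPool > 0) {
                freeInPool--; // take one floating buffer from the pool
                availableBuffers++;
                numRequestedBuffers++;
            } else {
                waitingForFloatingBuffers = true; // would register as a BufferListener and wait
            }
        }
        int before = unannouncedCredit;
        unannouncedCredit += numRequestedBuffers;
        if (numRequestedBuffers > 0 && before == 0) {
            creditNotifications++; // only the 0 -> positive transition notifies
        }
    }

    int availableBuffers() { return availableBuffers; }

    int unannouncedCredit() { return unannouncedCredit; }

    boolean isWaitingForFloatingBuffers() { return waitingForFloatingBuffers; }

    int creditNotifications() { return creditNotifications; }
}
```

As an example, take initialCredit = 2, two free buffers in the channel, a backlog of 5 and three free buffers in the pool. The channel needs 7 buffers but obtains only 3 floating buffers, leaving 5 available. It records 3 unannounced credits with a single notification and then waits for more floating buffers. A later onSenderBacklog(6) changes nothing until the channel is notified.
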

InputChannel Sends Credit to the ResultPartition

The credit in a RemoteInputChannel comes in two kinds:

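- InitialCredit: the RemoteInputChannel's initial credit, which always equals the number of exclusive buffers in the RemoteInputChannel.
- UnannouncedCredit: corresponds to the floating buffers the channel has obtained. It must be reported to the ResultPartition promptly, to tell it that the RemoteInputChannel has more credit for processing data.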
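
The two kinds of credit can be modeled with a fixed initialCredit and an AtomicInteger for the unannounced part. This sketch uses a hypothetical class, CreditAccount, and relies on two operations visible in the code of this article:

- Newly available buffers are added with getAndAdd, and a notification is sent only when the previous value was 0.
- When an AddCredit message is built, all unannounced credit is taken and reset to 0. Here this is done with getAndSet(0). That is our assumption about how getAndResetUnannouncedCredit() works, based on its name and the "get and reset" comment in the handler code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch (not Flink code) of the two kinds of credit in a RemoteInputChannel.
class CreditAccount {
    private final int initialCredit;                                      // fixed: number of exclusive buffers
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0); // new buffers not yet reported upstream
    private final AtomicInteger notifications = new AtomicInteger(0);     // stands in for notifyCreditAvailable() calls

    CreditAccount(int exclusiveBuffers) {
        this.initialCredit = exclusiveBuffers;
    }

    int initialCredit() {
        return initialCredit;
    }

    // Called when new buffers become available, e.g. floating buffers obtained from the pool.
    void onBuffersAdded(int n) {
        // Notify only on the 0 -> positive transition; otherwise an announcement is already pending.
        if (n > 0 && unannouncedCredit.getAndAdd(n) == 0) {
            notifications.incrementAndGet();
        }
    }

    // Called when an AddCredit message is built: take all pending credit and reset it to 0.
    int getAndResetUnannouncedCredit() {
        return unannouncedCredit.getAndSet(0);
    }

    int notifications() {
        return notifications.get();
    }
}
```

Adding 3 buffers and then 1 buffer produces a single notification and 4 unannounced credits. After they are taken, the next addition notifies again. The 0 check keeps a channel from being queued repeatedly while an announcement is already pending, because the pending announcement picks up all credit accumulated in the meantime.
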

When the credit of a RemoteInputChannel changes because its available buffers change, the RemoteInputChannel calls notifyCreditAvailable() to notify the upstream ResultPartition.

public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
	private void notifyCreditAvailable() {
		checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
		// Register this InputChannel with the NettyClient's credit-based handler via partitionRequestClient
		partitionRequestClient.notifyCreditAvailable(this);
	}
}

The handler injects the InputChannel into its pipeline as a user event on the Netty event loop. Netty then calls back userEventTriggered, which puts the InputChannel into a queue.

class CreditbasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {

	//Step1
	@Override
	public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
		// Fire the InputChannel as a user event into the pipeline, running on the channel's event loop
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
	}

	// Step 2: called when the handler receives the user event
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (msg instanceof RemoteInputChannel) {
			boolean triggerWrite = inputChannelsWithCredit.isEmpty();
			// Add the InputChannel to the inputChannelsWithCredit queue
			inputChannelsWithCredit.add((RemoteInputChannel) msg);
			// If the queue was empty before this add, start writing
			if (triggerWrite) {
				writeAndFlushNextMessageIfPossible(ctx.channel());
			}
		} else {
			ctx.fireUserEventTriggered(msg);
		}
	}
	// Step 3: send the InputChannel's newly added credit to the upstream ResultSubPartition
	private void writeAndFlushNextMessageIfPossible(Channel channel) {
		// Give up if the channel has failed or is not writable
		if (channelError.get() != null || !channel.isWritable()) {
			return;
		}
		// Poll InputChannels from the inputChannelsWithCredit queue
		while (true) {
			RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();

			if (inputChannel == null) {
				return;
			}

			// A released InputChannel does not need to send credit
			if (!inputChannel.isReleased()) {
				// Build the AddCredit message
				AddCredit msg = new AddCredit(
					inputChannel.getPartitionId(),
					inputChannel.getAndResetUnannouncedCredit(), // take the unannounced credit and reset it to 0
					inputChannel.getInputChannelId());

				// Write the AddCredit message to the TCP channel
				channel.writeAndFlush(msg).addListener(writeListener);

				return;
			}
		}
	}
}
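
The queueing logic of this handler can be sketched without Netty. CreditQueueSketch and its nested Channel class are hypothetical stand-ins. The list sent replaces channel.writeAndFlush(), and the writable flag replaces channel.isWritable(). Like the handler above, the sketch only starts writing when the queue was empty, and it writes at most one AddCredit per call. In the real handler the next call is triggered by callbacks such as the writeListener. Here, writeNextIfPossible() is simply called by hand.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Sketch (not Flink code) of the client handler's inputChannelsWithCredit queue.
class CreditQueueSketch {
    // Hypothetical stand-in for RemoteInputChannel.
    static final class Channel {
        final String id;
        boolean released;
        int unannouncedCredit;

        Channel(String id, int unannouncedCredit) {
            this.id = id;
            this.unannouncedCredit = unannouncedCredit;
        }

        int getAndResetUnannouncedCredit() {
            int c = unannouncedCredit;
            unannouncedCredit = 0;
            return c;
        }
    }

    private final ArrayDeque<Channel> inputChannelsWithCredit = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>(); // AddCredit messages "written" to the connection
    boolean writable = true;                     // stand-in for channel.isWritable()

    // Like userEventTriggered(): enqueue, and start writing only if the queue was empty.
    void onCreditAvailable(Channel ch) {
        boolean triggerWrite = inputChannelsWithCredit.isEmpty();
        inputChannelsWithCredit.add(ch);
        if (triggerWrite) {
            writeNextIfPossible();
        }
    }

    // Like writeAndFlushNextMessageIfPossible(): writes at most one AddCredit per call.
    void writeNextIfPossible() {
        if (!writable) {
            return;
        }
        while (true) {
            Channel ch = inputChannelsWithCredit.poll();
            if (ch == null) {
                return;
            }
            if (!ch.released) { // released channels are skipped
                sent.add("AddCredit(" + ch.id + ", " + ch.getAndResetUnannouncedCredit() + ")");
                return;
            }
        }
    }
}
```

Suppose channels a (credit 2), c (already released) and b (credit 3) are queued while the connection is not writable. Two later calls to writeNextIfPossible() produce AddCredit(a, 2) and AddCredit(b, 3). The released channel is skipped, and each channel's unannounced credit is reset to 0 once it has been sent.
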

ResultPartition Processes the Credit

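When the upstream NettyServer receives a message, the message first goes to PartitionRequestServerHandler, which dispatches it by type. If the NettyMessage is an AddCredit, the server handler calls addCredit on PartitionRequestQueue, another handler, to continue processing.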

class PartitionRequestQueue extends ChannelInboundHandlerAdapter {

	// Step 1
	void addCredit(InputChannelID receiverId, int credit) throws Exception {
		if (fatalError) {
			return;
		}
		// Look up the ViewReader that belongs to this InputChannel
		NetworkSequenceViewReader reader = allReaders.get(receiverId);
		if (reader != null) {
			// Increase the ViewReader's credit
			reader.addCredit(credit);
			// Put the ViewReader into the available queue; readers in this queue later read buffers from the ResultSubPartition
			enqueueAvailableReader(reader);
		} else {
			throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
		}
	}
	// Step 2
	private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
		if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
			return;
		}
		// Register the ViewReader in the available queue
		boolean triggerWrite = availableReaders.isEmpty();
		registerAvailableReader(reader);
		// If the queue was empty before, start reading buffers through the queued ViewReaders again
		if (triggerWrite) {
			writeAndFlushNextMessageIfPossible(ctx.channel());
		}
	}
	// Step 3
	// Poll the ViewReaders in the available queue and read buffer data through them
	private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
		if (fatalError || !channel.isWritable()) {
			return;
		}

		BufferAndAvailability next = null;
		try {
			while (true) {
				NetworkSequenceViewReader reader = pollAvailableReader();

				// No available ViewReader: return and wait until new data is written to the ResultSubPartition or new credit arrives
				if (reader == null) {
					return;
				}
				// Read the next buffer through the ViewReader
				next = reader.getNextBuffer();
				if (next == null) {
					if (!reader.isReleased()) {
						continue;
					}

					Throwable cause = reader.getFailureCause();
					if (cause != null) {
						ErrorResponse msg = new ErrorResponse(
							new ProducerFailedException(cause),
							reader.getReceiverId());

						ctx.writeAndFlush(msg);
					}
				} else {
					// If this ResultSubPartition still has readable buffers, re-register the ViewReader so it keeps reading
					if (next.moreAvailable()) {
						registerAvailableReader(reader);
					}
					// Build the BufferResponse message
					BufferResponse msg = new BufferResponse(
						next.buffer(),
						reader.getSequenceNumber(), // sequence number of this message
						reader.getReceiverId(),
						next.buffersInBacklog()); // backlog

					// Send the BufferResponse to the InputChannel over the TCP channel
					channel.writeAndFlush(msg).addListener(writeListener);

					return;
				}
			}
		} catch (Throwable t) {
			if (next != null) {
				next.buffer().recycleBuffer();
			}

			throw new IOException(t.getMessage(), t);
		}
	}
}
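
The server side follows the same pattern, this time combining credit and backlog. This sketch uses hypothetical classes, ReaderQueueSketch and Reader, and is not Flink code. A reader counts as available only when it has both credit and queued data. Each BufferResponse consumes one credit and carries the remaining backlog. The reader is re-registered only while it still has credit and data. Events, errors and channel writability are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch (not Flink code) of PartitionRequestQueue's available-reader queue.
class ReaderQueueSketch {
    // Hypothetical stand-in for a credit-based NetworkSequenceViewReader.
    static final class Reader {
        final String receiverId;
        int credit;         // credit granted by the downstream channel
        int backlog;        // data buffers waiting in the subpartition
        int sequenceNumber; // next sequence number to send
        boolean registered; // like isRegisteredAsAvailable()

        Reader(String receiverId, int backlog) {
            this.receiverId = receiverId;
            this.backlog = backlog;
        }

        boolean isAvailable() {
            return credit > 0 && backlog > 0; // needs both credit and data
        }
    }

    private final Map<String, Reader> allReaders = new HashMap<>();
    private final ArrayDeque<Reader> availableReaders = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>(); // BufferResponse messages "written" to the connection

    void register(Reader r) {
        allReaders.put(r.receiverId, r);
    }

    // Like addCredit(): grant credit, then try to enqueue the reader.
    void addCredit(String receiverId, int credit) {
        Reader r = allReaders.get(receiverId);
        if (r == null) {
            throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
        }
        r.credit += credit;
        enqueueAvailableReader(r);
    }

    private void enqueueAvailableReader(Reader r) {
        if (r.registered || !r.isAvailable()) {
            return;
        }
        boolean triggerWrite = availableReaders.isEmpty();
        registerAvailableReader(r);
        if (triggerWrite) {
            writeNextIfPossible();
        }
    }

    private void registerAvailableReader(Reader r) {
        r.registered = true;
        availableReaders.add(r);
    }

    // Like writeAndFlushNextMessageIfPossible(): sends at most one BufferResponse per call.
    void writeNextIfPossible() {
        while (true) {
            Reader r = availableReaders.poll();
            if (r == null) {
                return; // nothing to send until new data or new credit arrives
            }
            r.registered = false;
            if (!r.isAvailable()) {
                continue;
            }
            r.credit--; // one credit per data buffer
            r.backlog--;
            if (r.isAvailable()) {
                registerAvailableReader(r); // more data and credit: stay in the queue
            }
            int seq = r.sequenceNumber++;
            sent.add("BufferResponse(" + r.receiverId + ", seq=" + seq + ", backlog=" + r.backlog + ")");
            return;
        }
    }
}
```

With a backlog of 3 and 2 credits, the reader sends two buffers, carrying backlog 2 and then backlog 1, and then drops out of the queue. A later addCredit() puts it back, and the last buffer goes out with backlog 0.
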
Summary

Credit-based backpressure is driven by two metrics exchanged between upstream and downstream: backlog and credit.

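The backlog indicates how much data is queued upstream. It determines how many floating buffers the downstream RemoteInputChannel requests, which raises the InputChannel's processing capacity. Credit reflects the downstream processing capacity. When the number of available buffers in the downstream RemoteInputChannel changes, the newly added buffers are sent upstream as credit, signaling that the downstream side has more buffers to receive data.

If the downstream side cannot keep up, the backlog sent by the upstream side makes the RemoteInputChannel request more floating buffers. If the RemoteInputChannel cannot obtain any more floating buffers, it stops sending credit upstream. Once the ViewReader's credit is used up, the upstream handler no longer keeps it in the available-reader queue and stops pushing buffers from the ResultSubPartition downstream. Reading resumes only when the downstream side has granted enough credit again.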
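
The control rules in this summary can be written compactly. The notation is ours, not Flink's, and the isWaitingForFloatingBuffers flag is ignored:

- credit is the credit held by the upstream ViewReader.
- available is the number of free buffers in the RemoteInputChannel.
- free_pool is the number of free buffers in the LocalBufferPool.

```latex
\begin{aligned}
&\text{send allowed} \iff \text{credit} > 0, \qquad \text{credit} \leftarrow \text{credit} - 1 \text{ per data buffer sent}\\
&\text{numRequiredBuffers} = \text{backlog} + \text{initialCredit}\\
&n_{\text{float}} = \min\bigl(\max(0,\ \text{numRequiredBuffers} - \text{available}),\ \text{free}_{\text{pool}}\bigr)\\
&\Delta\text{credit} = \text{unannouncedCredit}, \qquad \text{unannouncedCredit} \leftarrow 0
\end{aligned}
```

For example, take initialCredit = 2, available = 1, backlog = 4 and an empty pool. Then numRequiredBuffers = 6 but n_float = 0, so no new credit comes from floating buffers. Once the ViewReader's credit reaches 0, it stops sending.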

Feel free to share; when reposting, please credit the source: 内存溢出.

Original article: http://outofmemory.cn/zaji/5701697.html