flink中的网络数据传输流程

flink中的网络数据传输流程,第1张

flink中的网络数据传输流程 背景

本文紧接上文StreamTask数据流,上文我们讲解了每个executor中的task中的数据是如何进行读取转换写出的过程。本文我们将讲解executor之间的数据传输流程,既包括flink的shuffle实现,以及StreamTask中InputChannel和ResultSubPartition的读写数据细节。

ShuffleMaster & ShuffleEnvironment

不同Task之间的数据交换设计到Task之间的Shuffle *** 作,主要是StreamTask中的InputGate组件和ResultPartition组件建立TCP连接,通过网络传输数据。所有Task在初始化的时候都会创建ShuffleEnvironment组件,通过ShuffleEnvironment组件统一创建InputGate和ResultPartition组件。ShuffleMaster组件统一管理和监控作业中所有的InputGate和ResultPartition组件。

ShuffleService服务

ShuffleEnvironment和ShuffleMaster组件通过ShuffleServiceFactory创建,同时flink内部通过SPI机制实现了可插拔的ShuffleService服务。ShuffleServiceFactory的实现类通过SPI的方式加载到ClassLoader中,既通过ShuffleServiceLoader从配置文件中加载系统配置的ShuffleServiceFactory实现类。用户可以自定义实现ShuffleService服务,实现了Shuffle服务的可插拔。

public class TaskManagerServices {
	private static ShuffleEnvironment createShuffleEnvironment(
			TaskManagerServicesConfiguration taskManagerServicesConfiguration,
			TaskEventDispatcher taskEventDispatcher,
			MetricGroup taskManagerMetricGroup) throws FlinkException {

		final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
			taskManagerServicesConfiguration.getConfiguration(),
			taskManagerServicesConfiguration.getResourceID(),
			taskManagerServicesConfiguration.getNetworkMemorySize(),
			taskManagerServicesConfiguration.isLocalCommunicationOnly(),
			taskManagerServicesConfiguration.getTaskManagerAddress(),
			taskEventDispatcher,
			taskManagerMetricGroup);
		//通过ShuffleServiceLoader加载配置文件对应的ShuffleServiceFactory实现类,
		//TaskManagerServices 通过ShuffleServiceFactory创建ShuffleEnvironment
		return ShuffleServiceLoader
			.loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
			.createShuffleEnvironment(shuffleEnvironmentContext);
	}
}

ShuffleServiceFactory在创建ShuffleEnvironment的时候会初始化ShuffleEnvironment需要的ConnectionManager(用于InputChannel创建和ResultPartition的客户端进行通信)、FileChannelManager(指定配置中的临时文件夹,用于离线作业存储临时文件)、NetworkBufferPool(分配和管理MemorySegment)、ResultPartitionFactory(管理ResultPartition,并创建ResultSubpartitionView,消费buffer数据)、SingleInputGateFactory(创建InputGate)

Tips:ShuffleEnvironment内部的ResultPartitionFactory和SingleInputGateFactory组件使用同一个NetworkBufferPool创建ResultPartition和InputGate,所以同一个TaskManager使用一个NetworkBufferPool。

public enum ShuffleServiceLoader {
	;
	public static final ConfigOption SHUFFLE_SERVICE_FACTORY_CLASS = ConfigOptions
		.key("shuffle-service-factory.class")
		.defaultValue("org.apache.flink.runtime.io.network.NettyShuffleServiceFactory")
		.withDescription("The full class name of the shuffle service factory implementation to be used by the cluster. " +
			"The default implementation uses Netty for network communication and local memory as well disk space " +
			"to store results on a TaskExecutor.");

	public static ShuffleServiceFactory loadShuffleServiceFactory(Configuration configuration) throws FlinkException {
		String shuffleServiceClassName = configuration.getString(SHUFFLE_SERVICE_FACTORY_CLASS);
		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
		return InstantiationUtil.instantiate(
			shuffleServiceClassName,
			ShuffleServiceFactory.class,
			classLoader);
	}
}
ShuffleEnvironment创建ResultPartition和InputGate

在TaskManager接收到JobManager提交的Task作业描述信息后,会初始化Task对象,Task对象内部会根据描述信息通过ShuffleEnvironment创建ResultPartition和InputChate组件。

Task构造器部分逻辑

		// 创建ResultPartition
		final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment.createResultPartitionWriters(
			taskShuffleContext,
			resultPartitionDeploymentDescriptors).toArray(new ResultPartitionWriter[] {});

		this.consumableNotifyingPartitionWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
			resultPartitionDeploymentDescriptors,
			resultPartitionWriters,
			this,
			jobId,
			resultPartitionConsumableNotifier);

		// 创建InputGate
		final InputGate[] gates = shuffleEnvironment.createInputGates(
			taskShuffleContext,
			this,
			inputGateDeploymentDescriptors).toArray(new InputGate[] {});

		this.inputGates = new InputGate[gates.length];
		int counter = 0;
		for (InputGate gate : gates) {
			inputGates[counter++] = new InputGateWithMetrics(gate, metrics.getIOMetricGroup().getNumBytesInCounter());
		}

Task实例将InputGate和ResultPartition封装到RuntimeEnvironment环境信息对象中,传递给StreamTask。StreamTask用于穿件StreamNetWorkTaskInput和RecordWriter组件,从而完成Task数据的输入和输出。

ResultPartition & InputGate

ResultPartition包含多个ResultSubPartition实例和一个LocalBufferPool组件,每个ResultSubPartition实例都有一个Buffer队列存储Buffer数据,LocalBufferPool组件用于获取Buffer数据的内存存储空间。ResultPartitionManager用于监控和管理同一个TaskManager中的所有生产和消费分区,既ResultPartition和ResultPartitionView。ResultSubPartitionView用于消费ResultSubPartition缓存的Buffer数据,然后推送到网络中。
InputGate中包含多个InputChannel实例和一个LocalBufferPool组件,通过将InputGate封装为CheckpointInputGate完成CheckPoint的相关 *** 作。InputChannel实例通过ConnectManager建立与上游ResultManager的连接,读取网络中的数据;通过LocalBufferPool组件向NetworkBufferPool申请Buffer内存存储空间,通过Buffer缓存网络中读取的二进制数据。然后借助DataOut组件将数据推送给OperatorChain进行处理。

ResultPartition & InputGate的启动和初始化

上面我们讲了ResultPartition & InputGate在Task初始化的时候,通过ShuffleEnvironment创建。然后在Task线程运行时,会调用setupPartitionsAndGates方法启动ResultPartition & InputGate。此时,会为每个InputGate和ResultPartition创建LocalBufferPool,同时会向ResultPartitionManager注册ResultPartition,并且每个InputChannel都会向上游对应的Task实例注册,既InputChannel通过ConnectManager创建和上游ResultPartition的连接。

public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask {
	private void doRun(){
		//...
		setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates);
		//...
		//启动StreamTask
		this.invokable = invokable;
		invokable.invoke();
	}
	@VisibleForTesting
	public static void setupPartitionsAndGates(
		ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException, InterruptedException {
		//为每一个ResultPartition创建LocalBufferPool,并将ResultPartition注册到TM的ResultPartitionManager中
		for (ResultPartitionWriter partition : producedPartitions) {
			partition.setup();
		}

		// 为每个InputGate创建LocalBufferPool,同时InputChannel向ResultPartition注册
		//通过ConnectManager获取上游ResultPartition的Client,然后向上游发送PartitionRequest请求
		for (InputGate gate : inputGates) {
			gate.setup();
		}
	}
}

InputGate 的启动过程

public class SingleInputGate extends InputGate {
	@Override
	public void setup() throws IOException, InterruptedException {
		checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
		// assign exclusive buffers to input channels directly and use the rest for floating buffers
		assignExclusiveSegments();
		//创建LocalBufferPool
		BufferPool bufferPool = bufferPoolFactory.get();
		setBufferPool(bufferPool);
		//注册
		requestPartitions();
	}
	@VisibleForTesting
	void requestPartitions() throws IOException, InterruptedException {
		synchronized (requestLock) {
			if (!requestedPartitionsFlag) {
				//...检查 *** 作
				//InputChannel 向上游ResultPartition注册
				for (InputChannel inputChannel : inputChannels.values()) {
					inputChannel.requestSubpartition(consumedSubpartitionIndex);
				}
			}

			requestedPartitionsFlag = true;
		}
	}
}
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
		if (partitionRequestClient == null) {
			// 创建客户端
			try {
				partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
			} catch (IOException e) {
				// IOExceptions indicate that we could not open a connection to the remote TaskExecutor
				throw new PartitionConnectionException(partitionId, e);
			}
			//发送PartitionRequest请求,上游ResultPartition的nettyServer收到后
			//会创建对应ResultSubPartition的ResultSubPartitionView组件,读取数据
			//(通过ResultSubPartitionView内部的NetworkSequenceViewReader监听器)
			partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
		}
	}
}

至此,ResultPartition & InputGate启动完成,并相互建立了TCP连接

ResulSubtPartitionView读取ResulSubtPartition的Buffer数据

上文我们将了Task中的数据通过RecordWriter序列化之后以BufferConsumer的类型写入ResultSubPartition的Buffer队列中等待消费。上游的NettyServer收到InputChannel的PartitionRequest请求之后,会为对应的ResultSubPartition创建ResulSubtPartitionView读取数据。同时会在ResulSubtPartitionView中注册NetworkSequenceViewReader监听器,通过监听器获取ResultSubPartition队列中Buffer的情况。

class PipelinedSubpartition extends ResultSubpartition {
	@Override
	public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
		final boolean notifyDataAvailable;
		synchronized (buffers) {
			//。。。
			//availabilityListener是CreditbasedSequenceNumberingViewReader,当buffer队列中的buffer可用且有足够Credit时,
			//CreditbasedSequenceNumberingViewReader会回调PipelinedSubpartitionView 
			//的getNextBuffer方法读取数据(既最终调用ResultSubPartition的pollBuffer()读取队列中缓存的Buffer数据)
			readView = new PipelinedSubpartitionView(this, availabilityListener);
			notifyDataAvailable = !buffers.isEmpty();
		}
		if (notifyDataAvailable) {
			notifyDataAvailable();
		}

		return readView;
	}
}
class PipelinedSubpartitionView implements ResultSubpartitionView {
	private final PipelinedSubpartition parent;

	//通过Listencer回调该方法,读取ResultSubPartition中缓存的Buffer数据
	@Nullable
	@Override
	public BufferAndBacklog getNextBuffer() {
		return parent.pollBuffer();
	}
}
class CreditbasedSequenceNumberingViewReader implements BufferAvailabilityListener, NetworkSequenceViewReader {
	private final PartitionRequestQueue requestQueue;
	@Override
	public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
		BufferAndBacklog next = subpartitionView.getNextBuffer();
		if (next != null) {
			sequenceNumber++;

			if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
				throw new IllegalStateException("no credit available");
			}

			return new BufferAndAvailability(
				next.buffer(), isAvailable(next), next.buffersInBacklog());
		} else {
			return null;
		}
	}
	//当ResultSubPartition有Buffer写入时,会通过该监听器回调PartitionRequestQueue
	//的writeAndFlushNextMessageIfPossible方法,循环读取可用的NetworkSequenceViewReader
	//避免了PartitionRequestQueue一直空轮询
	@Override
	public void notifyDataAvailable() {
		requestQueue.notifyReaderNonEmpty(this);
	}
}

以上是ResultSubPartitionView如何读取ResultSubPartition中的Buffer数据的过程,接下来我们看看Buffer数据被读取出来之后是如何写到网络中的。

ResultPartition会通过ConnectManager组件启动一个NettyServer,启动之后通过Netty的ChannelHandler的实现类PartitionRequestQueue回调NetworkSequenceViewReader的getNextBuffer方法,然后将Buffer数据转换为NettyMassage类型写到网络中。
PartitionRequestQueue 将所有的NetworkSequenceViewReader注册到allReaders中,availableReaders保存当前激活的NetworkSequenceViewReader。PartitionRequestQueue 会循环读取激活的NetworkSequenceViewReader。
同时当ResultSubPartition有Buffer加入时,会通过NetworkSequenceViewReader监听器,回调PartitionRequestQueue 循环读取可用的NetworkSequenceViewReader。(通过监听器回调的方式避免了PartitionRequestQueue 一直循环)

class PartitionRequestQueue extends ChannelInboundHandlerAdapter {

	//当下游的Client建立连接之后,获取到对方的channel,将数据通过Channel写到网络中
	@Override
	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
		writeAndFlushNextMessageIfPossible(ctx.channel());
	}
	//下游channel建立之后,循环读取可用NetworkSequenceViewReader
	//当ResultSubPartition中加入Bufffer后,会触发监听器NetworkSequenceViewReader的notifyDataAvailable方法
	//然后回调该方法,再循环读取可用的NetworkSequenceViewReader(设置监听器回调的方式避免了一直循环)
	private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
		if (fatalError || !channel.isWritable()) {
			return;
		}

		BufferAndAvailability next = null;
		try {
			while (true) {
				//循环读取可用的NetworkSequenceViewReader
				NetworkSequenceViewReader reader = pollAvailableReader();

				// 没有可用NetworkSequenceViewReader则跳出循环,等待监听器激活下一次循环
				if (reader == null) {
					return;
				}
				//通过NetworkSequenceViewReader读取ResultSubPartition的Buffer数据,
				//并将Credit减一
				next = reader.getNextBuffer();
				if (next == null) {
					if (!reader.isReleased()) {
						continue;
					}

					Throwable cause = reader.getFailureCause();
					if (cause != null) {
						ErrorResponse msg = new ErrorResponse(
							new ProducerFailedException(cause),
							reader.getReceiverId());

						ctx.writeAndFlush(msg);
					}
				} else {
					// 如果该NetworkSequenceViewReader还可用,则放入可用队列,等待下次读取
					if (next.moreAvailable()) {
						registerAvailableReader(reader);
					}
					//将Buffer数据封装为NettyMessage类型
					BufferResponse msg = new BufferResponse(
						next.buffer(),
						reader.getSequenceNumber(),
						reader.getReceiverId(),
						next.buffersInBacklog());

					// 将数据通过Channel写出
					channel.writeAndFlush(msg).addListener(writeListener);

					return;
				}
			}
		} catch (Throwable t) {
			if (next != null) {
				next.buffer().recycleBuffer();
			}

			throw new IOException(t.getMessage(), t);
		}
	}
}

NetworkSequenceViewReader激活的方式有两种:
1、向ResultSubPartition中添加新的BufferConsumer对象,会同步调用PipelinedSubpartitionView的notifyDataAvailable方法,最终会回调到PartitionRequestQueue 的userEventTriggered方法,将对应的NetworkSequenceViewReader加入到激活队列中。

class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
		// The user event triggered event loop callback is used for thread-safe
		// hand over of reader queues and cancelled producers.

		if (msg instanceof NetworkSequenceViewReader) {
			enqueueAvailableReader((NetworkSequenceViewReader) msg);
		} else if (msg.getClass() == InputChannelID.class) {
			// Release partition view that get a cancel request.
			InputChannelID toCancel = (InputChannelID) msg;

			// remove reader from queue of available readers
			availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));

			// remove reader from queue of all readers and release its resource
			final NetworkSequenceViewReader toRelease = allReaders.remove(toCancel);
			if (toRelease != null) {
				releaseViewReader(toRelease);
			}
		} else {
			ctx.fireUserEventTriggered(msg);
		}
	}
}

2、基于Credit指标变化对NetworkSequenceViewReader激活,Credit表现了下游的处理能力,只有当上游有Credit的时候,才能激活对应ResultSubPartition的NetworkSequenceViewReader读取并发送数据。

class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
	void addCredit(InputChannelID receiverId, int credit) throws Exception {
		if (fatalError) {
			return;
		}

		NetworkSequenceViewReader reader = allReaders.get(receiverId);
		if (reader != null) {
			reader.addCredit(credit);

			enqueueAvailableReader(reader);
		} else {
			throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
		}
	}
}
InputChannel读取Buffer数据

上文我们讲了StreamTask从InputChannel的Buffer队列中消费数据,这里将讲解InputChannel是如何从网络中读取数据的。
InputChannel中提供了OnBuffer方法,用于将Buffer数据缓存在InputChannel的队列中。OnBuffer方法被Netty的ChannelHandler的实现类CreditbasedPartitionRequestClientHandler回调,当Buffer从Netty的TCP网络进入到InputChanneler后,就会通过OnBuffer方法写入到InputChannel的队列中。

ChannelInboundHandlerAdapter 从网络中解析Buffer数据,并写入到InputChannel中

class CreditbasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {
	private void decodeMsg(Object msg) throws Throwable {
		final Class msgClazz = msg.getClass();

		// ---- 处理Buffer数据 --------------------------------------------------------
		if (msgClazz == NettyMessage.BufferResponse.class) {
			NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

			RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
			if (inputChannel == null) {
				bufferOrEvent.releaseBuffer();

				cancelRequestFor(bufferOrEvent.receiverId);

				return;
			}

			decodeBufferOrEvent(inputChannel, bufferOrEvent);

		} else if (msgClazz == NettyMessage.ErrorResponse.class) {
			// ---- Error ---------------------------------------------------------
			NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;
			SocketAddress remoteAddr = ctx.channel().remoteAddress();
			//。。。
			}
		} else {
			throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
		}
	}

	private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
		try {
			ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
			final int receivedSize = nettyBuffer.readableBytes();
			if (bufferOrEvent.isBuffer()) {
				// ---- Buffer ------------------------------------------------

				// Early return for empty buffers. Otherwise Netty's readBytes() throws an
				// IndexOutOfBoundsException.
				if (receivedSize == 0) {
					inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
					return;
				}
				//从InputChannel的LocalBufferPool中获取Buffer内存存储空间
				Buffer buffer = inputChannel.requestBuffer();
				if (buffer != null) {
					//将nettyBuffer中的数据写到Buffer中
					nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
					buffer.setCompressed(bufferOrEvent.isCompressed);
					//将Buffer写到InputChannel的缓冲队列中
					inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
				} else if (inputChannel.isReleased()) {
					cancelRequestFor(bufferOrEvent.receiverId);
				} else {
					throw new IllegalStateException("No buffer available in credit-based input channel.");
				}
			} else {
				// ---- Event -------------------------------------------------
				// TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
				byte[] byteArray = new byte[receivedSize];
				nettyBuffer.readBytes(byteArray);

				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);

				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
			}
		} finally {
			bufferOrEvent.releaseBuffer();
		}
	}
}

RemoteInputChannel提供OnBuffer方法写入Buffer数据到缓存队列中

public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
	public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
		boolean recycleBuffer = true;

		try {

			final boolean wasEmpty;
			synchronized (receivedBuffers) {
				//。。。
				//将Buffer将入到receivedBuffers队列中缓存
				wasEmpty = receivedBuffers.isEmpty();
				receivedBuffers.add(buffer);
				recycleBuffer = false;
			}
			//保证消息接收的顺序性
			++expectedSequenceNumber;

			if (wasEmpty) {
				notifyChannelNonEmpty();
			}

			if (backlog >= 0) {
				onSenderBacklog(backlog);
			}
		} finally {
			//回收Buffer的内存空间
			if (recycleBuffer) {
				buffer.recycleBuffer();
			}
		}
	}
}

以上就是InputGate和ResultPartition层面的数据交互,InputGate和ResultPartition组件的数据交互是较为上层的数据交互逻辑,Buffer和Event数据经过TCP网络进行传输的过程是通过ConnectManager组件实现的。ConnectManager组件可以创建NettyServer和NettyClient等底层网络组件。

ConnectManager

ConnectManager属于ShuffleEnvironment的核心组件。TaskManager启动服务时,会同步启动ShuffleEnvironment。然后通过ShuffleEnvironment的ConnectManager组件启动TaskManager的Netty客户端和服务端。

ConnectManager的主要组件
ConnectManager的默认实现类是NettyConnectionManager

  • NettyClient:封装了Netty的客户端,内部通过Netty的Bootstrap启动客户端服务,主要用于创建和发送PartitionRequest请求,同时负责处理服务端的Buffer数据
  • NettyServer:封装了Netty的服务端,内部通过ServerBootstrap启动服务端服务,主要用于接收Netty客户端的PartitionRequest请求,同时将ResultPartition的Buffer数据写入到TCP通道中。
  • NettyProtocol:主要定义了Netty客户端和服务端使用的ChannelHandlers。
  • PartitionRequestClientFactory:通过NettyClient创造PartitionRequestClient(既通过NettyClient连接上游的NettyServer,所有PartitionRequestClient都使用一个NettyClient),PartitionRequestClient用于向NettyServer发生PartitionRequest请求,底层是通过连接的Channel与NettyServer交互

Netty客户端和服务器的启动和停止都通过ConnectManager管控

public class NettyConnectionManager implements ConnectionManager {

	private final NettyServer server;

	private final NettyClient client;
	//用于Netty读写时分配内存
	private final NettyBufferPool bufferPool;

	private final PartitionRequestClientFactory partitionRequestClientFactory;

	private final NettyProtocol nettyProtocol;
	
	@Override
	public int start() throws IOException {
		client.init(nettyProtocol, bufferPool);

		return server.init(nettyProtocol, bufferPool);
	}
	
	@Override
	public void shutdown() {
		client.shutdown();
		server.shutdown();
	}
}
NettyProtocol

NettyProtocol定义了Netty网络通信中客户端和服务端对事件处理的逻辑顺序,既ChannelHandler集合。
1、ServerChannelHanlder
NettyServer的ChannelPipeline中的处理器主要包含NettyMessageEncoder、NettyMessageDecoder、PartitionRequestServerHandler和PartitionRequestQueue。Server端读入消息后,会通过NettyMessageDecoder进行解码,然后由PartitionRequestServerHandler进行解析出来,如果解析出NettyMessage是PartitionRequest类型,就会创建新的NetworkSequenceViewReader,并添加到PartitionRequestQueue的读取队列中。然后PartitionRequestQueue使用可读的NetworkSequenceViewReader从ResultPartition中读取Buffer数据,并下发数据。
其中服务端的主要逻辑在PartitionRequestServerHandler和PartitionRequestQueue中。

PartitionRequestServerHandler会对下游发送的PartitionRequest、TaskEventRequest、AddCredit等消息进行处理。
PartitionRequestQueue会存储所有可用的NetworkSequenceViewReader,通过NetworkSequenceViewReader循环读取ResultSubPartition中的Buffer数据,并发送到TCP通道中
(1)PartitionRequestServerHandler的核心实现

class PartitionRequestServerHandler extends SimpleChannelInboundHandler {
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
		try {
			Class msgClazz = msg.getClass();

			// ----------------------------------------------------------------
			// 处理PartitionRequest消息
			// ----------------------------------------------------------------
			if (msgClazz == PartitionRequest.class) {
				PartitionRequest request = (PartitionRequest) msg;

				LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);

				try {
					//生成InputChannel对应的NetworkSequenceViewReader 
					NetworkSequenceViewReader reader;
					reader = new CreditbasedSequenceNumberingViewReader(
						request.receiverId,
						request.credit,
						outboundQueue);

					reader.requestSubpartitionView(
						partitionProvider,
						request.partitionId,
						request.queueIndex);
					//将NetworkSequenceViewReader加入到PartitionRequestQueue的可读队列中,
					//等待PartitionRequestQueue使用队列中的ViewReader读取ResultSubPartition中的Buffer数据
					outboundQueue.notifyReaderCreated(reader);
				} catch (PartitionNotFoundException notFound) {
					respondWithError(ctx, notFound, request.receiverId);
				}
			}
			// ----------------------------------------------------------------
			// Task events
			// ----------------------------------------------------------------
			else if (msgClazz == TaskEventRequest.class) {
				TaskEventRequest request = (TaskEventRequest) msg;

				if (!taskEventPublisher.publish(request.partitionId, request.event)) {
					respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
				}
			} else if (msgClazz == CancelPartitionRequest.class) {
				CancelPartitionRequest request = (CancelPartitionRequest) msg;

				outboundQueue.cancel(request.receiverId);
			} else if (msgClazz == CloseRequest.class) {
				outboundQueue.close();
			} else if (msgClazz == AddCredit.class) {
				AddCredit request = (AddCredit) msg;
				//向对应的NetworkSequenceViewReader添加Credit,并将ViewReader加到PartitionRequestQueue的可读队列中
				outboundQueue.addCredit(request.receiverId, request.credit);
			} else {
				LOG.warn("Received unexpected client request: {}", msg);
			}
		} catch (Throwable t) {
			respondWithError(ctx, t);
		}
	}
}

(2)PartitionRequestQueue的核心实现

class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
	// 当ResultSubPartition添加新的BufferConsumer对象时,会同步调用到此方法,
	//实现PartitionRequestQueue中激活当前NetworkSequenceViewReader
	void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
		//将ViewReader以UserEvent的形式传入ChannelPipeline,实现在PartitionRequestQueue中激活当前ViewReader
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
	}
	
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
		// 如果msg是NetworkSequenceViewReader类型,则调用enqueueAvailableReader方法激活当前的ViewReader
		if (msg instanceof NetworkSequenceViewReader) {
			enqueueAvailableReader((NetworkSequenceViewReader) msg);
		} else if (msg.getClass() == InputChannelID.class) {
			//如果是InputChannelID类型,则释放Server端的InputChannel信息
			InputChannelID toCancel = (InputChannelID) msg;

			// 从availableReaders中删除InputChannelID对应的ViewReader
			availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));

			// 从allReaders中删除ViewReader,并释放资源
			final NetworkSequenceViewReader toRelease = allReaders.remove(toCancel);
			if (toRelease != null) {
				releaseViewReader(toRelease);
			}
		} else {
			//其他事件不进行处理,继续下发
			ctx.fireUserEventTriggered(msg);
		}
	}
	//下游发送AddCredit消息后,会回调到该方法,增加InputChannelID对应的ViewReader的Credit,并将其加入到PartitionRequestQueue的可用队列中
	void addCredit(InputChannelID receiverId, int credit) throws Exception {
		if (fatalError) {
			return;
		}

		NetworkSequenceViewReader reader = allReaders.get(receiverId);
		if (reader != null) {
			reader.addCredit(credit);

			enqueueAvailableReader(reader);
		} else {
			throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
		}
	}

	//使用可用队列的ViewReader读取ResultSubPartition的Buffer,并通过Channel发送到TCP通道中
	private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
		if (fatalError || !channel.isWritable()) {
			return;
		}
		
		BufferAndAvailability next = null;
		try {
			while (true) {
				//从可用队列中获取ViewReader
				NetworkSequenceViewReader reader = pollAvailableReader();

				// 没有可用ViewReader时,不在空轮训
				if (reader == null) {
					return;
				}
				//ViewReader读取ResultSubPartition的Buffer,并将Credit减一
				next = reader.getNextBuffer();
				if (next == null) {
					if (!reader.isReleased()) {
						continue;
					}

					Throwable cause = reader.getFailureCause();
					if (cause != null) {
						ErrorResponse msg = new ErrorResponse(
							new ProducerFailedException(cause),
							reader.getReceiverId());

						ctx.writeAndFlush(msg);
					}
				} else {
					// 当前ResultSubPartition还有可用Buffer,则将ViewReader加可用队列中
					if (next.moreAvailable()) {
						registerAvailableReader(reader);
					}

					BufferResponse msg = new BufferResponse(
						next.buffer(),
						reader.getSequenceNumber(),
						reader.getReceiverId(),
						next.buffersInBacklog());

					// 将数据输出到指定的TCP通道中
					channel.writeAndFlush(msg).addListener(writeListener);

					return;
				}
			}
		} catch (Throwable t) {
			if (next != null) {
				next.buffer().recycleBuffer();
			}

			throw new IOException(t.getMessage(), t);
		}
	}
}

2、ClientChannelHanlder
ClientChannelHanlder的ChannelPipeline中的处理器主要包含NettyMessageDecoder、NettyMessageEncoder和CreditbasedPartitionRequestClientHandler。其中CreditbasedPartitionRequestClientHandler实现了客户端的主要逻辑,经过NettyMessageDecoder解码的NettyMessage会被CreditbasedPartitionRequestClientHandler读取(同时会向上游NettyServer发送AddCredit消息,增加上游NetworkSequenceViewReader的Credit),然后发生到对应的RemoteInputChannel的Buffer队列中,供下游算子消费。
CreditbasedPartitionRequestClientHandler的核心实现
包括向上游发送AddCredit消息,以及读取Buffer数据,写入到InputChannel的Buffer队列中

class CreditbasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {

	//将InputChannel以UserEvent的形式传入ChannelPipeline,实现在该Handler中向上游发送AddCredit消息
	@Override
	public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
	}

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (msg instanceof RemoteInputChannel) {
			boolean triggerWrite = inputChannelsWithCredit.isEmpty();

			inputChannelsWithCredit.add((RemoteInputChannel) msg);
			//如果当前InputChannel的Credit增加,则向上游发送AddCredit消息
			if (triggerWrite) {
				writeAndFlushNextMessageIfPossible(ctx.channel());
			}
		} else {
			//其他消息不处理,继续下发
			ctx.fireUserEventTriggered(msg);
		}
	}
	//发送AddCredit消息
	private void writeAndFlushNextMessageIfPossible(Channel channel) {
		if (channelError.get() != null || !channel.isWritable()) {
			return;
		}
		//所有可用InputChannel发送AddCredit消息
		while (true) {
			RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();
			if (inputChannel == null) {
				return;
			}

			//发送AddCredit消息
			if (!inputChannel.isReleased()) {
				AddCredit msg = new AddCredit(
					inputChannel.getPartitionId(),
					inputChannel.getAndResetUnannouncedCredit(),
					inputChannel.getInputChannelId());
				channel.writeAndFlush(msg).addListener(writeListener);
				return;
			}
		}
	}
	
	//NettyClient读取网络并反序列化后,会调用该方法
	private void decodeMsg(Object msg) throws Throwable {
		final Class msgClazz = msg.getClass();

		// 处理Buffer数据类型
		if (msgClazz == NettyMessage.BufferResponse.class) {
			NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

			RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
			//如果InputChannel为空,则释放bufferOrEvent,并向上游发送CancelPartitionRequest消息
			if (inputChannel == null) {
				bufferOrEvent.releaseBuffer();
				cancelRequestFor(bufferOrEvent.receiverId);
				return;
			}
			//解析BufferOrEvent数据
			decodeBufferOrEvent(inputChannel, bufferOrEvent);

		} else if (msgClazz == NettyMessage.ErrorResponse.class) {
			// 处理错误信息
			NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;

			SocketAddress remoteAddr = ctx.channel().remoteAddress();
			//。。。
		} else {
			throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
		}
	}

	private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
		try {
			ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
			final int receivedSize = nettyBuffer.readableBytes();
			if (bufferOrEvent.isBuffer()) {
				// ---- Buffer数据 ------------------------------------------------

				// 通过receivedSize判断Buffer是否为空
				if (receivedSize == 0) {
					inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
					return;
				}
				//若Buffer不为空,则通过InputChannel申请Buffer空间
				Buffer buffer = inputChannel.requestBuffer();
				if (buffer != null) {
					//将数据写入到Buffer中
					nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
					buffer.setCompressed(bufferOrEvent.isCompressed);
					//将Buffer写到InputChannel的队列中
					inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
				} else if (inputChannel.isReleased()) {
					cancelRequestFor(bufferOrEvent.receiverId);
				} else {
					throw new IllegalStateException("No buffer available in credit-based input channel.");
				}
			} else {
				// ---- Event数据 -------------------------------------------------
				// 堆内申请Buffer大小的byte[]
				byte[] byteArray = new byte[receivedSize];
				nettyBuffer.readBytes(byteArray);

				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
				//将Buffer写到InputChannel的队列中
				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
			}
		} finally {
			bufferOrEvent.releaseBuffer();
		}
	}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5681414.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存