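This article continues the previous one on StreamTask data flow. That article explained how the task inside each executor reads, transforms, and writes out data. This article covers how data is transferred between executors, including Flink's shuffle implementation and the details of how a StreamTask's InputChannel and ResultSubpartition read and write data.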
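ShuffleMaster & ShuffleEnvironment

Exchanging data between different Tasks involves a shuffle. The InputGate of the consuming StreamTask and the ResultPartition of the producing StreamTask establish a TCP connection and transfer the data over the network. Each TaskManager creates a ShuffleEnvironment when it starts (see TaskManagerServices below), and every Task uses this ShuffleEnvironment to create its InputGates and ResultPartitions. The ShuffleMaster centrally manages and monitors all InputGates and ResultPartitions of a job.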
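ShuffleService

The ShuffleEnvironment and the ShuffleMaster are both created by a ShuffleServiceFactory, and Flink makes the shuffle service pluggable. The ShuffleServiceFactory implementation is chosen through configuration: ShuffleServiceLoader reads the class name from the shuffle-service-factory.class option (defaulting to NettyShuffleServiceFactory) and instantiates it through the thread's context ClassLoader. Users can therefore plug in their own shuffle service by implementing ShuffleServiceFactory and setting this option.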
```java
public class TaskManagerServices {

    private static ShuffleEnvironment<?, ?> createShuffleEnvironment(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            TaskEventDispatcher taskEventDispatcher,
            MetricGroup taskManagerMetricGroup) throws FlinkException {

        final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext(
                taskManagerServicesConfiguration.getConfiguration(),
                taskManagerServicesConfiguration.getResourceID(),
                taskManagerServicesConfiguration.getNetworkMemorySize(),
                taskManagerServicesConfiguration.isLocalCommunicationOnly(),
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                taskEventDispatcher,
                taskManagerMetricGroup);

        // ShuffleServiceLoader loads the ShuffleServiceFactory implementation named in the configuration;
        // TaskManagerServices then uses that factory to create the ShuffleEnvironment
        return ShuffleServiceLoader
                .loadShuffleServiceFactory(taskManagerServicesConfiguration.getConfiguration())
                .createShuffleEnvironment(shuffleEnvironmentContext);
    }
}
```
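The configuration-driven loading described above can be sketched in plain Java. This is a minimal sketch, not Flink's code: ShuffleFactory, DefaultShuffleFactory and ShuffleFactoryLoader are hypothetical names, the configuration is assumed to be a Map<String, String>, and the class is resolved with Class.forName rather than Flink's InstantiationUtil. Only the option key is taken from the original.

```java
import java.util.Map;

// Hypothetical stand-in for ShuffleServiceFactory.
interface ShuffleFactory {
    String createEnvironment(String taskManagerId);
}

// Hypothetical default implementation, playing the role of NettyShuffleServiceFactory.
class DefaultShuffleFactory implements ShuffleFactory {
    public DefaultShuffleFactory() {}

    @Override
    public String createEnvironment(String taskManagerId) {
        return "netty-shuffle@" + taskManagerId;
    }
}

final class ShuffleFactoryLoader {
    // Same key as the SHUFFLE_SERVICE_FACTORY_CLASS option shown later in the article.
    static final String KEY = "shuffle-service-factory.class";
    static final String DEFAULT_CLASS = DefaultShuffleFactory.class.getName();

    private ShuffleFactoryLoader() {}

    // Reads the factory class name from the configuration (falling back to the default)
    // and instantiates it reflectively through its public no-arg constructor.
    static ShuffleFactory load(Map<String, String> config) {
        String className = config.getOrDefault(KEY, DEFAULT_CLASS);
        try {
            Class<? extends ShuffleFactory> clazz = Class
                    .forName(className, true, ShuffleFactoryLoader.class.getClassLoader())
                    .asSubclass(ShuffleFactory.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load shuffle factory " + className, e);
        }
    }
}
```

As in the original, the implementation is chosen purely by configuration, so swapping the shuffle service does not require touching the code that calls the loader.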
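When the ShuffleServiceFactory creates the ShuffleEnvironment, it also initializes the components the ShuffleEnvironment needs:
- ConnectionManager: creates the network clients through which InputChannels communicate with upstream ResultPartitions.
- FileChannelManager: manages the temporary directories specified in the configuration, where batch jobs store temporary files.
- NetworkBufferPool: allocates and manages MemorySegments.
- ResultPartitionFactory: creates the ResultPartitions, whose buffered data is later consumed through ResultSubpartitionViews.
- SingleInputGateFactory: creates the InputGates.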
Tip: the ResultPartitionFactory and the SingleInputGateFactory inside a ShuffleEnvironment create ResultPartitions and InputGates from the same NetworkBufferPool, so all ResultPartitions and InputGates in one TaskManager share a single NetworkBufferPool.
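The Tip above describes a two-level buffer hierarchy: one shared pool per TaskManager, and a local pool per ResultPartition or InputGate that draws from it. The sketch below models this under simplifying assumptions: GlobalSegmentPool and LocalPool are hypothetical names, segments are plain byte arrays, and a local pool simply caps how many segments it may borrow. Flink's real NetworkBufferPool and LocalBufferPool also redistribute segments between local pools and notify waiters asynchronously, which is omitted here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical TaskManager-wide pool of fixed-size segments (role of NetworkBufferPool).
final class GlobalSegmentPool {
    private final Deque<byte[]> free = new ArrayDeque<>();

    GlobalSegmentPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            free.push(new byte[segmentSize]);
        }
    }

    synchronized byte[] request() { return free.poll(); }      // null when exhausted
    synchronized void recycle(byte[] segment) { free.push(segment); }
    synchronized int available() { return free.size(); }
}

// Hypothetical per-partition / per-gate pool (role of LocalBufferPool): borrows segments
// from the shared global pool up to its own quota and caches recycled ones for reuse.
final class LocalPool {
    private final GlobalSegmentPool global;
    private final int maxSegments;
    private final Deque<byte[]> cached = new ArrayDeque<>();
    private int borrowed;   // segments currently taken from the global pool

    LocalPool(GlobalSegmentPool global, int maxSegments) {
        this.global = global;
        this.maxSegments = maxSegments;
    }

    synchronized byte[] request() {
        byte[] segment = cached.poll();
        if (segment != null) {
            return segment;
        }
        if (borrowed >= maxSegments) {
            return null;                      // local quota exhausted
        }
        segment = global.request();
        if (segment != null) {
            borrowed++;
        }
        return segment;                       // null if the global pool is empty
    }

    synchronized void recycle(byte[] segment) { cached.push(segment); }

    // Returns every cached segment to the global pool (e.g. when the partition is released).
    synchronized void destroy() {
        while (!cached.isEmpty()) {
            global.recycle(cached.pop());
            borrowed--;
        }
    }
}
```

Because every local pool borrows from the same global pool, one busy partition can leave another gate without segments; that is why the quota per local pool matters.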
```java
public enum ShuffleServiceLoader {
    ;

    public static final ConfigOption<String> SHUFFLE_SERVICE_FACTORY_CLASS = ConfigOptions
            .key("shuffle-service-factory.class")
            .defaultValue("org.apache.flink.runtime.io.network.NettyShuffleServiceFactory")
            .withDescription("The full class name of the shuffle service factory implementation to be used by the cluster. " +
                    "The default implementation uses Netty for network communication and local memory as well disk space " +
                    "to store results on a TaskExecutor.");

    public static ShuffleServiceFactory<?, ?, ?> loadShuffleServiceFactory(Configuration configuration) throws FlinkException {
        String shuffleServiceClassName = configuration.getString(SHUFFLE_SERVICE_FACTORY_CLASS);
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        return InstantiationUtil.instantiate(
                shuffleServiceClassName,
                ShuffleServiceFactory.class,
                classLoader);
    }
}
```

ShuffleEnvironment creates ResultPartition and InputGate
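After the TaskManager receives the Task deployment descriptor submitted by the JobManager, it initializes a Task object. Based on the descriptor, the Task creates its ResultPartitions and InputGates through the ShuffleEnvironment.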
Part of the Task constructor:
```java
// Create the ResultPartitions
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment.createResultPartitionWriters(
        taskShuffleContext,
        resultPartitionDeploymentDescriptors).toArray(new ResultPartitionWriter[] {});

this.consumableNotifyingPartitionWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
        resultPartitionDeploymentDescriptors,
        resultPartitionWriters,
        this,
        jobId,
        resultPartitionConsumableNotifier);

// Create the InputGates
final InputGate[] gates = shuffleEnvironment.createInputGates(
        taskShuffleContext,
        this,
        inputGateDeploymentDescriptors).toArray(new InputGate[] {});

this.inputGates = new InputGate[gates.length];
int counter = 0;
for (InputGate gate : gates) {
    // wrap each gate so that the bytes it reads are counted in the task's IO metrics
    inputGates[counter++] = new InputGateWithMetrics(gate, metrics.getIOMetricGroup().getNumBytesInCounter());
}
```
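The loop above wraps every gate in an InputGateWithMetrics, a decorator that forwards calls to the real gate and counts the bytes read. A minimal sketch of that pattern, assuming a hypothetical SimpleGate interface in place of InputGate and an AtomicLong in place of Flink's Counter:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal gate: returns the next received buffer, or null if none is ready.
interface SimpleGate {
    byte[] pollNext();
}

// Decorator in the spirit of InputGateWithMetrics: delegates to the wrapped gate and
// adds the size of every returned buffer to a shared "bytes in" counter.
final class GateWithMetrics implements SimpleGate {
    private final SimpleGate delegate;
    private final AtomicLong numBytesIn;

    GateWithMetrics(SimpleGate delegate, AtomicLong numBytesIn) {
        this.delegate = delegate;
        this.numBytesIn = numBytesIn;
    }

    @Override
    public byte[] pollNext() {
        byte[] buffer = delegate.pollNext();
        if (buffer != null) {
            numBytesIn.addAndGet(buffer.length);
        }
        return buffer;
    }
}
```

Since the StreamTask only ever sees the wrapped gates, the metric is collected without the reading code having to know about it.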
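The Task wraps the InputGates and ResultPartitions into a RuntimeEnvironment object and passes it to the StreamTask. The StreamTask uses them to create the StreamTaskNetworkInput and RecordWriter components, which handle the Task's data input and output respectively.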
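ResultPartition & InputGate

A ResultPartition contains multiple ResultSubpartition instances and one LocalBufferPool. Each ResultSubpartition holds a queue of buffers, and the LocalBufferPool provides the memory in which buffer data is stored. The ResultPartitionManager monitors and manages all produced and consumed partitions in a TaskManager, i.e. the ResultPartitions and their ResultSubpartitionViews. A ResultSubpartitionView consumes the buffers cached in a ResultSubpartition and pushes them to the network.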
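An InputGate contains multiple InputChannel instances and one LocalBufferPool; it is wrapped in a CheckpointedInputGate to handle checkpoint-related operations. Each InputChannel connects to its upstream ResultPartition through the ConnectionManager and reads data from the network. It requests buffer memory from the NetworkBufferPool via the LocalBufferPool and stores the binary data read from the network in those buffers. The data is then pushed to the OperatorChain for processing through the DataOutput component.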
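As described above, ResultPartitions and InputGates are created through the ShuffleEnvironment when the Task is initialized. When the Task thread runs, it calls setupPartitionsAndGates to start them. At this point a LocalBufferPool is created for each InputGate and ResultPartition, each ResultPartition is registered with the ResultPartitionManager, and each InputChannel registers with its corresponding upstream Task, i.e. the InputChannel uses the ConnectionManager to open a connection to the upstream ResultPartition.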
```java
public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask {

    private void doRun() {
        // ...
        setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates);
        // ...
        // start the StreamTask
        this.invokable = invokable;
        invokable.invoke();
    }

    @VisibleForTesting
    public static void setupPartitionsAndGates(
            ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException, InterruptedException {

        // create a LocalBufferPool for each ResultPartition and register the partition
        // with the TaskManager's ResultPartitionManager
        for (ResultPartitionWriter partition : producedPartitions) {
            partition.setup();
        }

        // create a LocalBufferPool for each InputGate; each InputChannel then registers with its
        // ResultPartition: it obtains a client for the upstream ResultPartition through the
        // ConnectionManager and sends a PartitionRequest upstream
        for (InputGate gate : inputGates) {
            gate.setup();
        }
    }
}
```
InputGate startup
```java
public class SingleInputGate extends InputGate {

    @Override
    public void setup() throws IOException, InterruptedException {
        checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
        // assign exclusive buffers to input channels directly and use the rest for floating buffers
        assignExclusiveSegments();

        // create the LocalBufferPool
        BufferPool bufferPool = bufferPoolFactory.get();
        setBufferPool(bufferPool);

        // register with the upstream partitions
        requestPartitions();
    }

    @VisibleForTesting
    void requestPartitions() throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (!requestedPartitionsFlag) {
                // ... state checks
                // each InputChannel registers with its upstream ResultPartition
                for (InputChannel inputChannel : inputChannels.values()) {
                    inputChannel.requestSubpartition(consumedSubpartitionIndex);
                }
            }

            requestedPartitionsFlag = true;
        }
    }
}
```
```java
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

    public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
        if (partitionRequestClient == null) {
            // create the client
            try {
                partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
            } catch (IOException e) {
                // IOExceptions indicate that we could not open a connection to the remote TaskExecutor
                throw new PartitionConnectionException(partitionId, e);
            }

            // send the PartitionRequest; when the upstream NettyServer receives it, it creates a
            // ResultSubpartitionView for the requested ResultSubpartition to read its data
            // (through the NetworkSequenceViewReader listener registered on that view)
            partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
        }
    }
}
```
At this point the ResultPartitions and InputGates are started and have established TCP connections with each other.
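The requestedPartitionsFlag guard in SingleInputGate#requestPartitions ensures that each gate sends its partition requests only once. The sketch below isolates that guard; FakeChannel and GateSetup are hypothetical names, and the network request is replaced by recording the requested subpartition index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical channel that only records which subpartition it requested.
final class FakeChannel {
    final List<Integer> requested = new ArrayList<>();

    void requestSubpartition(int subpartitionIndex) {
        requested.add(subpartitionIndex);   // a real channel would send a PartitionRequest here
    }
}

// Isolates the guard in requestPartitions: under a lock, each channel requests the
// consumed subpartition once, however often the method is called.
final class GateSetup {
    private final Object requestLock = new Object();
    private final List<FakeChannel> channels;
    private final int consumedSubpartitionIndex;
    private boolean requestedPartitionsFlag;

    GateSetup(List<FakeChannel> channels, int consumedSubpartitionIndex) {
        this.channels = channels;
        this.consumedSubpartitionIndex = consumedSubpartitionIndex;
    }

    void requestPartitions() {
        synchronized (requestLock) {
            if (!requestedPartitionsFlag) {
                for (FakeChannel channel : channels) {
                    channel.requestSubpartition(consumedSubpartitionIndex);
                }
                requestedPartitionsFlag = true;
            }
        }
    }
}
```

Without the flag, a second call would send duplicate PartitionRequests for the same subpartition.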
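ResultSubpartitionView reads the buffers of a ResultSubpartition

As explained in the previous article, data in a Task is serialized by the RecordWriter and written as BufferConsumer objects into the buffer queue of a ResultSubpartition, where it waits to be consumed. When the upstream NettyServer receives an InputChannel's PartitionRequest, it creates a ResultSubpartitionView for the corresponding ResultSubpartition to read the data. It also registers a NetworkSequenceViewReader as a listener on the view, which is notified about the state of the buffers in the ResultSubpartition's queue.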
```java
class PipelinedSubpartition extends ResultSubpartition {

    @Override
    public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
        final boolean notifyDataAvailable;
        synchronized (buffers) {
            // ...
            // availabilityListener is a CreditBasedSequenceNumberingViewReader. When buffers are available in
            // the queue and there is enough credit, the reader calls getNextBuffer() on the
            // PipelinedSubpartitionView, which ultimately calls ResultSubpartition#pollBuffer()
            // to take a cached buffer from the queue
            readView = new PipelinedSubpartitionView(this, availabilityListener);
            notifyDataAvailable = !buffers.isEmpty();
        }
        if (notifyDataAvailable) {
            notifyDataAvailable();
        }

        return readView;
    }
}
```
```java
class PipelinedSubpartitionView implements ResultSubpartitionView {

    private final PipelinedSubpartition parent;

    // called back through the listener to read the buffers cached in the ResultSubpartition
    @Nullable
    @Override
    public BufferAndBacklog getNextBuffer() {
        return parent.pollBuffer();
    }
}
```
```java
class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListener, NetworkSequenceViewReader {

    private final PartitionRequestQueue requestQueue;

    @Override
    public BufferAndAvailability getNextBuffer() throws IOException, InterruptedException {
        BufferAndBacklog next = subpartitionView.getNextBuffer();
        if (next != null) {
            sequenceNumber++;

            // each data buffer (events excluded) consumes one credit
            if (next.buffer().isBuffer() && --numCreditsAvailable < 0) {
                throw new IllegalStateException("no credit available");
            }

            return new BufferAndAvailability(
                next.buffer(), isAvailable(next), next.buffersInBacklog());
        } else {
            return null;
        }
    }

    // When a buffer is written to the ResultSubpartition, this listener calls back into
    // PartitionRequestQueue, whose writeAndFlushNextMessageIfPossible method then loops over
    // the available NetworkSequenceViewReaders. This keeps PartitionRequestQueue from busy-polling.
    @Override
    public void notifyDataAvailable() {
        requestQueue.notifyReaderNonEmpty(this);
    }
}
```
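The notification pattern between PipelinedSubpartition and its reader can be reduced to a small model. This is a simplification, not Flink's code: SimpleSubpartition and AvailabilityListener are hypothetical names, buffers are strings, and the listener is notified only on the empty-to-non-empty transition, whereas Flink's actual condition also takes flush requests and finished buffers into account.

```java
import java.util.ArrayDeque;

// Hypothetical listener (role of BufferAvailabilityListener).
interface AvailabilityListener {
    void notifyDataAvailable();
}

// Simplified model of PipelinedSubpartition: a synchronized buffer queue that tells
// its read view's listener when data becomes available.
final class SimpleSubpartition {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private AvailabilityListener listener;   // set once a read view exists

    // Producer side: enqueue, and notify only when the queue goes from empty to non-empty.
    void add(String buffer) {
        boolean notify;
        AvailabilityListener current;
        synchronized (buffers) {
            notify = buffers.isEmpty();
            buffers.add(buffer);
            current = listener;
        }
        if (notify && current != null) {
            current.notifyDataAvailable();
        }
    }

    // Consumer side: register the view's listener; if data is already queued, notify at once.
    void createReadView(AvailabilityListener availabilityListener) {
        boolean notify;
        synchronized (buffers) {
            listener = availabilityListener;
            notify = !buffers.isEmpty();
        }
        if (notify) {
            availabilityListener.notifyDataAvailable();
        }
    }

    // What the view's getNextBuffer() delegates to (cf. pollBuffer()).
    String pollBuffer() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }
}
```

Notifying only on the transition is what lets the sending side wait instead of polling: a reader that already has queued data does not need another wake-up.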
That is how a ResultSubpartitionView reads the buffers in a ResultSubpartition. Next, let's look at how the buffers are written to the network once they have been read.
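On the ResultPartition side, a NettyServer is started through the ConnectionManager. Once it is running, PartitionRequestQueue, an implementation of Netty's ChannelHandler, calls getNextBuffer on the NetworkSequenceViewReaders, wraps each buffer into a NettyMessage, and writes it to the network.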
PartitionRequestQueue registers all NetworkSequenceViewReaders in allReaders, while availableReaders holds the currently active readers. PartitionRequestQueue loops over the active NetworkSequenceViewReaders to read data.
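In addition, when a buffer is added to a ResultSubpartition, the NetworkSequenceViewReader listener calls back into PartitionRequestQueue, which then loops over the available NetworkSequenceViewReaders again. Using a listener callback this way keeps PartitionRequestQueue from looping continuously.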
```java
class PartitionRequestQueue extends ChannelInboundHandlerAdapter {

    // once the downstream client has connected and its channel is available,
    // write data to the network through that channel
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        writeAndFlushNextMessageIfPossible(ctx.channel());
    }

    // After the downstream channel is established, loop over the available NetworkSequenceViewReaders.
    // When a buffer is added to a ResultSubpartition, the listener's notifyDataAvailable() is triggered,
    // which calls back into this method to loop over the available readers again
    // (the listener callback avoids looping continuously).
    private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
        if (fatalError || !channel.isWritable()) {
            return;
        }

        BufferAndAvailability next = null;
        try {
            while (true) {
                // take the next available NetworkSequenceViewReader
                NetworkSequenceViewReader reader = pollAvailableReader();

                // no reader available: leave the loop and wait for a listener to trigger the next round
                if (reader == null) {
                    return;
                }

                // read a buffer from the ResultSubpartition through the reader; this decrements its credit by one
                next = reader.getNextBuffer();
                if (next == null) {
                    if (!reader.isReleased()) {
                        continue;
                    }

                    Throwable cause = reader.getFailureCause();
                    if (cause != null) {
                        ErrorResponse msg = new ErrorResponse(
                            new ProducerFailedException(cause),
                            reader.getReceiverId());

                        ctx.writeAndFlush(msg);
                    }
                } else {
                    // if this reader still has data available, put it back into the available queue
                    if (next.moreAvailable()) {
                        registerAvailableReader(reader);
                    }

                    // wrap the buffer into a NettyMessage
                    BufferResponse msg = new BufferResponse(
                        next.buffer(),
                        reader.getSequenceNumber(),
                        reader.getReceiverId(),
                        next.buffersInBacklog());

                    // write the data out through the channel
                    channel.writeAndFlush(msg).addListener(writeListener);

                    return;
                }
            }
        } catch (Throwable t) {
            if (next != null) {
                next.buffer().recycleBuffer();
            }

            throw new IOException(t.getMessage(), t);
        }
    }
}
```
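The interplay of allReaders, availableReaders and credit can be modeled without Netty. In this hypothetical sketch (SimpleReader and SimpleRequestQueue are illustrative names, buffers are strings, every buffer is treated as a data buffer, and a list stands in for the channel), a reader is scheduled only when it has both data and credit, each sent buffer consumes one credit, and either new data or new credit can put a reader back into the queue. Unlike the Flink code, which writes one message and continues from the write listener, writeAll drains the queue in a single loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reader: one per downstream InputChannel (role of NetworkSequenceViewReader).
final class SimpleReader {
    final String receiverId;
    final ArrayDeque<String> backlog = new ArrayDeque<>();  // buffers waiting in the subpartition
    int credit;                                              // buffers the receiver can still accept
    boolean enqueued;                                        // already in availableReaders?

    SimpleReader(String receiverId, int initialCredit) {
        this.receiverId = receiverId;
        this.credit = initialCredit;
    }

    boolean isAvailable() {
        return credit > 0 && !backlog.isEmpty();
    }
}

// Hypothetical, Netty-free model of PartitionRequestQueue.
final class SimpleRequestQueue {
    private final Map<String, SimpleReader> allReaders = new HashMap<>();
    private final ArrayDeque<SimpleReader> availableReaders = new ArrayDeque<>();
    final List<String> wire = new ArrayList<>();   // stands in for channel.writeAndFlush(...)

    void register(SimpleReader reader) {
        allReaders.put(reader.receiverId, reader);
    }

    // Trigger 1: new data in the subpartition (cf. notifyReaderNonEmpty -> userEventTriggered).
    void onDataAvailable(String receiverId, String buffer) {
        SimpleReader reader = allReaders.get(receiverId);
        reader.backlog.add(buffer);
        enqueueIfAvailable(reader);
    }

    // Trigger 2: the receiver announced credit (cf. addCredit).
    void addCredit(String receiverId, int credit) {
        SimpleReader reader = allReaders.get(receiverId);
        if (reader == null) {
            throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
        }
        reader.credit += credit;
        enqueueIfAvailable(reader);
    }

    private void enqueueIfAvailable(SimpleReader reader) {
        if (!reader.enqueued && reader.isAvailable()) {
            reader.enqueued = true;
            availableReaders.add(reader);
        }
    }

    // Sends one buffer per scheduled reader, round-robin, until no reader has both data and credit.
    void writeAll() {
        SimpleReader reader;
        while ((reader = availableReaders.poll()) != null) {
            reader.enqueued = false;
            String buffer = reader.backlog.poll();
            reader.credit--;                          // every buffer here consumes one credit
            wire.add(reader.receiverId + ":" + buffer);
            enqueueIfAvailable(reader);               // cf. moreAvailable() -> registerAvailableReader
        }
    }
}
```

Re-enqueueing at the back of the queue after each buffer gives the round-robin fairness between subpartitions that the real loop also achieves.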
A NetworkSequenceViewReader is activated in one of two ways:
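1. When a new BufferConsumer is added to a ResultSubpartition, PipelinedSubpartitionView's notifyDataAvailable method is called synchronously. This eventually calls back into PartitionRequestQueue's userEventTriggered method, which adds the corresponding NetworkSequenceViewReader to the queue of available readers.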
```java
class PartitionRequestQueue extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The user event triggered event loop callback is used for thread-safe
        // hand over of reader queues and cancelled producers.

        if (msg instanceof NetworkSequenceViewReader) {
            enqueueAvailableReader((NetworkSequenceViewReader) msg);
        } else if (msg.getClass() == InputChannelID.class) {
            // Release partition view that get a cancel request.
            InputChannelID toCancel = (InputChannelID) msg;

            // remove reader from queue of available readers
            availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));

            // remove reader from queue of all readers and release its resource
            final NetworkSequenceViewReader toRelease = allReaders.remove(toCancel);
            if (toRelease != null) {
                releaseViewReader(toRelease);
            }
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }
}
```
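2. Activation when the credit changes. Credit represents the processing capacity of the downstream side: only when the upstream holds credit for a ResultSubpartition can the corresponding NetworkSequenceViewReader be activated to read and send data.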
```java
class PartitionRequestQueue extends ChannelInboundHandlerAdapter {

    void addCredit(InputChannelID receiverId, int credit) throws Exception {
        if (fatalError) {
            return;
        }

        NetworkSequenceViewReader reader = allReaders.get(receiverId);
        if (reader != null) {
            reader.addCredit(credit);

            enqueueAvailableReader(reader);
        } else {
            throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
        }
    }
}
```

InputChannel reads buffer data
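The previous article explained how the StreamTask consumes data from an InputChannel's buffer queue; here we look at how the InputChannel reads data from the network.
InputChannel provides an onBuffer method that caches a buffer in the InputChannel's queue. onBuffer is called back by CreditBasedPartitionRequestClientHandler, an implementation of Netty's ChannelHandler: when a buffer arrives at the InputChannel over the Netty TCP connection, it is written into the InputChannel's queue through onBuffer.
CreditBasedPartitionRequestClientHandler (a ChannelInboundHandlerAdapter) parses buffer data from the network and writes it into the InputChannel: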
```java
class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {

    private void decodeMsg(Object msg) throws Throwable {
        final Class<?> msgClazz = msg.getClass();

        // ---- Buffer data --------------------------------------------------------
        if (msgClazz == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

            RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
            if (inputChannel == null) {
                bufferOrEvent.releaseBuffer();

                cancelRequestFor(bufferOrEvent.receiverId);

                return;
            }

            decodeBufferOrEvent(inputChannel, bufferOrEvent);

        } else if (msgClazz == NettyMessage.ErrorResponse.class) {
            // ---- Error ---------------------------------------------------------
            NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;

            SocketAddress remoteAddr = ctx.channel().remoteAddress();
            // ...
        } else {
            throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
        }
    }

    private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
        try {
            ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
            final int receivedSize = nettyBuffer.readableBytes();
            if (bufferOrEvent.isBuffer()) {
                // ---- Buffer ------------------------------------------------

                // Early return for empty buffers. Otherwise Netty's readBytes() throws an
                // IndexOutOfBoundsException.
                if (receivedSize == 0) {
                    inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                    return;
                }

                // request buffer memory from the InputChannel's LocalBufferPool
                Buffer buffer = inputChannel.requestBuffer();
                if (buffer != null) {
                    // copy the data from the Netty buffer into the Buffer
                    nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
                    buffer.setCompressed(bufferOrEvent.isCompressed);

                    // append the Buffer to the InputChannel's queue
                    inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                } else if (inputChannel.isReleased()) {
                    cancelRequestFor(bufferOrEvent.receiverId);
                } else {
                    throw new IllegalStateException("No buffer available in credit-based input channel.");
                }
            } else {
                // ---- Event -------------------------------------------------
                // TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
                byte[] byteArray = new byte[receivedSize];
                nettyBuffer.readBytes(byteArray);

                MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
                Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);

                inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
            }
        } finally {
            bufferOrEvent.releaseBuffer();
        }
    }
}
```
RemoteInputChannel's onBuffer method appends a buffer to its cache queue:
```java
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {

    public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
        boolean recycleBuffer = true;

        try {
            final boolean wasEmpty;
            synchronized (receivedBuffers) {
                // ...
                // append the Buffer to the receivedBuffers queue
                wasEmpty = receivedBuffers.isEmpty();
                receivedBuffers.add(buffer);
                recycleBuffer = false;
            }

            // the expected sequence number keeps received messages in order
            ++expectedSequenceNumber;

            if (wasEmpty) {
                notifyChannelNonEmpty();
            }

            if (backlog >= 0) {
                onSenderBacklog(backlog);
            }
        } finally {
            // release the Buffer's memory if it was not queued
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }
}
```
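The ownership rule in onBuffer (the buffer is recycled in finally unless it was handed to the queue) and the empty-to-non-empty notification can be shown in a small model. ReceivingChannel is a hypothetical name, buffers are strings, and recycling and notifying are replaced by counters. The sequence-number check below is an assumption suggested by the expectedSequenceNumber counter in the excerpt; it is not a reconstruction of the elided "..." part.

```java
import java.util.ArrayDeque;

// Hypothetical model of RemoteInputChannel#onBuffer.
final class ReceivingChannel {
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();
    private int expectedSequenceNumber = 0;
    int nonEmptyNotifications = 0;   // times the gate would be told "this channel has data"
    int recycled = 0;                // buffers given back to the pool instead of being queued

    void onBuffer(String buffer, int sequenceNumber) {
        boolean recycleBuffer = true;
        try {
            final boolean wasEmpty;
            synchronized (receivedBuffers) {
                if (sequenceNumber != expectedSequenceNumber) {
                    return;                       // assumed check: out of order, drop (finally recycles)
                }
                wasEmpty = receivedBuffers.isEmpty();
                receivedBuffers.add(buffer);
                recycleBuffer = false;            // ownership passed to the queue
            }
            ++expectedSequenceNumber;
            if (wasEmpty) {
                nonEmptyNotifications++;          // cf. notifyChannelNonEmpty()
            }
        } finally {
            if (recycleBuffer) {
                recycled++;                       // cf. buffer.recycleBuffer()
            }
        }
    }

    String poll() {
        synchronized (receivedBuffers) {
            return receivedBuffers.poll();
        }
    }
}
```

Setting recycleBuffer to false only after the add succeeds means that any early return or exception path returns the memory to the pool, so no buffer leaks.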
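That covers the data exchange at the level of InputGate and ResultPartition, which is the higher-level exchange logic. The actual transfer of buffers and events over the TCP network is implemented by the ConnectionManager, which creates the underlying network components such as NettyServer and NettyClient.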
ConnectionManager

The ConnectionManager is a core component of the ShuffleEnvironment. When the TaskManager starts its services, it also starts the ShuffleEnvironment, which in turn uses its ConnectionManager to start the TaskManager's Netty client and server.
Main components of the ConnectionManager
The default implementation of ConnectionManager is NettyConnectionManager, which consists of:
- NettyClient: wraps the Netty client and starts it with Netty's Bootstrap. It creates and sends PartitionRequests and handles the buffer data coming from the server.
- NettyServer: wraps the Netty server and starts it with ServerBootstrap. It receives PartitionRequests from Netty clients and writes ResultPartition buffer data into the TCP channel.
- NettyProtocol: defines the ChannelHandlers used by the Netty client and server.
- PartitionRequestClientFactory: creates PartitionRequestClients through the NettyClient, i.e. connects to an upstream NettyServer through the NettyClient (all PartitionRequestClients share one NettyClient). A PartitionRequestClient sends PartitionRequests to the NettyServer, interacting with it through the connected Channel.
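The client reuse described for PartitionRequestClientFactory can be sketched with a concurrent map keyed by the remote address. This is a hypothetical model: RequestClient and RequestClientFactory are illustrative names, the key is a plain address string (Flink keys its cache by ConnectionID), and opening a connection is replaced by a counter.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client bound to one remote NettyServer.
final class RequestClient {
    final String remoteAddress;

    RequestClient(String remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
}

// Hypothetical factory that opens at most one connection per remote address and
// hands the same client to every input channel reading from that address.
final class RequestClientFactory {
    private final ConcurrentMap<String, RequestClient> clients = new ConcurrentHashMap<>();
    final AtomicInteger connectionsOpened = new AtomicInteger();

    RequestClient createPartitionRequestClient(String remoteAddress) {
        return clients.computeIfAbsent(remoteAddress, address -> {
            connectionsOpened.incrementAndGet();   // stands in for connecting through the NettyClient
            return new RequestClient(address);
        });
    }
}
```

computeIfAbsent runs the creation function at most once per key, so concurrent InputChannels that target the same upstream TaskManager end up sharing one connection.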
Starting and stopping the Netty client and server are both controlled by the ConnectionManager:
```java
public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    // allocates memory for Netty reads and writes
    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    private final NettyProtocol nettyProtocol;

    @Override
    public int start() throws IOException {
        client.init(nettyProtocol, bufferPool);

        return server.init(nettyProtocol, bufferPool);
    }

    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }
}
```

NettyProtocol
NettyProtocol defines the order in which the Netty client and server process events, i.e. their sets of ChannelHandlers.
1. Server-side ChannelHandlers
The NettyServer's ChannelPipeline mainly contains NettyMessageEncoder, NettyMessageDecoder, PartitionRequestServerHandler and PartitionRequestQueue. After the server reads a message, NettyMessageDecoder decodes it and PartitionRequestServerHandler interprets it. If the NettyMessage is a PartitionRequest, a new NetworkSequenceViewReader is created and added to PartitionRequestQueue's reader queue. PartitionRequestQueue then uses the readable NetworkSequenceViewReaders to read buffers from the ResultPartition and send them downstream.
The main server-side logic lives in PartitionRequestServerHandler and PartitionRequestQueue.
PartitionRequestServerHandler handles the messages sent from downstream, such as PartitionRequest, TaskEventRequest and AddCredit.
PartitionRequestQueue stores all available NetworkSequenceViewReaders, loops over them to read buffers from the ResultSubpartitions, and sends the data into the TCP channel.
(1) Core implementation of PartitionRequestServerHandler
```java
class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
        try {
            Class<?> msgClazz = msg.getClass();

            // ----------------------------------------------------------------
            // Handle PartitionRequest messages
            // ----------------------------------------------------------------
            if (msgClazz == PartitionRequest.class) {
                PartitionRequest request = (PartitionRequest) msg;

                LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);

                try {
                    // create the NetworkSequenceViewReader for the requesting InputChannel
                    NetworkSequenceViewReader reader;
                    reader = new CreditBasedSequenceNumberingViewReader(
                        request.receiverId,
                        request.credit,
                        outboundQueue);

                    reader.requestSubpartitionView(
                        partitionProvider,
                        request.partitionId,
                        request.queueIndex);

                    // hand the reader to PartitionRequestQueue, which will later use it
                    // to read the buffers of the ResultSubpartition
                    outboundQueue.notifyReaderCreated(reader);
                } catch (PartitionNotFoundException notFound) {
                    respondWithError(ctx, notFound, request.receiverId);
                }
            }
            // ----------------------------------------------------------------
            // Task events
            // ----------------------------------------------------------------
            else if (msgClazz == TaskEventRequest.class) {
                TaskEventRequest request = (TaskEventRequest) msg;

                if (!taskEventPublisher.publish(request.partitionId, request.event)) {
                    respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
                }
            } else if (msgClazz == CancelPartitionRequest.class) {
                CancelPartitionRequest request = (CancelPartitionRequest) msg;

                outboundQueue.cancel(request.receiverId);
            } else if (msgClazz == CloseRequest.class) {
                outboundQueue.close();
            } else if (msgClazz == AddCredit.class) {
                AddCredit request = (AddCredit) msg;

                // add credit to the corresponding reader and put it into PartitionRequestQueue's available queue
                outboundQueue.addCredit(request.receiverId, request.credit);
            } else {
                LOG.warn("Received unexpected client request: {}", msg);
            }
        } catch (Throwable t) {
            respondWithError(ctx, t);
        }
    }
}
```
(2) Core implementation of PartitionRequestQueue
```java
class PartitionRequestQueue extends ChannelInboundHandlerAdapter {

    // Called synchronously when a new BufferConsumer is added to a ResultSubpartition;
    // activates the given NetworkSequenceViewReader in PartitionRequestQueue
    void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
        // pass the reader into the ChannelPipeline as a user event, so that
        // PartitionRequestQueue activates it on the Netty event loop
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        // a NetworkSequenceViewReader: activate it through enqueueAvailableReader
        if (msg instanceof NetworkSequenceViewReader) {
            enqueueAvailableReader((NetworkSequenceViewReader) msg);
        } else if (msg.getClass() == InputChannelID.class) {
            // an InputChannelID: release the server-side state of that InputChannel
            InputChannelID toCancel = (InputChannelID) msg;

            // remove the reader of this InputChannelID from availableReaders
            availableReaders.removeIf(reader -> reader.getReceiverId().equals(toCancel));

            // remove the reader from allReaders and release its resources
            final NetworkSequenceViewReader toRelease = allReaders.remove(toCancel);
            if (toRelease != null) {
                releaseViewReader(toRelease);
            }
        } else {
            // other events are not handled here; pass them on
            ctx.fireUserEventTriggered(msg);
        }
    }

    // Called when the downstream sends an AddCredit message: adds credit to the reader of the
    // given InputChannelID and puts the reader into PartitionRequestQueue's available queue
    void addCredit(InputChannelID receiverId, int credit) throws Exception {
        if (fatalError) {
            return;
        }

        NetworkSequenceViewReader reader = allReaders.get(receiverId);
        if (reader != null) {
            reader.addCredit(credit);

            enqueueAvailableReader(reader);
        } else {
            throw new IllegalStateException("No reader for receiverId = " + receiverId + " exists.");
        }
    }

    // uses the readers in the available queue to read ResultSubpartition buffers
    // and sends them into the TCP channel
    private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
        if (fatalError || !channel.isWritable()) {
            return;
        }

        BufferAndAvailability next = null;
        try {
            while (true) {
                // take a reader from the available queue
                NetworkSequenceViewReader reader = pollAvailableReader();

                // no reader available: return instead of busy-polling
                if (reader == null) {
                    return;
                }

                // the reader reads a buffer from the ResultSubpartition and decrements its credit by one
                next = reader.getNextBuffer();
                if (next == null) {
                    if (!reader.isReleased()) {
                        continue;
                    }

                    Throwable cause = reader.getFailureCause();
                    if (cause != null) {
                        ErrorResponse msg = new ErrorResponse(
                            new ProducerFailedException(cause),
                            reader.getReceiverId());

                        ctx.writeAndFlush(msg);
                    }
                } else {
                    // the ResultSubpartition still has buffers available: put the reader back into the available queue
                    if (next.moreAvailable()) {
                        registerAvailableReader(reader);
                    }

                    BufferResponse msg = new BufferResponse(
                        next.buffer(),
                        reader.getSequenceNumber(),
                        reader.getReceiverId(),
                        next.buffersInBacklog());

                    // write the data to the TCP channel
                    channel.writeAndFlush(msg).addListener(writeListener);

                    return;
                }
            }
        } catch (Throwable t) {
            if (next != null) {
                next.buffer().recycleBuffer();
            }

            throw new IOException(t.getMessage(), t);
        }
    }
}
```
2. Client-side ChannelHandlers
The client's ChannelPipeline mainly contains NettyMessageDecoder, NettyMessageEncoder and CreditBasedPartitionRequestClientHandler. CreditBasedPartitionRequestClientHandler implements the main client logic: it reads the NettyMessages decoded by NettyMessageDecoder and delivers the buffers to the queue of the corresponding RemoteInputChannel, where downstream operators consume them. It also sends AddCredit messages to the upstream NettyServer to increase the credit of the upstream NetworkSequenceViewReader.
Core implementation of CreditBasedPartitionRequestClientHandler
This covers sending AddCredit messages upstream, as well as reading buffer data and writing it into the InputChannel's buffer queue:
```java
class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter implements NetworkClientHandler {

    // pass the InputChannel into the ChannelPipeline as a user event, so that this handler
    // sends the AddCredit message upstream on the Netty event loop
    @Override
    public void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(inputChannel));
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof RemoteInputChannel) {
            boolean triggerWrite = inputChannelsWithCredit.isEmpty();

            inputChannelsWithCredit.add((RemoteInputChannel) msg);

            // start sending AddCredit messages only if the queue was empty;
            // otherwise a pending write will pick this channel up
            if (triggerWrite) {
                writeAndFlushNextMessageIfPossible(ctx.channel());
            }
        } else {
            // other events are not handled here; pass them on
            ctx.fireUserEventTriggered(msg);
        }
    }

    // sends AddCredit messages
    private void writeAndFlushNextMessageIfPossible(Channel channel) {
        if (channelError.get() != null || !channel.isWritable()) {
            return;
        }

        // send an AddCredit message for the next InputChannel with credit
        while (true) {
            RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();

            if (inputChannel == null) {
                return;
            }

            // send the AddCredit message only for channels that have not been released
            if (!inputChannel.isReleased()) {
                AddCredit msg = new AddCredit(
                    inputChannel.getPartitionId(),
                    inputChannel.getAndResetUnannouncedCredit(),
                    inputChannel.getInputChannelId());

                channel.writeAndFlush(msg).addListener(writeListener);

                return;
            }
        }
    }

    // called after the NettyClient has read a message from the network and decoded it
    private void decodeMsg(Object msg) throws Throwable {
        final Class<?> msgClazz = msg.getClass();

        // handle buffer data
        if (msgClazz == NettyMessage.BufferResponse.class) {
            NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

            RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
            // no InputChannel for this receiver: release the data and send a CancelPartitionRequest upstream
            if (inputChannel == null) {
                bufferOrEvent.releaseBuffer();

                cancelRequestFor(bufferOrEvent.receiverId);

                return;
            }

            // decode the buffer or event
            decodeBufferOrEvent(inputChannel, bufferOrEvent);

        } else if (msgClazz == NettyMessage.ErrorResponse.class) {
            // handle errors
            NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;

            SocketAddress remoteAddr = ctx.channel().remoteAddress();
            // ...
        } else {
            throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
        }
    }

    private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
        try {
            ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
            final int receivedSize = nettyBuffer.readableBytes();
            if (bufferOrEvent.isBuffer()) {
                // ---- Buffer data ------------------------------------------------

                // use receivedSize to check whether the buffer is empty
                if (receivedSize == 0) {
                    inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                    return;
                }

                // the buffer is not empty: request buffer space through the InputChannel
                Buffer buffer = inputChannel.requestBuffer();
                if (buffer != null) {
                    // copy the data into the Buffer
                    nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
                    buffer.setCompressed(bufferOrEvent.isCompressed);

                    // append the Buffer to the InputChannel's queue
                    inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                } else if (inputChannel.isReleased()) {
                    cancelRequestFor(bufferOrEvent.receiverId);
                } else {
                    throw new IllegalStateException("No buffer available in credit-based input channel.");
                }
            } else {
                // ---- Event data -------------------------------------------------
                // allocate an on-heap byte[] of the received size
                byte[] byteArray = new byte[receivedSize];
                nettyBuffer.readBytes(byteArray);

                MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
                Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);

                // append the Buffer to the InputChannel's queue
                inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
            }
        } finally {
            bufferOrEvent.releaseBuffer();
        }
    }
}
```
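On the consumer side, credit is accumulated per channel and announced lazily through the inputChannelsWithCredit queue. The following sketch is modeled on RemoteInputChannel's getAndResetUnannouncedCredit and the handler's queue, but it is not Flink's code: CreditChannel, CreditAnnouncer and onBuffersAvailable are hypothetical names, the AddCredit message is represented as a string, and the rule that only the zero-to-positive transition queues a channel is this sketch's assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer-side channel that accumulates credit until it is announced.
final class CreditChannel {
    final String id;
    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    private final CreditAnnouncer announcer;

    CreditChannel(String id, CreditAnnouncer announcer) {
        this.id = id;
        this.announcer = announcer;
    }

    // Called when n more buffers become available to receive data for this channel.
    void onBuffersAvailable(int n) {
        // Only the 0 -> positive transition queues the channel; later increments are
        // folded into the AddCredit message that is already pending.
        if (n > 0 && unannouncedCredit.getAndAdd(n) == 0) {
            announcer.notifyCreditAvailable(this);
        }
    }

    int getAndResetUnannouncedCredit() {
        return unannouncedCredit.getAndSet(0);
    }
}

// Hypothetical, Netty-free model of the handler's inputChannelsWithCredit queue.
final class CreditAnnouncer {
    private final ArrayDeque<CreditChannel> inputChannelsWithCredit = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();   // stands in for AddCredit messages on the wire

    void notifyCreditAvailable(CreditChannel channel) {
        inputChannelsWithCredit.add(channel);
    }

    // Sends one AddCredit per queued channel, carrying all credit accumulated so far.
    void flush() {
        CreditChannel channel;
        while ((channel = inputChannelsWithCredit.poll()) != null) {
            sent.add("AddCredit(" + channel.id + ", " + channel.getAndResetUnannouncedCredit() + ")");
        }
    }
}
```

Batching credit this way sends one AddCredit for several freed buffers instead of one message per buffer.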