Starting from TaskExecutor#submitTask, the main work is the following (a condensed sketch follows the list):
- First change the TaskSlot's state to Active, then cancel the slot's usage timeout task (Timeout)
- Download from the BlobService the resources the task needs to run
- Construct the Task object (new Task)
- Wrap the task metadata into a TaskInfo object (TaskInfo, JobInfo, JobMasterInfo)
- Initialize the ResultPartition and ResultSubPartition
- Initialize the InputGate and InputChannel
- Initialize the execution thread
- Start task execution: Task#doRun
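The steps above can be condensed into the sketch below. This is deliberately not the real TaskExecutor code: SlotTable, BlobService, TaskFactory, RunnableTask and their methods are simplified stand-ins that only mirror the order of the steps.

```java
// Condensed, illustrative sketch of the submitTask steps listed above.
// Every type and method here is a simplified stand-in, not the real
// TaskExecutor API; it only mirrors the order of the steps.
final class SubmitTaskSketch {

    interface SlotTable { void markSlotActiveAndCancelTimeout(String allocationId); }
    interface BlobService { ClassLoader downloadTaskResources(String jobId); }
    interface RunnableTask { void startTaskThread(); }
    interface TaskFactory {
        // builds TaskInfo, ResultPartitions/subpartitions, InputGates/channels
        // and the executing thread, as in steps 3-8 of the list above
        RunnableTask createTask(String jobId, String taskName, ClassLoader userCodeClassLoader);
    }

    private final SlotTable slotTable;
    private final BlobService blobService;
    private final TaskFactory taskFactory;

    SubmitTaskSketch(SlotTable slotTable, BlobService blobService, TaskFactory taskFactory) {
        this.slotTable = slotTable;
        this.blobService = blobService;
        this.taskFactory = taskFactory;
    }

    void submitTask(String jobId, String taskName, String allocationId) {
        // mark the slot Active and cancel the slot usage timeout
        slotTable.markSlotActiveAndCancelTimeout(allocationId);
        // download the jars and other resources the task needs from the BlobService
        ClassLoader userCodeClassLoader = blobService.downloadTaskResources(jobId);
        // construct the Task (TaskInfo, ResultPartitions, InputGates, execution thread)
        RunnableTask task = taskFactory.createTask(jobId, taskName, userCodeClassLoader);
        // start execution; the task thread ends up in Task#doRun
        task.startTaskThread();
    }
}
```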
doRun performs 13 main actions:
```java
Task.run();    // internally runs Task.doRun(), which performs 13 actions
Task.doRun();
// 1. Change the Task state: CREATED ==> DEPLOYING
transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING);
// 2. Prepare the ExecutionConfig
final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
// 3. Register the input and output components, setting up the ResultPartitions and InputGates
setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates);
// 4. Register each ResultPartition with the taskEventDispatcher
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
    taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
}
// 5. Copy the resource files the Task needs from the distributed cache
DistributedCache.readFileInfoFromConfig(jobConfiguration);
// 6. Initialize the RuntimeEnvironment, which wraps the components needed while the Task runs
Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, ...);
// 7. Instantiate the invokable via reflection (typically SourceStreamTask or OneInputStreamTask)
AbstractInvokable invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// 8. Keep a reference to the invokable on this Task
this.invokable = invokable;
// 9. Change the Task state: DEPLOYING ==> RUNNING
transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
// 10. The Task has entered RUNNING; report the new state to the JobMaster
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// 11. Actually start the Task (the most important step)
invokable.invoke();
// 12. When the StreamTask finishes normally, flush the remaining data in the buffers
for (ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
    if (partitionWriter != null) {
        partitionWriter.finish();
    }
}
// 13. Change the Task state: RUNNING ==> FINISHED
transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);
```
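Step 7 (loadAndInstantiateInvokable) boils down to a reflective constructor call against a class that implements the invokable contract. A stripped-down sketch of that contract and of the loading, with Environment simplified to Object (these are not the real Flink signatures):

```java
// Stripped-down sketch of step 7: the invokable contract and the reflective
// loading. Environment is replaced by Object and the class/method shapes are
// simplified; this is not the real Flink AbstractInvokable signature.
abstract class InvokableSketch {
    protected final Object environment;   // stands in for Flink's Environment

    protected InvokableSketch(Object environment) {
        this.environment = environment;
    }

    /** Entry point that Task#doRun calls once the task is RUNNING. */
    public abstract void invoke() throws Exception;
}

final class InvokableLoaderSketch {
    // roughly what loadAndInstantiateInvokable does: resolve the class by name
    // in the user-code classloader and call its (Environment) constructor
    static InvokableSketch load(ClassLoader cl, String className, Object env) throws Exception {
        Class<? extends InvokableSketch> clazz =
                Class.forName(className, true, cl).asSubclass(InvokableSketch.class);
        return clazz.getConstructor(Object.class).newInstance(env);
    }
}
```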
1.2 Data Flow
- The JobMaster submits the task to the TaskExecutor via RPC
- When a task is initialized it instantiates an AbstractInvokable, whose subclasses include:
  - DataSinkTask
  - DataSourceTask
  - StreamTask
    - OneInputStreamTask
    - TwoInputStreamTask
    - MultiInputStreamTask
    - SourceStreamTask
    - SourceOperatorStreamTask
    - BoundedStreamTask
  - BatchTask
  - IterationSynchronizationSinkTask
- Task execution: Task#invokable.invoke()
  - This executes the processInput method registered in the StreamTask (a minimal sketch of the mailbox loop that drives it follows this list)
    - SourceStreamTask#processInput establishes a connection to the data source and forwards each record downstream via ctx.collect(record)
    - StreamTask#processInput pulls data from the input and then calls operator.processElement(record)
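The reason processInput is called over and over is the StreamTask mailbox loop: between input records the loop drains pending mails (checkpoint triggers, timers, and so on) and then runs processInput as the default action. A minimal, self-contained sketch of that pattern (toy types, not the real MailboxProcessor API):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal sketch of the mailbox pattern that drives StreamTask#processInput:
// drain control mail first, then run the default action (processInput) once,
// and repeat until the default action reports that input is exhausted.
final class MailboxLoopSketch {

    interface DefaultAction {
        /** @return false when there is no more input and the loop should stop */
        boolean runDefaultAction() throws Exception;
    }

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final DefaultAction defaultAction;   // e.g. StreamTask::processInput

    MailboxLoopSketch(DefaultAction defaultAction) {
        this.defaultAction = defaultAction;
    }

    void runMailboxLoop() throws Exception {
        boolean moreInput = true;
        while (moreInput) {
            // 1. process control mail (timers, checkpoint triggers, ...)
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // 2. default action: pull and process one unit of input
            moreInput = defaultAction.runDefaultAction();
        }
    }

    void putMail(Runnable mail) {
        mailbox.add(mail);   // the real mailbox is thread-safe; kept simple here
    }
}
```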
Note: data is handed from the NettyClient to the Task through the inputChannelsWithData queue.
Task: a task may contain an operator chain.
Operator: an operator may wrap a userFunction. For example, a SourceStreamTask has a LegacySourceFunctionThread; when that thread runs it calls userFunction.run(ctx), and the userFunction implements the logic for reading data from the external source.
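As a concrete example, a userFunction driven this way can be a legacy SourceFunction (this assumes Flink's org.apache.flink.streaming.api.functions.source.SourceFunction API is on the classpath): run(ctx) loops over the external source and emits every record with ctx.collect, which is the call that pushes data into the operator chain.

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Small legacy SourceFunction: run(ctx) keeps reading from the "external
// source" (here just a counter) and forwards every record with ctx.collect.
public class CountingSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long next = 0L;
        while (running) {
            // emit under the checkpoint lock, as the legacy source contract
            // recommends for sources that need exactly-once semantics
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next++);
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;   // called when the task is cancelled
    }
}
```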
After an operator finishes its processing logic, it emits the result via output.collect.
The two commonly used output.collect implementations are ChainingOutput and RecordWriterOutput:
ChainingOutput: the output between operators in the same chain; collect directly calls pushToOperator.
RecordWriterOutput: data between different tasks is transferred via Netty; after the NettyClient receives data from upstream it puts the data into inputChannelsWithData, and the current task takes data from inputChannelsWithData and processes it (a simplified sketch of the two paths follows).
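The difference between the two paths can be captured with a small sketch (toy types, not Flink's actual Output and RecordWriter classes): the chaining path is just a method call into the next operator on the same thread, while the record-writer path serializes the record and hands it to the network stack.

```java
// Simplified sketch of the two output paths; these are toy types that only
// mirror the control flow, not Flink's real Output / RecordWriter classes.
interface OutputSketch<T> {
    void collect(T record);
}

interface OperatorSketch<T> {
    void processElement(T record);
}

interface RecordWriterSketch<T> {
    // selects a target channel, serializes the record and writes it into a buffer
    void emit(T record);
}

/** Same operator chain: a direct method call, no serialization, no network. */
final class ChainingOutputSketch<T> implements OutputSketch<T> {
    private final OperatorSketch<T> nextOperator;

    ChainingOutputSketch(OperatorSketch<T> nextOperator) {
        this.nextOperator = nextOperator;
    }

    @Override
    public void collect(T record) {
        nextOperator.processElement(record);   // "pushToOperator"
    }
}

/** Task boundary: hand the record to the record writer, which serializes it
 *  and ships it to the downstream task over Netty. */
final class RecordWriterOutputSketch<T> implements OutputSketch<T> {
    private final RecordWriterSketch<T> recordWriter;

    RecordWriterOutputSketch(RecordWriterSketch<T> recordWriter) {
        this.recordWriter = recordWriter;
    }

    @Override
    public void collect(T record) {
        recordWriter.emit(record);   // the downstream task later pulls it from inputChannelsWithData
    }
}
```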
The call stack below covers both the ChainingOutput and the RecordWriterOutput path.
```
StreamTask#doRun
StreamTask#invokable.invoke()
StreamTask#executeInvoke()
StreamTask#runMailboxLoop()
MailboxProcessor#runMailboxLoop()
MailboxProcessor#processMail
MailboxProcessor#mailboxDefaultAction.runDefaultAction
StreamTask#processInput
SourceStreamTask#processInput
SourceStreamTask#sourceThread.start();
StreamSource#userFunction.run(ctx)
    // runs a for loop that blocks on the reader; whenever a record arrives it is forwarded downstream
ctx.collect(record)
StreamSourceContext#processAndCollect
StreamSourceContext#output.collect(reuse.replace(element, lastRecordTime))
    // output has two cases: ChainingOutput (next operator in the same chain) and RecordWriterOutput (next task)
    // Note: reuse.replace reuses the object. In stream processing records are handled one at a time,
    // so during the computation the memory of a single object is enough.
ChainingOutput#collect                              // taking ChainingOutput as the example
ChainingOutput#pushToOperator
ChainingOutput#input.processElement(castRecord)     // input is a StreamOperator
StreamMap#output.collect(element.replace(userFunction.map(element.getValue())));
    // taking StreamMap as the downstream operator: element.getValue() fetches the data,
    // userFunction.map performs the transformation, element.replace swaps in the new value
RecordWriterOutput#collect
RecordWriterOutput#pushToOperator
    // every Task starts a recordReader and a recordWriter when it is initialized
RecordWriterOutput#serializationDelegate.setInstance(record)
RecordWriterOutput#recordWriter.emit(serializationDelegate)
ChannelSelectorRecordWriter#emit(record, channelSelector.selectChannel(record))
    // taking this class as the example; a channel is a connection between a NettyClient and a NettyServer
RecordWriter#targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition)   // serialization
RecordWriter#if (flushAlways) targetPartition.flush(targetSubpartition)   // actually writes the data out to the next task
PipelinedSubpartition#flush
PipelinedSubpartition#notifyDataAvailable = !isBlocked && buffers.size() == 1 && buffers.peek().getBufferConsumer().isDataAvailable()
    // as long as the buffer queue has data, notifyDataAvailable must be called
PipelinedSubpartition#notifyDataAvailable()   // i.e. sends the data
PartitionRequestQueue#ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
PartitionRequestQueue#userEventTriggered
PartitionRequestQueue#enqueueAvailableReader((NetworkSequenceViewReader) msg)
PartitionRequestQueue#channel.writeAndFlush(msg).addListener(writeListener);
    // this is where a record is actually written from the NettyServer to the NettyClient;
    // execution continues in the handler bound on the NettyClient: CreditBasedPartitionRequestClientHandler
    // ⭐️ the record crosses the network here and is received on the NettyClient side
CreditBasedPartitionRequestClientHandler#channelRead
CreditBasedPartitionRequestClientHandler#decodeMsg(msg)
CreditBasedPartitionRequestClientHandler#NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
    // a buffer carries normal data; an event is something like a CheckpointBarrier
CreditBasedPartitionRequestClientHandler#decodeBufferOrEvent(inputChannel, bufferOrEvent)
RemoteInputChannel#onBuffer
RemoteInputChannel#receivedBuffers.add(sequenceBuffer)
RemoteInputChannel#notifyChannelNonEmpty()   // notify the Task to consume
SingleInputGate#queueChannel
    // the received data becomes a buffer and is added to receivedBuffers; the corresponding
    // inputChannel becomes usable, so it is added to the queue of available input channels
SingleInputGate#queueChannelUnsafe
SingleInputGate#inputChannelsWithData.add(channel, priority, alreadyEnqueued)
    // once a channel has received data it is available and is added to this queue
    // ⭐️ OneInputStreamTask's processInput() consumes this queue; see the next call stack
RemoteInputChannel#onSenderBacklog(backlog)   // after consuming, send feedback to the upstream Task
```
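One detail worth calling out in the send path is the notifyDataAvailable condition on flush: the subpartition only wakes up the Netty writer when the queue is not blocked and has just gone from empty to exactly one buffer, so a queue that already had data does not trigger redundant notifications. A toy sketch of that idea (stand-in types, not the real PipelinedSubpartition):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy sketch of the "notify only when the queue just became non-empty" idea
// from the flush step above; BufferSketch and the listener are stand-ins,
// not Flink's PipelinedSubpartition API.
final class PipelinedSubpartitionSketch {

    interface AvailabilityListener {
        void notifyDataAvailable();   // wakes up the Netty writer for this subpartition
    }

    record BufferSketch(byte[] bytes) {}

    private final Deque<BufferSketch> buffers = new ArrayDeque<>();
    private final AvailabilityListener listener;
    private boolean blocked = false;

    PipelinedSubpartitionSketch(AvailabilityListener listener) {
        this.listener = listener;
    }

    synchronized void add(BufferSketch buffer) {
        buffers.add(buffer);
    }

    synchronized void flush() {
        // mirrors the condition quoted above: notify only if not blocked and the
        // queue holds exactly one buffer; if it already held more, the reader was
        // notified earlier and will drain everything anyway
        boolean shouldNotify = !blocked && buffers.size() == 1;
        if (shouldNotify) {
            listener.notifyDataAvailable();
        }
    }

    synchronized BufferSketch poll() {
        return buffers.poll();   // called by the Netty side when writing data out
    }

    synchronized void setBlocked(boolean blocked) {
        this.blocked = blocked;
    }
}
```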
2.2 Source code: how the downstream operator receives and processes data
Taking StreamOneInputProcessor and StreamTwoInputProcessor as examples.
```
StreamTask#processInput
StreamTask#inputProcessor.processInput()
StreamOneInputProcessor#processInput()
StreamOneInputProcessor#input.emitNext(output)
AbstractStreamTaskNetworkInput#emitNext
AbstractStreamTaskNetworkInput#checkpointedInputGate.pollNext()
AbstractStreamTaskNetworkInput#if (currentRecordDeserializer != null)   // if there is buffered data, process it
AbstractStreamTaskNetworkInput#processElement(deserializationDelegate.getInstance(), output);   // ⭐️ the actual data processing
AbstractStreamTaskNetworkInput#checkpointedInputGate.pollNext()   // if there is no data yet, pull some
CheckpointedInputGate#pollNext()
SingleInputGate#pollNext()
SingleInputGate#waitAndGetNextData(blocking)
SingleInputGate#getChannel(blocking)
SingleInputGate#inputChannelsWithData.poll()
    // ⭐️ Netty wrote the received data into this queue earlier; this is where it is taken out
AbstractStreamTaskNetworkInput#processBuffer(bufferOrEvent.get())   // add the data to currentRecordDeserializer
AbstractStreamTaskNetworkInput#processEvent(bufferOrEvent.get())
StreamTwoInputProcessor#processInput()
StreamTwoInputProcessor#this.processor1.processInput()   // if readingInputIndex == 0 run processor1, otherwise processor2
StreamOneInputProcessor#input.emitNext(output)
AbstractStreamTaskNetworkInput#processElement(deserializationDelegate.getInstance(), output)
AbstractStreamTaskNetworkInput#output.emitRecord(recordOrMark.asRecord())
StreamTwoInputProcessorFactory#this.recordConsumer.accept(record)
StreamTwoInputProcessorFactory#processRecord1   // the callback registered when the processor was created
StreamTwoInputProcessorFactory#streamOperator.processElement1(record)
AbstractStreamingJoinOperator#processElement1
AbstractStreamingJoinOperator#this.processLeft(this.leftSerializer.toSerializedRow((RowData) element.getValue(), this.requiresCopy))
    // ⭐️ this is where the RowData is deserialized
StreamingSemiAntiJoinOperator#processLeft(RowData input)   // the concrete join operator performs the processing
StreamTwoInputProcessor#this.processor2.processInput()   // same as processor1.processInput()
```
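Seen from a distance, the receiving side is a loop: take an available channel from inputChannelsWithData, deserialize what it buffered, and push each record into the operator. A self-contained sketch of that loop (toy types, not Flink's SingleInputGate or StreamTaskNetworkInput):

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy sketch of the downstream consumption loop: Netty threads enqueue
// channels that have data into "inputChannelsWithData"; the task thread
// polls a channel, deserializes its buffer, and hands records to the operator.
final class NetworkInputSketch<T> {

    interface ChannelSketch<R> {
        /** Returns the next buffered record on this channel, if any. */
        Optional<R> pollRecord();
    }

    interface OperatorSketch<R> {
        void processElement(R record);
    }

    // filled by the receiving (Netty) side when a channel becomes non-empty
    private final BlockingQueue<ChannelSketch<T>> inputChannelsWithData = new LinkedBlockingQueue<>();
    private final OperatorSketch<T> operator;
    private volatile boolean running = true;

    NetworkInputSketch(OperatorSketch<T> operator) {
        this.operator = operator;
    }

    /** Called from the network side: "this channel now has data". */
    void queueChannel(ChannelSketch<T> channel) {
        inputChannelsWithData.add(channel);
    }

    /** Runs on the task thread; corresponds to the emitNext/pollNext loop above. */
    void processInput() throws InterruptedException {
        while (running) {
            ChannelSketch<T> channel = inputChannelsWithData.take();   // blocks until a channel has data
            Optional<T> record;
            while ((record = channel.pollRecord()).isPresent()) {
                operator.processElement(record.get());                 // "processElement(...)"
            }
        }
    }

    void stop() {
        running = false;
    }
}
```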