Flink-Task启动源码

Flink-Task启动源码,第1张

Flink-Task启动源码 1. 总结 1.1 submitTask

TaskExecutor#submitTask开始,主要有4个工作:

  1. 首先将 TaskSlot 的状态修改为 Active,然后取消 Slot 的使用超时任务:Timeout
  2. 从 BlobService 下载 Task 执行需要的各种资源
  3. 构造 Task 对象 new Task
    • 封装一个 Task信息对象 TaskInfo,(TaskInfo, JobInfo,JobMasterInfo)
    • 初始化 ResultPartition 和 ResultSubPartition
    • 初始化 InputGate 和 InputChannel
    • 初始化执行线程
  4. 启动 Task 的执行 Task#doRun

doRun主要有13个动作:

Task.run();
    // 内部总共 13 个动作
    Task.doRun();
        // 1、先更改 Task 的状态: CREATED ==> DEPLOYING
        transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING); 
        // 2、准备 ExecutionConfig
        final ExecutionConfig executionConfig = serializedExecutionConfig.deserializevalue(userCodeClassLoader); 
        // 3、注册输入和输出组件, 拉起 ResultPartition 和 InputGate
        setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates);
        // 4、注册 ResultPartition 到 taskEventDispatcher
        for(ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
                    taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
                }
        // 5、从分布式缓存中,拷贝下来一些运行 Task 所需要的资源文件 
        DistributedCache.readFileInfoFromConfig(jobConfiguration);
        // 6、初始化环境对象 RuntimeEnvironment, 包装在 Task 执行过程中需要的各种组件 
        Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, ....);
        // 7、初始化调用对象 ,通过反射实例化 StreamTask 实例(可能的两种情况: SourceStreamTask, OneInputStreamTask)
        AbstractInvokable invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);         
        // 8、为该 Task 保存 invokable 启动实例 
        this.invokable = invokable;
        // 9、先更改 Task 的状态: DEPLOYING ==> RUNNING t
        ransitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
        // 10、Task 切换进入 RUNNING 状态, 并告知 JobMaster
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
        // 11、真正把 Task 启动起来了(最最最重要) invokable.invoke();
        // 12、StreamTask 需要正常结束,处理 buffer 中的数据
        for(ResultPartitionWriter partitionWriter : consumableNotifyingPartitionWriters) {
            if(partitionWriter != null) {
                partitionWriter.finish();
        } }
        // 13、先更改 Task 的状态: RUNNING ==> FINISHED 
        transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);
1.2 数据流转
  • JobMaster通过RPC往TaskExecutor提交task
  • 一个task中在初始化的时候会初始化 AbstractInvokable,其子类如下:
    • DataSinkTask
    • DataSourceTask
    • StreamTask
      • OneInputStreamTask
      • TwoInputStreamTask
      • MultiInputStreamTask
      • SourceStreamTask
      • SourceOperatorStreamTask
      • BoundedStreamTask
    • BatchTask
    • IterationSynchronizationSinkTask
  • Task执行: Task#invokable.invoke()
    • 执行SteamTask中注册好的 processInput方法
      • SourceStreamTask#processInput 建立一个到数据源的连接,然后ctx.collect(record) 往后面传
      • StreamTask#processInput 会从input拿数据,然后执行 operator.processElement(record)


注:NettyClient和Task之间通过缓存inputChannelsWithData传输数据。

1.3 Task的类封装结构

Task: 一个task可能包含一个operator chain
operator:一个operator可能包含userFunction,比如SourceStreamTask中会存在LegacySourceFunctionThread,该thread运行时会调用userFunction.run(ctx),userFunction中实现了从外部数据源读取数据的逻辑。

1.4 output.collect

operator执行数据处理逻辑完成后,都是通过output.collect将数据输出。
output.collect常用的有两种:ChainingOutput 和 RecordWriterOutput
ChainingOutput: chain之间的output,collect的时候直接执行 pushToOperator
RecordWriterOutput: 不同Task之间使用netty传输数据,nettyClient收到上游数据后将数据放在inputChannelsWithData,当前task去inputChannelsWithData中拿数据,然后处理。

2. StreamTask执行 2.1 source接收-输出源码

output包括ChainingOutput和RecordWriterOutput逻辑。

StreamTask#doRun
    StreamTask#invokable.invoke()
        StreamTask#executeInvoke()
            StreamTask#runMailboxLoop()
                MailboxProcessor#runMailboxLoop()
                    MailboxProcessor#processMail
                        MailboxProcessor#mailboxDefaultAction.runDefaultAction
                            StreamTask#processInput
                                SourceStreamTask#processInput
                                    SourceStreamTask#sourceThread.start();
                                        StreamSource#userFunction.run(ctx)   内部执行for循环,阻塞在reader,如果收到数据就会往下游传
                                            ctx.collect(record)
                                                StreamSourceContext#processAndCollect
                                                    StreamSourceContext#output.collect(reuse.replace(element, lastRecordTime))   output有两种情况:ChainingOutput(同一个chain中的下一个operator) 和 RecordWriterOutput(下一个task) 注:reuse.replace 内存复用。流式处理,来一条算一条,事实上,在计算过程中只需要一个对象的内存就够了。
                                                        ChainingOutput#collect   以ChainingOutput为例
                                                            ChainingOutput#pushToOperator
                                                                ChainingOutput#input.processElement(castRecord)   intput是一个StreamOperator  
                                                                    StreamMap#output.collect(element.replace(userFunction.map(element.getValue())));   下游已StreamMap为例,
                                                                        element.getValue是获取数据,
                                                                        userFunction.map执行转换
                                                                        element.replace  把element替换成最新的值
                                                        RecordWriterOutput#collect
                                                            RecordWriterOutput#pushToOperator   每个Task在初始化的时候,都启动一个recordReader也启动一个recordWriter
                                                                RecordWriterOutput#serializationDelegate.setInstance(record)   
                                                                RecordWriterOutput#recordWriter.emit(serializationDelegate)
                                                                    ChannelSelectorRecordWriter#emit(record, channelSelector.selectChannel(record))   以此类为例,channel就是一个nettyClient和nettyServer的连接
                                                                        RecordWriter#targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition)  序列化
                                                                        RecordWriter#if flushAlways targetPartition.flush(targetSubpartition)   真正把数据写出到下一个task
                                                                            PipelinedSubpartition#flush
                                                                                PipelinedSubpartition#notifyDataAvailable=!isBlocked&& buffers.size() == 1&& buffers.peek().getBufferConsumer().isDataAvailable()  只要buffer中有数据,就要调用notifyDataAvailable
                                                                                PipelinedSubpartition#notifyDataAvailable()  就是发送数据
                                                                                    PartitionRequestQueue#ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
                                                                                        PartitionRequestQueue#userEventTriggered
                                                                                            PartitionRequestQueue#enqueueAvailableReader((NetworkSequenceViewReader) msg)
                                                                                                PartitionRequestQueue#channel.writeAndFlush(msg).addListener(writeListener); 此处真正完成,从NettyServer写一条数据到NettyClient。此时代码去NettyClient绑定的那个Handler: CreditbasedPartitionRequestClientHandler ⭐️此处会经历网络传输,到NettyClient端接收数据
                                                                                                    CreditbasedPartitionRequestClientHandler#channelRead
                                                                                                        CreditbasedPartitionRequestClientHandler#decodeMsg(msg)
                                                                                                            CreditbasedPartitionRequestClientHandler#NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;  buffer就是正常数据,event就类似CheckpointBarrier
                                                                                                            CreditbasedPartitionRequestClientHandler#decodeBufferOrEvent(inputChannel, bufferOrEvent)
                                                                                                                RemoteInputChannel#onBuffer
                                                                                                                    RemoteInputChannel#receivedBuffers.add(sequenceBuffer)
                                                                                                                    RemoteInputChannel#notifyChannelNonEmpty()  通知Task去消费
                                                                                                                        SingleInputGate#queueChannel  接收到数据,变成buffer,加入到队列 receivedBuffers;对应到某个inputChannel变成可用的,所以加入到可用inputChannel队列
                                                                                                                            SingleInputGate#queueChannelUnsafe
                                                                                                                                SingleInputGate#inputChannelsWithData.add(channel, priority, alreadyEnqueued)  如果某个channel接收倒数,也就意味着这个channel是可用,则加入到这个队列。⭐️OneInputStreamTask的processInput()方法会消费这个队列,新起一个堆栈来看
                                                                                                                    RemoteInputChannel#onSenderBacklog(backlog)  消费完成,给上游Task发送一个反馈

2.2 下游op接收-处理源码
以StreamOneInputProcessor和StreamTwoInputProcessor为例。

StreamTask#processInput
    StreamTask#inputProcessor.processInput()
        StreamOneInputProcessor#processInput()
            StreamOneInputProcessor#input.emitNext(output)
                AbstractStreamTaskNetworkInput#emitNext
                    AbstractStreamTaskNetworkInput#checkpointedInputGate.pollNext()
                        AbstractStreamTaskNetworkInput#if (currentRecordDeserializer != null)   如果有数据,就会处理数据
                            AbstractStreamTaskNetworkInput#processElement(deserializationDelegate.getInstance(), output);   ⭐️真正的处理数据
                        AbstractStreamTaskNetworkInput#checkpointedInputGate.pollNext()  如果没有数据,拉取数据
                            CheckpointedInputGate#pollNext()
                                SingleInputGate#pollNext()
                                    SingleInputGate#waitAndGetNextData(blocking)
                                        SingleInputGate#getChannel(blocking)
                                            SingleInputGate#inputChannelsWithData.poll()  ⭐️ 之前netty接收到数据写到这个队列,这里是拿数据的地方
                        AbstractStreamTaskNetworkInput#processBuffer(bufferOrEvent.get())  将数据加入到currentRecordDeserializer
                        AbstractStreamTaskNetworkInput#processEvent(bufferOrEvent.get())
        StreamTwoInputProcessor#processInput()
            StreamTwoInputProcessor#this.processor1.processInput()   如果readingInputIndex == 0执行processor1,否则执行processor2
                StreamOneInputProcessor#input.emitNext(output)     
                    AbstractStreamTaskNetworkInput#processElement(deserializationDelegate.getInstance(), output)
                        AbstractStreamTaskNetworkInput#output.emitRecord(recordOrMark.asRecord())
                            StreamTwoInputProcessorFactory#this.recordConsumer.accept(record)
                                StreamTwoInputProcessorFactory#processRecord1  创建是注册好的回调函数
                                    StreamTwoInputProcessorFactory#streamOperator.processElement1(record)
                                        AbstractStreamingJoinOperator#processElement1
                                            AbstractStreamingJoinOperator#this.processLeft(this.leftSerializer.toSerializedRow((RowData)element.getValue(), this.requiresCopy))   ⭐️这个时候会对rowData进行反序列化
                                                StreamingSemiAntiJoinOperator# processLeft(RowData input)   调用具体的JoinOperator执行处理
            StreamTwoInputProcessor#this.processor2.processInput()    同上processor1.processInput() 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5679193.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存