本文主要从源码角度对flink底层是如何将task分配到各个taskExecutor上执行,以及task执行的线程模型进行分析。会涉及到jobmaster如何将task分发到taskExecutor中,以及taskExecutor执行task的详细过程,以及task的mailBox线程模型。
JobMaster部署task TM启动Task线程jobmaster主要通过以及分配的slot,获取到slot对应的taskmanager网管,然后提交task。TaskExecutor创建并启动Task线程,TaskExecutor的线程池调度Task线程的run方法,然后执行主要计算逻辑(StreamTask)。
- jobmaster获取slot对应的taskManager网管,通过Rpc调用提交任务
public void deploy() throws JobException { assertRunningInJobMasterMainThread(); //获取到slot的描述信息对象 final LogicalSlot slot = assignedResource; // ..... //任务描述信息对象,用于将task部署到TaskManager上 final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory .fromExecutionVertex(vertex, attemptNumber) .createDeploymentDescriptor( slot.getAllocationId(), slot.getPhysicalSlotNumber(), taskRestore, producedPartitions.values()); // null taskRestore to let it be GC'ed taskRestore = null; //通过slot获取TaskManager对应网管 final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); final ComponentMainThreadExecutor jobMasterMainThreadExecutor = vertex.getExecutionGraph().getJobMasterMainThreadExecutor(); // We run the submission in the future executor so that the serialization of large TDDs does not block // the main thread and sync back to the main thread once submission is completed. //rpc异步调用,提交task到对应的TaskExecutor CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor) .thenCompose(Function.identity()) .whenCompleteAsync( (ack, failure) -> { // only respond to the failure case //... 一系列回调处理rpc回调 } }, jobMasterMainThreadExecutor); } catch (Throwable t) { //... } }
- TaskManager根据提交的task信息,创建并运行Task线程
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public CompletableFuture submitTask( TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) { //创建Task线程 Task task = new Task(...); //... boolean taskAdded; try { taskAdded = taskSlotTable.addTask(task); } catch (SlotNotFoundException | SlotNotActiveException e) { throw new TaskSubmissionException("Could not submit task.", e); } if (taskAdded) { //如果task添加成功,则执行Task线程 task.startTaskThread(); //... } else { //... } catch (TaskSubmissionException e) { return FutureUtils.completedExceptionally(e); } } }
- TaskExecutor中的Task实现了Runnable接口,会被TaskExecutor中的线程池调度执行。Task线程的主要逻辑在doRun()方法中,主要包括两部分:1、初始化Task执行环境。2、启动AbstractInvokable类的计算逻辑(这里就会调用StreamTask的invoke方法)
public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider, CheckpointListener, BackPressureSampleableTask { //TaskExecutor的线程池会调度执行Task线程的run() @Override public void run() { try { doRun(); } finally { terminationFuture.complete(executionState); } } private void doRun() { // ---------------------------- // 初始化Task运行环境 // ---------------------------- //.. AbstractInvokable invokable = null; // 反射加载Task中的invokeable代码,生成AbstractInvokable对象 invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env); // ---------------------------------------------------------------- // actual task core work // ---------------------------------------------------------------- this.invokable = invokable; //... // 执行AbstractInvokable对象(StreamTask)的invoke方法 invokable.invoke(); } }StreamTask执行流程
StreamTask继承实现了AbstractInvokable抽象类,StreamTask最终会运行在TaskExecutor的Task线程中,StreamTask定义的Task线程内部需要执行的逻辑。
-
StreamTask的主要结构
【headOperator】:StreamTask的头部算子,既operatorChain中的第一个算子(StreamTask通过DataOutput将数据传给头部算子)。
【operatorChain】:Task中执行的一系列算子。
【StreamInputProcess】:mailboxProcessor的默认 *** 作,用于从网络或数据源中读取处理数据
【stateBackend】:StreamTask使用的状态后端。
【mailboxProcessor】:所有数据读取(通过StreamInputProcessor对象的processInput())和事件 *** 作(checkpoint等)都通过该对象串行进行,将Task的执行过程变为单线程+阻塞队列的形式。使用Actor模型的邮箱机制取代了之前的多线程模型(处理checkpoint等事件时,不用加锁)。 -
StreamTask的执行流程
StreamTask通过mailboxProcessor读取数据和事件,并执行算子逻辑
public abstract class StreamTaskStreamTask线程模型——MailboxProcessor> extends AbstractInvokable implements AsyncExceptionHandler{ protected StreamTask( Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor actionExecutor, TaskMailbox mailbox) { super(environment); //... //StreamTask中的mailboxProcessor的默认执行逻辑是StreamTask的processInput() this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor); } @Override public final void invoke() throws Exception { try { beforeInvoke(); //... // 通过mailBox模式执行Task isRunning = true; runMailboxLoop(); //... afterInvoke(); } finally { cleanUpInvoke(); } } private void runMailboxLoop() throws Exception { try { //通过mailboxProcessor读取数据(执行算子逻辑)和mail(执行checkpoint等事件)。 mailboxProcessor.runMailboxLoop(); } catch (Exception e) { //... } } //mailBox执行的默认逻辑(读取数据) protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { //通过StreamInputProcessor读取数据,并将数据传递给headerOperator InputStatus status = inputProcessor.processInput(); //... } }
-
StreamTask线程模型的主要逻辑
【Mail】:封装了事件的执行逻辑(run方法,例如triggerCheckPoint、notifyCheckPointComplete等 *** 作),同时还有priority参数控制Mail执行的优先级,防止出现死锁。
【MailboxDefaultAction】:默认数据处理逻辑(StreamTask.processInput())
【MailboxExecutor】:提供了向Mailbox提交Mail的 *** 作
【TaskMailbox】:存储Mail,包括一个queue阻塞队列和一个batch非阻塞队列。通过调用createBatch()将queue中的Mail存储到batch中,然后通过tryTakeFromBatch()批量获取Mail。 -
MailboxProcessor的执行逻辑
MailboxProcessor的处理逻辑是优先处理Mailbox中的Mail事件,然后在通过StreamInputProcessor对象读取数据。
public class MailboxProcessor implements Closeable { public void runMailboxLoop() throws Exception { final TaskMailbox localMailbox = mailbox; //... //循环处理Mail和读取数据 while (processMail(localMailbox)) { // 执行StreamTask.processInput(),既调用StreamInputProcessor对象处理数据 mailboxDefaultAction.runDefaultAction(defaultActionContext); } } //会循环将Mailbox中的Mail处理完再返回 private boolean processMail(TaskMailbox mailbox) throws Exception { //将Mail从queue阻塞队列全部加入到batch非阻塞队列中 if (!mailbox.createBatch()) { return true; } OptionalStreamInputProcessor处理数据maybeMail; while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) { maybeMail.get().run();//执行Mail的run方法 } while (isDefaultActionUnavailable() && isMailboxLoopRunning()) { mailbox.take(MIN_PRIORITY).run(); } return isMailboxLoopRunning(); } }
- StreamInputProcessor的结构
【StreamTaskInput】:从Task外部获取数据(从网络或者数据源中读取数据),同时还要更新WaterMark以及对齐barrier。
【DataOutput】:将StreamTaskInput读取的数据发送给当前Task的headOperator进行处理
【OperatorChain】:同一个Task中运行的一系列算子以及RecordWriter(对Record进行分区,并缓存等待下游拉取)
- StreamInputProcessor处理数据流程
将在下一节flink网络通信剖析(网络栈)中详细分析。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)