TezChild gets a task from containerReporter
    ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
    containerTask = getTaskFuture.get();
ContainerReporter.callInternal
Here, umbilical is the task's communication channel to the App Master.
    @Override
    protected ContainerTask callInternal() throws Exception {
        ...
        containerTask = umbilical.getTask(containerContext);
        ...
    }
TezChild.run
After retrieving a ContainerTask object, TezChild creates a TezTaskRunner2 object and calls its run method.
    TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI, localDirs,
        containerTask.getTaskSpec(), appAttemptNumber, serviceConsumerMetadata,
        serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry,
        pid, executionContext, memAvailable, updateSysCounters, hadoopShim, sharedExecutor);
TezTaskRunner2 constructor
The constructor creates the task, using sharedExecutor if one was supplied and localExecutor otherwise.
    this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf,
        localDirs, umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap,
        startedInputsMap, objectRegistry, pid, executionContext, memAvailable,
        updateSysCounters, hadoopShim,
        sharedExecutor == null ? localExecutor : sharedExecutor);
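The steps up to this point (submit a reporter to an executor, block on the future until a task arrives, then build a runner around the task spec) can be sketched in plain Java. This is only an illustration under stated assumptions: `Umbilical`, `Reporter`, and `Runner` are hypothetical stand-ins, not the Tez classes, and task specs are reduced to strings.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FetchTaskSketch {
    /** Hypothetical stand-in for the umbilical to the App Master. */
    interface Umbilical {
        String getTask(String containerId);
    }

    /** Hypothetical stand-in for ContainerReporter: asks the umbilical for work. */
    static class Reporter implements Callable<String> {
        private final Umbilical umbilical;
        private final String containerId;

        Reporter(Umbilical umbilical, String containerId) {
            this.umbilical = umbilical;
            this.containerId = containerId;
        }

        @Override
        public String call() {
            return umbilical.getTask(containerId);
        }
    }

    /** Hypothetical stand-in for TezTaskRunner2: the task is built in the constructor. */
    static class Runner {
        final String task;

        Runner(String taskSpec) {
            this.task = "task[" + taskSpec + "]";
        }
    }

    /** Submits the reporter, waits for the task spec, then builds the runner. */
    static Runner fetchAndBuild(Umbilical umbilical, String containerId) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> getTaskFuture = executor.submit(new Reporter(umbilical, containerId));
            String taskSpec = getTaskFuture.get(); // blocks until the reporter returns
            return new Runner(taskSpec);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Umbilical fake = containerId -> "spec-for-" + containerId;
        System.out.println(fetchAndBuild(fake, "container_1").task); // task[spec-for-container_1]
    }
}
```

Passing the umbilical in as an interface keeps the sketch testable with a fake; the real code talks to the App Master instead.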
TezTaskRunner2.run
run creates a TaskRunner2Callable object, submits it to the executor so that it runs on another thread, and waits for it to finish.
    public TaskRunner2Result run() {
        ...
        taskRunnerCallable = new TaskRunner2Callable(task, ugi, umbilicalAndErrorHandler);
        future = executor.submit(taskRunnerCallable);
        future.get();
        ...
    }
TaskRunner2Callable
TaskRunner2Callable extends CallableWithNdc, and CallableWithNdc.call invokes callInternal, so executing the callable ultimately runs callInternal() in TaskRunner2Callable.
public class TaskRunner2Callable extends CallableWithNdc
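The delegation described above is a template-method arrangement: the base class owns a final call() and subclasses only supply callInternal(). A minimal sketch, assuming a hypothetical `ContextCallable` base class in which a `ThreadLocal` stands in for whatever per-thread context the real base class manages:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TemplateCallableSketch {
    /** Hypothetical analogue of CallableWithNdc: call() is final and delegates. */
    abstract static class ContextCallable<T> implements Callable<T> {
        static final ThreadLocal<String> CURRENT = new ThreadLocal<>();
        private final String context; // captured on the submitting thread

        ContextCallable(String context) {
            this.context = context;
        }

        @Override
        public final T call() throws Exception {
            CURRENT.set(context);      // install the context on the worker thread
            try {
                return callInternal(); // subclass logic runs here
            } finally {
                CURRENT.remove();      // always clear, even on failure
            }
        }

        protected abstract T callInternal() throws Exception;
    }

    /** Hypothetical analogue of TaskRunner2Callable: only callInternal() is written. */
    static class RunnerCallable extends ContextCallable<String> {
        RunnerCallable(String context) {
            super(context);
        }

        @Override
        protected String callInternal() {
            return "ran in " + CURRENT.get();
        }
    }

    /** Submits the callable to an executor and blocks on the result. */
    static String runOnce(String context) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(new RunnerCallable(context));
            return future.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("attempt_1")); // ran in attempt_1
    }
}
```

Because call() is final, a subclass cannot skip the setup and teardown around its own logic.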
CallableWithNdc.call
    @Override
    public final T call() throws Exception {
        NDC.inherit(ndcStack);
        try {
            return callInternal();
        } finally {
            NDC.clear();
        }
    }
TaskRunner2Callable.callInternal
The task is a LogicalIOProcessorRuntimeTask. callInternal drives it through its lifecycle (the excerpt is abridged to the calls on the task):
    @Override
    ...
        task.initialize();
        task.run();
        task.close();
        task.cleanup();
      }
    }
LogicalIOProcessorRuntimeTask constructor
The constructor takes processorDescriptor from the task spec.
    this.processorDescriptor = taskSpec.getProcessorDescriptor();
LogicalIOProcessorRuntimeTask.initialize
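initialize creates the processor from the class name in processorDescriptor. The processor is of type AbstractLogicalIOProcessor. In the case of Hive, the class name is org.apache.hadoop.hive.ql.exec.tez.TezProcessor.
    this.processor = createProcessor(processorDescriptor.getClassName(), processorContext);
LogicalIOProcessorRuntimeTask.run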
    public void run() throws Exception {
        Preconditions.checkState(this.state.get() == State.INITED,
            "Can only run while in INITED state. Current: " + this.state);
        this.state.set(State.RUNNING);
        processor.run(runInputMap, runOutputMap);
    }
LogicalIOProcessorRuntimeTask.close
    public void close() throws Exception {
        processor.close();
    }
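The task lifecycle traced above (create the processor by class name in initialize, check the state before run, then close and cleanup) can be sketched without any Tez dependency. Everything here is an assumption made for illustration: `Processor`, `RecordingProcessor`, `RuntimeTask`, and the `NEW` state are hypothetical, and a plain `IllegalStateException` replaces `Preconditions.checkState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class TaskLifecycleSketch {
    enum State { NEW, INITED, RUNNING } // NEW is a hypothetical initial state

    /** Hypothetical processor contract. */
    interface Processor {
        void run() throws Exception;
        void close() throws Exception;
    }

    /** Example processor that records the calls made on it. */
    public static class RecordingProcessor implements Processor {
        private final List<String> events;

        public RecordingProcessor(List<String> events) {
            this.events = events;
        }

        @Override public void run() { events.add("processor.run"); }
        @Override public void close() { events.add("processor.close"); }
    }

    /** Hypothetical analogue of the runtime task. */
    static class RuntimeTask {
        private final AtomicReference<State> state = new AtomicReference<>(State.NEW);
        private final String processorClassName;
        private final List<String> events;
        private Processor processor;

        RuntimeTask(String processorClassName, List<String> events) {
            this.processorClassName = processorClassName;
            this.events = events;
        }

        void initialize() throws Exception {
            // Instantiate the processor reflectively from its class name.
            processor = (Processor) Class.forName(processorClassName)
                .getDeclaredConstructor(List.class)
                .newInstance(events);
            state.set(State.INITED);
        }

        void run() throws Exception {
            if (state.get() != State.INITED) {
                throw new IllegalStateException(
                    "Can only run while in INITED state. Current: " + state.get());
            }
            state.set(State.RUNNING);
            processor.run();
        }

        void close() throws Exception { processor.close(); }

        void cleanup() { events.add("cleanup"); }
    }

    /** Runs the full lifecycle and returns the recorded events. */
    static List<String> runLifecycle() {
        List<String> events = new ArrayList<>();
        RuntimeTask task = new RuntimeTask(RecordingProcessor.class.getName(), events);
        try {
            task.initialize();
            task.run();
            task.close();
            task.cleanup();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return events;
    }

    /** Returns true if run() is rejected when initialize() was skipped. */
    static boolean runBeforeInitIsRejected() {
        RuntimeTask task = new RuntimeTask(RecordingProcessor.class.getName(), new ArrayList<>());
        try {
            task.run();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle()); // [processor.run, processor.close, cleanup]
    }
}
```

Loading the processor by name is what lets a client such as Hive plug in its own processor class without the task code referring to it directly.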