Yarn原始碼分析之MapReduce作業中任務Task排程整體流程(一)
v2版本的MapReduce作業中,作業JOB_SETUP_COMPLETED事件的發生,即作業SETUP階段完成事件,會觸發作業由SETUP狀態轉換到RUNNING狀態,而作業狀態轉換中涉及作業資訊的處理,是由SetupCompletedTransition來完成的,它主要做了四件事:
1、通過設定作業Job的成員變數setupProgress為1,標記作業setup已完成;
2、排程作業Job的Map Task;
3、排程作業的JobReduce Task;
4、如果沒有task了,則生成JOB_COMPLETED事件並交由作業的事件處理器eventHandler進行處理。
本文,我們就將研究作業Job中Task是如何被排程的。
首先看下SetupCompletedTransition中transition()方法關於作業Job中Task排程的程式碼,如下:
// 排程作業Job的Map Task
job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
// 排程作業Job的Reduce Task
job.scheduleTasks(job.reduceTasks, true);
它實際上是通過Job,也就是JobImpl的scheduleTasks()完成的,這個方法需要兩個引數,第一個是作業Job待排程任務的任務ID集合taskIDs,第二個引數是表示是否恢復任務輸出的標誌位recoverTaskOutput,對於Map-Only型作業中Map任務和所有型別作業的Reduce任務,都需要恢復,標誌位recoverTaskOutput為true,具體程式碼如下:scheduleTasks()方法遍歷傳入的任務集合taskIDs中的每個TaskId例項taskID,對taskID做以下處理:protected void scheduleTasks(Set<TaskId> taskIDs, boolean recoverTaskOutput) { // 遍歷傳入的任務集合taskIDs中的每個TaskId,對taskID做以下處理: for (TaskId taskID : taskIDs) { // 根據taskID從集合completedTasksFromPreviousRun中移除對應元素,並獲取被移除的元素TaskInfo例項taskInfo TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID); if (taskInfo != null) {// 若存在taskID對應任務資訊TaskInfo例項taskInfo // 構造T_RECOVER型別任務恢復事件TaskRecoverEvent,交給eventHandler處理,標誌位recoverTaskOutput表示是否恢復任務的輸出, // 對於Map-Only型Map任務和所有的Reduce任務,都需要恢復,標誌位recoverTaskOutput為true eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo, committer, recoverTaskOutput)); } else { // 否則,構造T_SCHEDULE型別任務排程事件TaskEvent,交給eventHandler處理 eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE)); } } }
1、根據taskID從集合completedTasksFromPreviousRun中移除對應元素,並獲取被移除的元素TaskInfo例項taskInfo;
2、若存在taskID對應任務資訊TaskInfo例項taskInfo,構造T_RECOVER型別任務恢復事件TaskRecoverEvent,交給eventHandler處理,標誌位recoverTaskOutput表示是否恢復任務的輸出,對於Map-Only型Map任務和所有的Reduce任務,都需要恢復,標誌位recoverTaskOutput為true;
3、否則,構造T_SCHEDULE型別任務排程事件TaskEvent,交給eventHandler處理。
我們先看T_SCHEDULE型別任務排程事件TaskEvent的處理,它是交由Job的eventHandler來處理的,而這個eventHandler是在Job被建立時(即構造JobImpl例項時)由MRAppMaster的dispatcher來賦值的,而在MRAppMaster中,dispatcher被建立後就會註冊任務事件的處理器TaskEventDispatcher例項,程式碼如下:
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
而這個任務事件處理器TaskEventDispatcher中處理任務事件TaskEvent的handle()方法定義如下: private class TaskEventDispatcher implements EventHandler<TaskEvent> {
@SuppressWarnings("unchecked")
@Override
public void handle(TaskEvent event) {
Task task = context.getJob(event.getTaskID().getJobId()).getTask(
event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
}
它實際上是通過作業Job中相關任務Task的handle()方法來處理的,而這個任務Task的實現則是TaskImpl,其中對於各種任務事件的處理,也是類似作業Job,由一個任務Task的狀態機進行處理,關於任務Task的狀態機,我們會有專門的文章進行介紹,這裡,您只需要知道在TaskImpl中,對於上述兩種任務狀態機中任務狀態的轉換、觸發事件及事件處理者定義如下: private static final StateMachineFactory
<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
stateMachineFactory
= new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
(TaskStateInternal.NEW)
// 省略部分程式碼
.addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED,
TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
// 省略部分程式碼
.addTransition(TaskStateInternal.NEW,
EnumSet.of(TaskStateInternal.FAILED,
TaskStateInternal.KILLED,
TaskStateInternal.RUNNING,
TaskStateInternal.SUCCEEDED),
TaskEventType.T_RECOVER, new RecoverTransition())
// 省略部分程式碼
由此可見,對於T_RECOVER型別任務恢復事件TaskRecoverEvent,Task狀態機指定由RecoverTransition處理,並且任務Task的狀態會由NEW轉換為RUNNING、FAILED、KILLED、SUCCEEDED等,而對於T_SCHEDULE型別任務排程事件TaskEvent,則由Task狀態機指定為InitialScheduleTransition處理,並且任務Task的狀態會由NEW轉換為SCHEDULED。下面,我們挨個進行分析。一、T_SCHEDULE型別任務排程事件TaskEvent
由InitialScheduleTransition進行處理,任務Task的狀態會由NEW轉換為SCHEDULED,InitialScheduleTransition程式碼如下:
private static class InitialScheduleTransition
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
// 新增並排程任務執行嘗試TaskAttempt,Avataar.VIRGIN表示它是第一個Attempt,
// 而剩餘的Avataar.SPECULATIVE表示它是為拖後腿任務開啟的一個Attempt,即推測執行原理
task.addAndScheduleAttempt(Avataar.VIRGIN);
// 設定任務的排程時間scheduledTime為當前時間
task.scheduledTime = task.clock.getTime();
// 傳送任務啟動事件
task.sendTaskStartedEvent();
}
}
InitialScheduleTransition的處理邏輯比較簡單,大體如下:1、呼叫addAndScheduleAttempt()方法,新增並排程任務執行嘗試TaskAttempt,Avataar.VIRGIN表示它是第一個Attempt,而剩餘的Avataar.SPECULATIVE表示它是為拖後腿任務開啟的一個Attempt,即推測執行原理;
2、設定任務的排程時間scheduledTime為當前時間;
3、傳送任務啟動事件。
其中,1中的addAndScheduleAttempt()方法實現如下:
// This is always called in the Write Lock
private void addAndScheduleAttempt(Avataar avataar) {
// 呼叫addAttempt()方法,建立一個任務執行嘗試TaskAttempt例項attempt,
// 並將其新增到attempt集合attempts中,還會設定attempt的Avataar屬性
TaskAttempt attempt = addAttempt(avataar);
// 將attempt的id新增到正在執行的attempt集合inProgressAttempts中
inProgressAttempts.add(attempt.getID());
//schedule the nextAttemptNumber
// 排程TaskAttempt
// 如果集合failedAttempts大小大於0,說明該Task之前有TaskAttempt失敗過,此次為重新排程,
// TaskAttemp事件型別為TA_RESCHEDULE,
if (failedAttempts.size() > 0) {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_RESCHEDULE));
} else {
// 否則為TaskAttemp事件型別為TA_SCHEDULE
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_SCHEDULE));
}
}
addAndScheduleAttempt()方法處理邏輯如下:1、呼叫addAttempt()方法,建立一個任務執行嘗試TaskAttempt例項attempt,並將其新增到attempt集合attempts中,還會設定attempt的Avataar屬性;
2、將attempt的id新增到正在執行的attempt集合inProgressAttempts中;
3、排程TaskAttempt:如果集合failedAttempts大小大於0,說明該Task之前有TaskAttempt失敗過,此次為重新排程,TaskAttemp事件型別為TA_RESCHEDULE,否則為TaskAttemp事件型別為TA_SCHEDULE。
而addAttempt()方法實現如下:
private TaskAttemptImpl addAttempt(Avataar avataar) {
// 呼叫createAttempt()方法建立任務執行嘗試TaskAttemptImpl例項attempt
TaskAttemptImpl attempt = createAttempt();
// 設定attempt的Avataar屬性
attempt.setAvataar(avataar);
// 記錄debug級別日誌資訊:Created attempt ... ...
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
}
// 將建立的任務執行嘗試TaskAttemptImpl例項attempt與其ID的對應關係新增到TaskImpl的任務執行嘗試集合attempts中,
// attempts先被初始化為Collections.emptyMap()
// this.attempts = Collections.emptyMap();
switch (attempts.size()) {
case 0:
// 如果attempts大小為0,即為Collections.emptyMap(),則將其更換為Collections.singletonMap(),並加入該TaskAttemptImpl例項attempt
attempts = Collections.singletonMap(attempt.getID(),
(TaskAttempt) attempt);
break;
case 1:
// 如果attempts大小為1,即為Collections.singletonMap(),則將其替換為LinkedHashMap,並加入之前和現在的TaskAttemptImpl例項attempt
Map<TaskAttemptId, TaskAttempt> newAttempts
= new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
newAttempts.putAll(attempts);
attempts = newAttempts;
attempts.put(attempt.getID(), attempt);
break;
default:
// 如果attempts大小大於1,說明其實一個LinkedHashMap,直接put吧
attempts.put(attempt.getID(), attempt);
break;
}
// 累加TaskAttempt計數器nextAttemptNumber
++nextAttemptNumber;
// 返回TaskAttemptImpl例項attempt
return attempt;
}
其處理邏輯如下:1、呼叫createAttempt()方法建立任務執行嘗試TaskAttemptImpl例項attempt;
2、設定attempt的Avataar屬性;
3、記錄debug級別日誌資訊:Created attempt ... ...;
4、將建立的任務執行嘗試TaskAttemptImpl例項attempt與其ID的對應關係新增到TaskImpl的任務執行嘗試集合attempts中,attempts先被初始化為Collections.emptyMap():
4.1、如果attempts大小為0,即為Collections.emptyMap(),則將其更換為Collections.singletonMap(),並加入該TaskAttemptImpl例項attempt;
4.2、如果attempts大小為1,即為Collections.singletonMap(),則將其替換為LinkedHashMap,並加入之前和現在的TaskAttemptImpl例項attempt;
4.3、如果attempts大小大於1,說明其實一個LinkedHashMap,直接put吧;
5、累加TaskAttempt計數器nextAttemptNumber;
6、返回TaskAttemptImpl例項attempt。
繼續往下追蹤createAttempt()方法,其在TaskImpl中程式碼如下:
protected abstract TaskAttemptImpl createAttempt();
這是一個抽象方法,由其子類實現,而它的子類有兩個,表示Map任務的MapTaskImpl和表示Reduce任務的ReduceTaskImpl,其createAttempt()方法分別實現如下:1、MapTaskImpl.createAttempt()
@Override
protected TaskAttemptImpl createAttempt() {
return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
eventHandler, jobFile,
partition, taskSplitMetaInfo, conf, taskAttemptListener,
jobToken, credentials, clock, appContext);
}
生成一個MapTaskAttemptImpl例項,傳入表示Attempt序號的nextAttemptNumber、事件處理器eventHandler、作業檔案jobFile、分割槽資訊partition、分片元資料資訊taskSplitMetaInfo等關鍵變數。2、ReduceTaskImpl.createAttempt()
@Override
protected TaskAttemptImpl createAttempt() {
return new ReduceTaskAttemptImpl(getID(), nextAttemptNumber,
eventHandler, jobFile,
partition, numMapTasks, conf, taskAttemptListener,
jobToken, credentials, clock, appContext);
}
生成一個ReduceTaskAttemptImpl例項,除不需要分片元資料資訊taskSplitMetaInfo,和需要一個Map任務數numMapTasks外,其他與MapTaskAttemptImpl基本相同。
TaskAttempt生成了,接下來就應該進行排程執行了。我們再折回去看看addAndScheduleAttempt()方法中,傳送的TA_SCHEDULE或TA_RESCHEDULE型別的TaskAttemptEvent,其與JobImpl、TaskImpl一樣,是由TaskAttempt狀態機負責處理的,如下所示:
// 在事件TaskAttemptEventType.TA_SCHEDULE的觸發下,經過RequestContainerTransition的處理,
// TaskAttempt的狀態由NEW轉換成UNASSIGNED
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
// 在事件TaskAttemptEventType.TA_SCHEDULE的觸發下,經過RequestContainerTransition的處理,
// TaskAttempt的狀態由NEW轉換成UNASSIGNED
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
// 上述二者的區別是RequestContainerTransition傳入的標誌位rescheduled,前者為false,後者為true
在事件TaskAttemptEventType.TA_SCHEDULE的觸發下,經過RequestContainerTransition的處理,TaskAttempt的狀態由NEW轉換成UNASSIGNED;在事件TaskAttemptEventType.TA_SCHEDULE的觸發下,經過RequestContainerTransition的處理,TaskAttempt的狀態由NEW轉換成UNASSIGNED;上述二者的區別是RequestContainerTransition傳入的標誌位rescheduled,前者為false,後者為true。我們再看下RequestContainerTransition的實現,程式碼如下:
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
// Tell any speculator that we're requesting a container
// taskAttempt的事件處理器eventHandler處理SpeculatorEvent事件,告訴所有的speculator,此時正在申請一個容器
taskAttempt.eventHandler.handle
(new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
//request for container
// 申請容器
if (rescheduled) {// Task的Attempt重新排程
// 構造容器申請事件ContainerRequestEvent,並交由taskAttempt的事件處理器eventHandler處理,
// 這個eventHandler實際上是MRAppMaster中的dispatcher,依次經過TaskImpl、TaskAttemptImpl的建立傳遞過來的,
taskAttempt.eventHandler.handle(
ContainerRequestEvent.createContainerRequestEventForFailedContainer(
taskAttempt.attemptId,
taskAttempt.resourceCapability));
} else {// Task的Attempt第一次排程
// 構造容器申請事件ContainerRequestEvent,並交由taskAttempt的事件處理器eventHandler處理,
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
taskAttempt.attemptId, taskAttempt.resourceCapability,
taskAttempt.dataLocalHosts.toArray(
new String[taskAttempt.dataLocalHosts.size()]),
taskAttempt.dataLocalRacks.toArray(
new String[taskAttempt.dataLocalRacks.size()])));
}
// 兩者建立的ContainerRequestEvent事件的區別是,rescheduled時,不需要考慮Node和Lock位置屬性,因為此時Attempt之前已經失敗過,此時應當能夠以完成Attempt為首要任務,
// 同時,兩者的事件型別都是ContainerAllocator.EventType.CONTAINER_REQ,
// MRAppMaster中的dispatcher針對該事件ContainerAllocator.EventType註冊的事件處理器是LocalContainerAllocator或RMContainerAllocator
}
RequestContainerTransition的transition()方法處理邏輯如下:1、TaskAttempt的事件處理器eventHandler處理SpeculatorEvent事件,告訴所有的speculator,此時正在申請一個容器;
2、申請容器:
2.1、如果是Task的Attempt重新排程,構造容器申請事件ContainerRequestEvent,並交由taskAttempt的事件處理器eventHandler處理,這個eventHandler實際上是MRAppMaster中的dispatcher,依次經過TaskImpl、TaskAttemptImpl的建立傳遞過來的;
2.2、否則如果是Task的Attempt第一次排程,構造容器申請事件ContainerRequestEvent,並交由taskAttempt的事件處理器eventHandler處理。
兩者建立的ContainerRequestEvent事件的區別是,rescheduled時,不需要考慮Node和Lock位置屬性,因為此時Attempt之前已經失敗過,此時應當能夠以完成Attempt為首要任務,同時,兩者的事件型別都是ContainerAllocator.EventType.CONTAINER_REQ,MRAppMaster中的dispatcher針對該事件ContainerAllocator.EventType註冊的事件處理器是LocalContainerAllocator或RMContainerAllocator。
關於Yarn容器等資源申請與分配RMContainerAllocator的介紹,我會在以後的文章中為大家講解,這裡,你只需要瞭解其執行的大體流程即可:
1、RMContainerAllocator首先間接繼承自AbstractService,它是Hadoop中的一種服務,有服務初始化serviceInit()及服務啟動serviceStart()方法要執行;
2、RMContainerAllocator針對容器請求分配事件,是一個雙重生產者-消費者模式,第一層生產者通過其handle()方法,將容器請求分配ContainerAllocatorEvent加入其內部eventQueue佇列,第一層消費者通過其內部事件處理執行緒eventHandlingThread,不斷的從事件佇列eventQueue中take事件進行消費,而消費的方式是做為第二層生產者,將事件按照任務型別放入排程請求列表scheduledRequests、pendingReduces中,scheduledRequests是一個複雜的區分Map和Reduce任務的會立即被排程的請求列表,而pendingReduces只是儲存等待被排程的Reduce任務請求的列表,其會根據Yarn中資源情況和Map任務完成情況確定是將事件移送至(即rampUp)scheduledRequests,還是從scheduledRequests移回Reduce任務排程請求至pendingReduces(即rampDown),而第二層的消費者則是RMContainerAllocator祖先父類RMCommunicator中的心跳執行緒allocatorThread,它週期性的呼叫heartbeat()方法,從Yarn的RM中獲取可用資源,然後消費scheduledRequests列表中的請求,進行容器分配;
3、RMContainerAllocator中,對於Map任務來說,它經歷的資料結構,或者生命週期為scheduled->assigned->completed,而Reduce任務則是pending->scheduled->assigned->completed;
4、經過一些的複雜邏輯後,包括綜合判斷資源情況、任務本地性、優先排程失敗任務、Map任務完成比例、針對拖後退的任務進行推測執行等,無論是Map任務還是Reduce任務,最終在分配到容器Container後,都會發送一個TaskAttemptContainerAssignedEvent事件,交由TaskAttemptImpl的狀態機中ContainerAssignedTransition進行處理,而其方法則最終會構造ContainerRemoteLaunchEvent事件,進行Container遠端載入,在遠端或本機或本程序Container中Launch任務嘗試進行任務的執行。
關於RMContainerAllocator,因為其結構、處理邏輯比較複雜,我會專門寫文章進行分析,敬請期待!
二、T_RECOVER型別任務恢復事件TaskRecoverEvent
未完待續!敬請關注後續文章!