YARN中MRAppMaster的事件驅動模型與狀態機處理訊息過程的分析
在MRv1中,物件之間的作用關係是基於函式呼叫實現的,當一個物件向另外一個物件傳遞訊息時,會直接採用函式呼叫的方式,並且這個過程是序列的。比如,當TaskTracker需要執行一個Task的時候,將首先下載Task依賴的檔案(JAR包,二進位制檔案等,字典檔案等),然後執行Task。在整個過程中,下載依賴檔案是阻塞式的,也就是說,前一個任務未完成檔案下載之前,後一個新任務將一直處於等待狀態,只有在下載完成之後,才會啟動一個獨立程序執行該任務。基於函式呼叫式的程式設計模型是低效的,它隱含著整個過程是序列,同步進行的。
相比之下,MRv2引入的時間驅動變成模型則是一種更加高效的方式。在基於事件驅動的程式設計模型中,所有物件被抽象成了事件處理器,而事件處理器之間通過事件相互關聯。每種事件處理一種型別的事件,同時根據需要出發另外一種事件。相比於基於函式呼叫的程式設計模型,這種程式設計方式具有非同步併發等特點,更加高效,更加適合大型分散式系統。
下面,我們以負責控制MapReduce作業的AppllicationMaster,也就是MRAppMaster為例,看一下整個事件驅動模型與狀態機。
首先,根據原始碼,我們可以看到在MRAppMaster中有一個最重要的中央非同步訊息排程器 AsyncDispatcher,它負責整個MRAppMaster模組的訊息排程。除此之外,還有多個其他的訊息排程器,比如 JobEventDispatcher, TaskEventDispatcher,TaskAttemptEventDispatcher,SpeculatorEventDispatcher 等等。在MRAppMaster的初始時,在serviceInit() 函式中有以下關鍵程式碼
this.jobEventDispatcher = new JobEventDispatcher(); //register the event dispatchers dispatcher.register(JobEventType.class, jobEventDispatcher); dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); dispatcher.register(CommitterEventType.class, committerEventHandler);
這裡的意思是,當中央訊息排程器dispatcher如果收到JobEventType的訊息後,將會把這個訊息轉發給名為jobEventDispatcher的JobEventDispatcher的訊息排程器。這個JobEventDispatcher是專門用來處理JobEventType事件的訊息處理器。底層實現其實就是將一個pair<JobEventType.class, JobEventDispatcher>這個KV放入中央訊息排程器dispatcher的HashMap中。剩下的幾個訊息排程器一樣,註冊相應的訊息型別和其相對應的訊息排程器。
之後dispatcher啟動一個處理訊息執行緒,可以在下面程式碼看到中央訊息處理器處理訊息。
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
Event event;
try {
event = eventQueue.take(); // 從中央訊息處理器dispatcher的訊息佇列中獲得訊息
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event); // 分發該訊息
}
}
}
};
}
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass(); //獲取event的型別
try{
EventHandler handler = eventDispatchers.get(type); // 根據event型別從HashMap中獲取對應的訊息處理器
if(handler != null) {
handler.handle(event); // 使用獲得的訊息處理器處理這個event,跳轉至相應的處理器實現
} else {
throw new Exception("No handler for registered for " + type);
}
}
catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
當Client提交一個Job之後,MRAppMaster建立一個job,併發送一個JOB_INIT事件給中央訊息處理器dispatcher。下面我們以JOB_INIT事件為例,看看發生了什麼。
首先dispatcher收到JOB_INIT事件之後,根據JOB_INIT的事件型別(JobEventType.class)獲得事件的處理器應該是jobEventDispatcher,便將其傳送給jobEventDispatcher。
private class JobEventDispatcher implements EventHandler<JobEvent> {
@SuppressWarnings("unchecked")
@Override
public void handle(JobEvent event) {
((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event); // 由這個job處理這個event
}
}
JobEventDispatcher處理這個JOB_INIT訊息。之後進入JobImpl 的訊息處理函式public void handle(JobEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing " + event.getJobId() + " of type "
+ event.getType());
}
try {
writeLock.lock();
JobStateInternal oldState = getInternalState();
try {
getStateMachine().doTransition(event.getType(), event); // 獲取job的當前的狀態機,根據當前訊息event進行狀態變換。
} catch (InvalidStateTransitonException e) { // 會跳轉至 StateMachine 實現的 doTransition()函式。
LOG.error("Can't handle this event at current state", e);
addDiagnostic("Invalid event " + event.getType() +
" on Job " + this.jobId);
eventHandler.handle(new JobEvent(this.jobId,
JobEventType.INTERNAL_ERROR));
}
//notify the eventhandler of state change
if (oldState != getInternalState()) {
LOG.info(jobId + "Job Transitioned from " + oldState + " to "
+ getInternalState());
rememberLastNonFinalState(oldState);
}
}
finally {
writeLock.unlock();
}
}
MRAppMaster訊息分發過程到此結束,這之後就進入了Job狀態機的操作,當然根據任務的不同,也許會進入Task狀態機,TaskAttempt狀態機等等。總之,狀態機以及狀態機中的各種hook進行對應Job/Task/TaskAttempt的控制。
我們知道一個job對應了一個狀態機,我們也應該知道在YARN的實現中一個狀態機由以下三個部分組成: 1. 狀態(節點) 2. 事件(弧) 3. Hook(觸發事件後的處理)。
在JobImpl.java檔案中,我們可以看到構建job狀態機的過程:
protected static final
StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
stateMachineFactory
= new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
(JobStateInternal.NEW) // 構造JobImpl狀態機,初始狀態是 NEW 狀態
// Transitions from NEW state
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW, // 新增一個狀態變化
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW, // 新增一個狀態變化
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition // 新增一個狀態變化
(JobStateInternal.NEW,
EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
JobEventType.JOB_INIT,
new InitTransition())
.addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
JobEventType.JOB_KILL,
new KillNewJobTransition())
.addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
.addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
JobEventType.JOB_AM_REBOOT,
INTERNAL_REBOOT_TRANSITION)
...
...
後面還有很多,job狀態機是比較一個複雜的狀態機,涉及到很多狀態與事件,可以通過 對YARN狀態機視覺化深入瞭解,在此不做更多討論。呼叫addTransition()函式可以新增一個狀態變化。addTransition()函式中有四個引數,分別為: 1.preState 2.postState 3.eventType 4.hook。 它們表示 1.事件發生前的狀態,2.事件發生後的狀態, 3.事件, 4.事件發生時觸發的hook
程式碼中 addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()) 這一個狀態變化表示,事件發生前是 NEW狀態,事件發生後是 INITED狀態 或者 FAILED狀態,事件是JOB_INIT, hook是InitTransition物件。也就是如果Job狀態機當前狀態是NEW,這是發生了一個JOB_INIT事件,那麼狀態機會觸發InitTransition物件中的doTransition()函式,如果transition()函式返回 INITED狀態,那麼狀態機最新狀態就是 INITED, 如果doTransition()函式返回 FAILED狀態,那麼狀態機最新狀態就是FAILED。
那麼我們來看一下狀態機的狀態變換的過程。
private class InternalStateMachine
implements StateMachine<STATE, EVENTTYPE, EVENT> {
private final OPERAND operand;
private STATE currentState;
InternalStateMachine(OPERAND operand, STATE initialState) {
this.operand = operand;
this.currentState = initialState;
if (!optimized) {
maybeMakeStateMachineTable();
}
}
@Override
public synchronized STATE getCurrentState() {
return currentState;
}
@Override
public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitonException {
currentState = StateMachineFactory.this.doTransition // 發生狀態機變化
(operand, currentState, eventType, event);
return currentState;
}
}
private STATE doTransition
(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitonException {
// We can assume that stateMachineTable is non-null because we call
// maybeMakeStateMachineTable() when we build an InnerStateMachine ,
// and this code only gets called from inside a working InnerStateMachine .
Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
= stateMachineTable.get(oldState); // 根據當前狀態獲取所有該狀態有可能發生的事件
if (transitionMap != null) {
Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition
= transitionMap.get(eventType); // 根據事件獲取對應的hook,為一個Transition物件(Transition是一個介面)
if (transition != null) {
return transition.doTransition(operand, oldState, event, eventType); // 呼叫該物件的doTransition()函式
}
}
throw new InvalidStateTransitonException(oldState, eventType);
}
private class SingleInternalArc
implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
private STATE postState;
private SingleArcTransition<OPERAND, EVENT> hook; // transition hook
SingleInternalArc(STATE postState,
SingleArcTransition<OPERAND, EVENT> hook) {
this.postState = postState;
this.hook = hook;
}
@Override
public STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType) {
if (hook != null) {
hook.transition(operand, event); // 呼叫 hook 的 transition()函式
}
return postState;
}
}
這個 stateMachineTable 是一個 Map<state, Map<eventType, Transition>> 資料結構。根據當前狀態可以獲取這個狀態的所有<eventType, Transition>對應關係。之後根據event型別可以獲得 Transition物件(也就是一個hook)。那麼,狀態機就可以由當前狀態,事件呼叫hook函數了。本質上就是 StateMachine.hook(preState,
event) 的形式,因為OO的思想發生了變形。之後就是Transition物件的具體實現了,由於在一開始構造狀態機的時候,JobImpl 構造了一個狀態變化 addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()),這裡的hook是 InitTransition 物件。那麼hook被呼叫的時候,就會呼叫
InitTransition 物件的 transition() 函式。InitTransition 物件的 transition() 函式中包括了很多初始化一個job所做的操作,比如建立MapTask,建立ReduceTask,建立MapTask所需要的輸入檔案Split 等等,在這裡就不做過多敘述,可以自行閱讀相關文章。
這個就是YARN的狀態機工作的基本流程以及如何與 MRAppMaster的訊息分發機制相關聯的過程。由於YARN相對於MRv1的諸多改進,現在MRAppMaster取代了過去MRv1中的JobTracker對Job與Task進行控制,結構更加清晰,模組化更加明顯。總體而言,MRAppMaster是一個相對比較複雜的模組,需要進行更多的更加仔細分析。
如需轉載請表明 轉載自 http://blog.csdn.net/gjt19910817/article/details/43441801