1. 程式人生 > >YARN中MRAppMaster的事件驅動模型與狀態機處理訊息過程的分析

YARN中MRAppMaster的事件驅動模型與狀態機處理訊息過程的分析

在MRv1中,物件之間的作用關係是基於函式呼叫實現的,當一個物件向另外一個物件傳遞訊息時,會直接採用函式呼叫的方式,並且這個過程是序列的。比如,當TaskTracker需要執行一個Task的時候,將首先下載Task依賴的檔案(JAR包,二進位制檔案等,字典檔案等),然後執行Task。在整個過程中,下載依賴檔案是阻塞式的,也就是說,前一個任務未完成檔案下載之前,後一個新任務將一直處於等待狀態,只有在下載完成之後,才會啟動一個獨立程序執行該任務。基於函式呼叫式的程式設計模型是低效的,它隱含著整個過程是序列,同步進行的。

相比之下,MRv2引入的時間驅動變成模型則是一種更加高效的方式。在基於事件驅動的程式設計模型中,所有物件被抽象成了事件處理器,而事件處理器之間通過事件相互關聯。每種事件處理一種型別的事件,同時根據需要出發另外一種事件。相比於基於函式呼叫的程式設計模型,這種程式設計方式具有非同步併發等特點,更加高效,更加適合大型分散式系統。

下面,我們以負責控制MapReduce作業的AppllicationMaster,也就是MRAppMaster為例,看一下整個事件驅動模型與狀態機。

首先,根據原始碼,我們可以看到在MRAppMaster中有一個最重要的中央非同步訊息排程器 AsyncDispatcher,它負責整個MRAppMaster模組的訊息排程。除此之外,還有多個其他的訊息排程器,比如 JobEventDispatcher, TaskEventDispatcher,TaskAttemptEventDispatcher,SpeculatorEventDispatcher 等等。在MRAppMaster的初始時,在serviceInit() 函式中有以下關鍵程式碼

this.jobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
dispatcher.register(JobEventType.class, jobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
dispatcher.register(CommitterEventType.class, committerEventHandler);

這裡的意思是,當中央訊息排程器dispatcher如果收到JobEventType的訊息後,將會把這個訊息轉發給名為jobEventDispatcher的JobEventDispatcher的訊息排程器。這個JobEventDispatcher是專門用來處理JobEventType事件的訊息處理器。底層實現其實就是將一個pair<JobEventType.class, JobEventDispatcher>這個KV放入中央訊息排程器dispatcher的HashMap中。剩下的幾個訊息排程器一樣,註冊相應的訊息型別和其相對應的訊息排程器。

之後dispatcher啟動一個處理訊息執行緒,可以在下面程式碼看到中央訊息處理器處理訊息。

  Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event event;
          try {
            event = eventQueue.take();   // 從中央訊息處理器dispatcher的訊息佇列中獲得訊息
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);   // 分發該訊息
          }
        }
      }
    };
  }
  protected void dispatch(Event event) {
    //all events go thru this loop
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
          + event.toString());
    }

    Class<? extends Enum> type = event.getType().getDeclaringClass();  //獲取event的型別

    try{
      EventHandler handler = eventDispatchers.get(type);  // 根據event型別從HashMap中獲取對應的訊息處理器
      if(handler != null) {
        handler.handle(event);  // 使用獲得的訊息處理器處理這個event,跳轉至相應的處理器實現
      } else {
        throw new Exception("No handler for registered for " + type);
      }
    }
    catch (Throwable t) {
      //TODO Maybe log the state of the queue
      LOG.fatal("Error in dispatcher thread", t);
      if (exitOnDispatchException
          && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
        LOG.info("Exiting, bbye..");
        System.exit(-1);
      }
    }
  }

當Client提交一個Job之後,MRAppMaster建立一個job,併發送一個JOB_INIT事件給中央訊息處理器dispatcher。下面我們以JOB_INIT事件為例,看看發生了什麼。

首先dispatcher收到JOB_INIT事件之後,根據JOB_INIT的事件型別(JobEventType.class)獲得事件的處理器應該是jobEventDispatcher,便將其傳送給jobEventDispatcher。

private class JobEventDispatcher implements EventHandler<JobEvent> {
  @SuppressWarnings("unchecked")
  @Override
  public void handle(JobEvent event) {
    ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);  // 由這個job處理這個event
  }
}
JobEventDispatcher處理這個JOB_INIT訊息。之後進入JobImpl 的訊息處理函式
public void handle(JobEvent event) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing " + event.getJobId() + " of type "
        + event.getType());
  }
  try {
    writeLock.lock();
    JobStateInternal oldState = getInternalState();
    try {
       getStateMachine().doTransition(event.getType(), event);   // 獲取job的當前的狀態機,根據當前訊息event進行狀態變換。
    } catch (InvalidStateTransitonException e) {                 // 會跳轉至 StateMachine 實現的 doTransition()函式。
      LOG.error("Can't handle this event at current state", e);
      addDiagnostic("Invalid event " + event.getType() + 
          " on Job " + this.jobId);
      eventHandler.handle(new JobEvent(this.jobId,
          JobEventType.INTERNAL_ERROR));
    }
    //notify the eventhandler of state change
    if (oldState != getInternalState()) {
      LOG.info(jobId + "Job Transitioned from " + oldState + " to "
               + getInternalState());
      rememberLastNonFinalState(oldState);
    }
  }
  
  finally {
    writeLock.unlock();
  }
}
MRAppMaster訊息分發過程到此結束,這之後就進入了Job狀態機的操作,當然根據任務的不同,也許會進入Task狀態機,TaskAttempt狀態機等等。總之,狀態機以及狀態機中的各種hook進行對應Job/Task/TaskAttempt的控制。

我們知道一個job對應了一個狀態機,我們也應該知道在YARN的實現中一個狀態機由以下三個部分組成: 1. 狀態(節點)  2. 事件(弧)  3. Hook(觸發事件後的處理)。

在JobImpl.java檔案中,我們可以看到構建job狀態機的過程:

protected static final
  StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
     stateMachineFactory
   = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
            (JobStateInternal.NEW)       // 構造JobImpl狀態機,初始狀態是 NEW 狀態

        // Transitions from NEW state
        .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,          // 新增一個狀態變化
            JobEventType.JOB_DIAGNOSTIC_UPDATE,
            DIAGNOSTIC_UPDATE_TRANSITION)
        .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,          // 新增一個狀態變化
            JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
        .addTransition                                                      // 新增一個狀態變化
            (JobStateInternal.NEW,
            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
            JobEventType.JOB_INIT,
            new InitTransition())
        .addTransition(JobStateInternal.NEW, JobStateInternal.KILLED,
            JobEventType.JOB_KILL,
            new KillNewJobTransition())
        .addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
            JobEventType.INTERNAL_ERROR,
            INTERNAL_ERROR_TRANSITION)
        .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
            JobEventType.JOB_AM_REBOOT,
            INTERNAL_REBOOT_TRANSITION)
            ...
            ...   
後面還有很多,job狀態機是比較一個複雜的狀態機,涉及到很多狀態與事件,可以通過 對YARN狀態機視覺化深入瞭解,在此不做更多討論。
呼叫addTransition()函式可以新增一個狀態變化。addTransition()函式中有四個引數,分別為: 1.preState  2.postState  3.eventType 4.hook。  它們表示 1.事件發生前的狀態,2.事件發生後的狀態, 3.事件, 4.事件發生時觸發的hook

程式碼中  addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()) 這一個狀態變化表示,事件發生前是 NEW狀態,事件發生後是 INITED狀態 或者 FAILED狀態,事件是JOB_INIT, hook是InitTransition物件。也就是如果Job狀態機當前狀態是NEW,這是發生了一個JOB_INIT事件,那麼狀態機會觸發InitTransition物件中的doTransition()函式,如果transition()函式返回 INITED狀態,那麼狀態機最新狀態就是 INITED, 如果doTransition()函式返回 FAILED狀態,那麼狀態機最新狀態就是FAILED。

那麼我們來看一下狀態機的狀態變換的過程。

private class InternalStateMachine
      implements StateMachine<STATE, EVENTTYPE, EVENT> {
  private final OPERAND operand;
  private STATE currentState;

  InternalStateMachine(OPERAND operand, STATE initialState) {
    this.operand = operand;
    this.currentState = initialState;
    if (!optimized) {
      maybeMakeStateMachineTable();
    }
  }

  @Override
  public synchronized STATE getCurrentState() {
    return currentState;
  }

  @Override
  public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event)
       throws InvalidStateTransitonException  {
    currentState = StateMachineFactory.this.doTransition                       // 發生狀態機變化
        (operand, currentState, eventType, event);
    return currentState;
  }
}
private STATE doTransition
         (OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
    throws InvalidStateTransitonException {
  // We can assume that stateMachineTable is non-null because we call
  //  maybeMakeStateMachineTable() when we build an InnerStateMachine ,
  //  and this code only gets called from inside a working InnerStateMachine .
  Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
    = stateMachineTable.get(oldState);                                // 根據當前狀態獲取所有該狀態有可能發生的事件
  if (transitionMap != null) {
    Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition
        = transitionMap.get(eventType);                               // 根據事件獲取對應的hook,為一個Transition物件(Transition是一個介面)
    if (transition != null) {
      return transition.doTransition(operand, oldState, event, eventType);  // 呼叫該物件的doTransition()函式
    }
  }
  throw new InvalidStateTransitonException(oldState, eventType);
}
private class SingleInternalArc
                  implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

  private STATE postState;
  private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

  SingleInternalArc(STATE postState,
      SingleArcTransition<OPERAND, EVENT> hook) {
    this.postState = postState;
    this.hook = hook;
  }

  @Override
  public STATE doTransition(OPERAND operand, STATE oldState,
                            EVENT event, EVENTTYPE eventType) {
    if (hook != null) {
      hook.transition(operand, event);          // 呼叫 hook 的 transition()函式
    }
    return postState;
  }
}
這個 stateMachineTable 是一個 Map<state, Map<eventType, Transition>> 資料結構。根據當前狀態可以獲取這個狀態的所有<eventType, Transition>對應關係。之後根據event型別可以獲得 Transition物件(也就是一個hook)。那麼,狀態機就可以由當前狀態,事件呼叫hook函數了。本質上就是  StateMachine.hook(preState, event) 的形式,因為OO的思想發生了變形。之後就是Transition物件的具體實現了,由於在一開始構造狀態機的時候,JobImpl 構造了一個狀態變化 addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),JobEventType.JOB_INIT,new InitTransition()),這裡的hook是 InitTransition 物件。那麼hook被呼叫的時候,就會呼叫 InitTransition 物件的 transition() 函式。InitTransition 物件的 transition() 函式中包括了很多初始化一個job所做的操作,比如建立MapTask,建立ReduceTask,建立MapTask所需要的輸入檔案Split 等等,在這裡就不做過多敘述,可以自行閱讀相關文章。

這個就是YARN的狀態機工作的基本流程以及如何與 MRAppMaster的訊息分發機制相關聯的過程。由於YARN相對於MRv1的諸多改進,現在MRAppMaster取代了過去MRv1中的JobTracker對Job與Task進行控制,結構更加清晰,模組化更加明顯。總體而言,MRAppMaster是一個相對比較複雜的模組,需要進行更多的更加仔細分析。

如需轉載請表明 轉載自 http://blog.csdn.net/gjt19910817/article/details/43441801