1. 程式人生 > >原始碼走讀-Yarn-ResourceManager02-RM的啟動之RM詳解

原始碼走讀-Yarn-ResourceManager02-RM的啟動之RM詳解

0x03 RM的啟動之RM詳解

3.1 ResourceManager的繼承關係

3.1.1 ResourceManager第一印象

我們先來看看這個類:

/**
 * The ResourceManager is the main class that is a set of components.
 * "I am the ResourceManager. All your resources belong to us..."
 * 
 *  ResourceManager是一個擁有一系列元件的主類,他擁有所有資源。
 */
@SuppressWarnings("unchecked"
) public class ResourceManager extends CompositeService implements Recoverable

我們可以看到他繼承了CompositeService類,實現了Recoverable介面。ResourceManager的關係類圖如下:

ResourceManager類關係圖

弄懂Service和上層類的關係是很重要的,不然後面程式碼你頭都要看大,所以下面我們會詳細說下上層的介面和類。

3.1.2 Recoverable介面與實現

我們先看看Recoverable介面:

//這個類很簡單,就只有一個recover方法
public interface Recoverable
{
public void recover(RMState state) throws Exception; }

下面我們看看ResourceManager內實現的recover方法:

  @Override
  public void recover(RMState state) throws Exception {
    // recover RMdelegationTokenSecretManager
    rmContext.getRMDelegationTokenSecretManager().recover(state);

    // recover AMRMTokenSecretManager
rmContext.getAMRMTokenSecretManager().recover(state); // recover applications rmAppManager.recover(state); setSchedulerRecoveryStartAndWaitTime(state, conf); }

這個方法裡分別恢復了RMdelegationTokenSecretManagerAMRMTokenSecretManager以及所有的app,最後記錄排程器的恢復過程的開始和結束時間。

3.1.3 Closeable與Service介面

現在我們看看另一路ResourceManager的繼承關係。回顧一下,由下往上的關係是ResourceManager->CompositeService->AbstractService->Service->Closeable->AutoCloseable

我們先說下Closeable介面。很簡單,就是說實現該介面的類就表明有資源可以被關閉,關閉時呼叫close()方法,但要注意處理IOException。他的爸即AutoCloseable的不同就是close方法丟擲的是Exception異常,這裡不再贅述。

public interface Closeable extends AutoCloseable {
    public void close() throws IOException;
}

接下來是Service介面,位於org.apache.hadoop.service包內:

//定義服務的宣告週期
@Public
@Evolving
public interface Service extends Closeable {
   // 服務狀態列舉類
  public enum STATE {
    /** 已構建但尚未初始化 */
    NOTINITED(0, "NOTINITED"),

    /** 已初始化但還沒有開始或結束 */
    INITED(1, "INITED"),

    /** 已開始,尚未結束 */
    STARTED(2, "STARTED"),

    /** 已結束,不允許再過度到其他狀態 */
    STOPPED(3, "STOPPED");

    // 一個int值,用來在陣列查詢和JXM介面。
    // 雖然Enamu的ordinal()方法有這個作用,但是隨著時間推移提供更多的穩定性保證
    private final int value;

    private final String statename;

    // 狀態列舉類的構造方法,跟前文定義的狀態序號和狀態名匹配
    private STATE(int value, String name) {
      this.value = value;
      this.statename = name;
    }

    public int getValue() {
      return value;
    }

    @Override
    public String toString() {
      return statename;
    }
  }

 /**
  * 服務初始化的方法
  * 
  * 狀態必須從 NOINITED -> INITED。
  * 除非init操作失敗而且帶有異常丟擲時,在這種情況下stop方法必須被呼叫隨後服務狀態變為STOPPED
  * 
  * config引數是關於服務的配置
  * 此外,要注意當操作過程中發生任何異常時會丟擲RuntimeException
  */
  void init(Configuration config);

 /**
  * 服務開始的方法
  * 
  * 狀態必須從 INITED -> STARTED。
  * 除非start操作失敗而且帶有異常丟擲時,在這種情況下stop方法必須被呼叫隨後服務狀態變為STOPPED
  * 
  * 要注意當操作過程中發生任何異常時會丟擲RuntimeException
  */
  void start();

 /**
  * 服務停止的方法
  * 
  * 如果服務已經處於STOPPED狀態時,則該操作必須是個空操作。
  * 該方法的實現中應該儘量關閉該服務的所有部分,不論服務處於什麼狀態都應該完成。
  * 
  * 當操作過程中發生任何異常時會丟擲RuntimeException
  */
  void stop();

 /**
  * 服務停止的方法
  * 設計為可在Java7閉包子句中使用的stop()版本
  *
  * 永遠不應該丟擲IOException
  * 當操作過程中發生任何異常時會丟擲RuntimeException
  */
  void close() throws IOException;

  // 註冊服務狀態更改事件的偵聽器,如果已經註冊過就為空操作
  void registerServiceListener(ServiceStateChangeListener listener);

  // 登出一個註冊過的監聽器,如果已經登出就為空操作
  void unregisterServiceListener(ServiceStateChangeListener listener);

  // 獲取服務名
  String getName();

  // 獲取服務的配置
  Configuration getConfig();

  STATE getServiceState();

  // 獲取服務開始時間,若尚未開始就返回0
  long getStartTime();

  // 判斷服務是否處於傳入的狀態(判斷結果僅限於呼叫時)
  boolean isInState(STATE state);

  // 獲取服務中第一個丟擲的異常,若沒有異常記錄就返回空
  Throwable getFailureCause();

  //返回當getFailureCause()發生時的狀態,如果沒有發生過就返回空
  STATE getFailureState();

  /**
  * 在指定時間內阻塞等待服務結束。
  * 
  * 這個方法僅會在所有服務結束操作被執行完成後(得到成功或失敗的結果)或者超出指定時間後才會返回。
  * 這個方法可以在服務INITED或是STARTED狀態前呼叫,這樣做是為了為了消除在此事件發生之前服務停止的任何競爭條件。
  *
  * timeout 引數為超時毫秒,0代表永遠
  * 當服務在指定時間內停止時就返回true
  */
  boolean waitForServiceToStop(long timeout);

  // 返回狀態轉移的歷史快照(靜態list),如果沒有記錄就返回一個沒有元素的非Null list
  public List<LifecycleEvent> getLifecycleHistory();
}

介紹完了Service類,可以看出這個類的主要作用就是確定服務狀態列舉以及狀態轉移相關的規範和定義。

3.1.4 AbstractService

下面看看AbstractService,他是所有服務的基礎實現類,十分重要,我們這裡重點介紹下常用的物件和方法,請注意我為了程式碼易讀修改了部分順序:

@Public
@Evolving
public abstract class AbstractService implements Service{
  // 注意服務名用final修飾,一旦指定後不可改變
  private final String name;

  // 封裝了服務狀態的模型類
  private final ServiceStateModel stateModel;

  // 構造方法
  public AbstractService(String name) {
    this.name = name;
    stateModel = new ServiceStateModel(name);
  }

  @Override
  public String getName() {
    return name;
  }

   // 獲取服務狀態
  @Override
  public final STATE getServiceState() {
    return stateModel.getState();
  }

  // 服務開始的時間戳,在服務開始前是0 
  private long startTime;

  @Override
  public long getStartTime() {
    return startTime;
  }


  // 服務狀態變化的監聽器, 
  private final ServiceOperations.ServiceListeners listeners
    = new ServiceOperations.ServiceListeners();

  // 注意和上面的監聽器區分,這個是全域性的、橫跨所有服務的監聽器
  private static ServiceOperations.ServiceListeners globalListeners
    = new ServiceOperations.ServiceListeners();

  // 生命週期歷史組成的list
  private final List<LifecycleEvent> lifecycleHistory
    = new ArrayList<LifecycleEvent>(5);

  // 狀態轉義時用來做物件鎖
  private final Object stateChangeLock = new Object();

  // 重寫自Service類,判斷服務狀態是否是指定狀態
  @Override
  public final boolean isInState(Service.STATE expected) {
    return stateModel.isInState(expected);
  }

  // 配置,在服務初始化後才會前都是Null 
  private volatile Configuration config;

  // 在呼叫init方法時呼叫該setConfig方法,而且僅應在因為某些原因導致服務實現需要重寫初始化配置時:
  // 比如,用繼承自Configuration的子類代替
  protected void setConfig(Configuration conf) {
    this.config = conf;
  }

  @Override
  public synchronized Configuration getConfig() {
    return config;
  }

  // 新增一個狀態變更事件到生命週期歷史中
  private void recordLifecycleEvent() {
    LifecycleEvent event = new LifecycleEvent();
    event.time = System.currentTimeMillis();
    event.state = getServiceState();
    lifecycleHistory.add(event);
  }

  // 重寫Service中的該方法,返回生命週期歷史組成的list
  @Override
  public synchronized List<LifecycleEvent> getLifecycleHistory() {
    return new ArrayList<LifecycleEvent>(lifecycleHistory);
  }

  // 進入指定狀態,並會呼叫recordLifecycleEvent記錄轉移事件。
  // 返回之前的狀態
  private STATE enterState(STATE newState) {
    assert stateModel != null : "null state in " + name + " " + this.getClass();
    STATE oldState = stateModel.enterState(newState);
    if (oldState != newState) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(
          "Service: " + getName() + " entered state " + getServiceState());
      }
      recordLifecycleEvent();
    }
    return oldState;
  }

  // 將狀態變更通知本地和全域性監聽器。通知監聽器時發生的異常不允許向上傳遞。
  private void notifyListeners() {
    try {
      listeners.notifyListeners(this);
      globalListeners.notifyListeners(this);
    } catch (Throwable e) {
      LOG.warn("Exception while notifying listeners of " + this + ": " + e,
               e);
    }
  }

  // 重寫自Service類,會呼叫 serviceInit方法
  // 程式碼中可以看到,該方法如果多次反覆呼叫,除了第一次以外的呼叫都不會起作用
  // 也就是說該方法不可重入
  // configuration引數為空、狀態轉移非法或者是其他出錯時,會丟擲ServiceStateException
  @Override
  public void init(Configuration conf) {
    if (conf == null) {
      throw new ServiceStateException("Cannot initialize service "
                                      + getName() + ": null configuration");
    }
    if (isInState(STATE.INITED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.INITED) != STATE.INITED) {
        setConfig(conf);
        try {
          serviceInit(config);
          if (isInState(STATE.INITED)) {
            // 如果服務在INIT狀態結束,就通過監聽器通知
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }

  // 重寫自Service類,會呼叫serviceStart方法
  // 當前服務狀態不允許start時,會丟擲ServiceStateException
  @Override
  public void start() {
    if (isInState(STATE.STARTED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
        try {
          startTime = System.currentTimeMillis();
          serviceStart();
          if (isInState(STATE.STARTED)) {
            // 如果服務成功啟動了,通知監聽器
            notifyListeners();
          }
        } catch (Exception e) {
          noteFailure(e);
          ServiceOperations.stopQuietly(LOG, this);
          throw ServiceStateException.convert(e);
        }
      }
    }
  }

  // 服務失敗異常
  private Exception failureCause;

  // 服務失敗時處於的狀態,僅當服務因為一個錯誤導致失敗時合法
  private STATE failureState = null;

  @Override
  public final synchronized Throwable getFailureCause() {
    return failureCause;
  }

  @Override
  public synchronized STATE getFailureState() {
    return failureState;
  }

  // 記錄觸發該方法的異常
  protected final void noteFailure(Exception exception) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("noteFailure " + exception, null);
    }
    if (exception == null) {
      //make sure failure logic doesn't itself cause problems
      return;
    }
    //record the failure details, and log it
    synchronized (this) {
      if (failureCause == null) {
        failureCause = exception;
        failureState = getServiceState();
        LOG.info("Service " + getName()
                 + " failed in state " + failureState
                 + "; cause: " + exception,
                 exception);
      }
    }
  }

  // 用來在多執行緒中協助 waitForServiceToStop方法,為true代表該服務已經終止
  private final AtomicBoolean terminationNotification =
    new AtomicBoolean(false);

  // 重寫自Service類,會呼叫serviceStart方法
  // 當前服務狀態不允許start時,會丟擲ServiceStateException
  @Override
  public void stop() {
    if (isInState(STATE.STOPPED)) {
      return;
    }
    synchronized (stateChangeLock) {
      if (enterState(STATE.STOPPED) != STATE.STOPPED) {
        try {
          serviceStop();
        } catch (Exception e) {
          //stop-time exceptions are logged if they are the first one,
          noteFailure(e);
          throw ServiceStateException.convert(e);
        } finally {
          // 最終記錄改服務已經終止
          terminationNotification.set(true);
          // 喚醒阻塞在terminationNotification上面的所有執行緒
          synchronized (terminationNotification) {
            terminationNotification.notifyAll();
          }
          //notify anything listening for events
          notifyListeners();
        }
      } else {
        // 處理之前就已經是STOPPED,那就只做debug記錄
        if (LOG.isDebugEnabled()) {
          LOG.debug("Ignoring re-entrant call to stop()");
        }
      }
    }
  }

  @Override
  public final void close() throws IOException {
    stop();
  }

  // 等待服務在指定時間內終止
  @Override
  public final boolean waitForServiceToStop(long timeout) {
    boolean completed = terminationNotification.get();
    // 當服務未完成的時候,就等待指定timeout然後判斷是否完成
    while (!completed) {
      try {
        //獲取terminationNotification物件鎖
        synchronized(terminationNotification) {
          terminationNotification.wait(timeout);
        }
        // here there has been a timeout, the object has terminated,
        // or there has been a spurious wakeup (which we ignore)
        completed = true;
      } catch (InterruptedException e) {
        // interrupted; have another look at the flag
        completed = terminationNotification.get();
      }
    }
    // 最後返回當前是否服務終止
    return terminationNotification.get();
  }

  /* ===================================================================== */
  /* Override Points */
  /* ===================================================================== */

 /**
  * 服務所需的所有初始化程式碼
  * 
  * 這個方法僅會在特定的service類例項生命週期內被呼叫一次
  * 
  * 方法實現中不需要使用synchronized,因為init方法內已阻止了方法重入
  * 
  * 基本的實現是檢查傳入的conf引數和現有config物件是否一致,不一致就更新現有config
  */
  protected void serviceInit(Configuration conf) throws Exception {
    if (conf != config) {
      LOG.debug("Config has been overridden during init");
      setConfig(conf);
    }
  }

 /**
  * 在 INITED->STARTED 轉移時呼叫
  * 
  * 這個方法僅會在特定的service類例項生命週期內被呼叫一次
  * 
  * 方法實現中不需要使用synchronized,因為start方法內已阻止了方法重入
  * 
  * 按需丟擲異常,被呼叫者捕獲後然後觸發服務停止
  */
  protected void serviceStart() throws Exception {}

 /**
  * 在 狀態向 STARTED 轉移時呼叫
  * 
  * 這個方法僅會在特定的service類例項生命週期內被呼叫一次
  * 
  * 方法實現中不需要使用synchronized,因為stop方法內已阻止了方法重入
  * 
  * 實現該方法的程式碼在失敗處理方面必須是十分健壯的,包括檢查空引用等
  * 
  * 按需丟擲異常,被呼叫者捕獲後會記錄日誌
  */
  protected void serviceStop() throws Exception {

  }

  @Override
  public void registerServiceListener(ServiceStateChangeListener l) {
    listeners.add(l);
  }

  @Override
  public void unregisterServiceListener(ServiceStateChangeListener l) {
    listeners.remove(l);
  }

  // 註冊全域性的監聽JVM所有服務狀態變更的監聽器。注意和前面的例項監聽器區分
  public static void registerGlobalListener(ServiceStateChangeListener l) {
    globalListeners.add(l);
  }

  //登出一個全域性監聽器,當找到時就返回true,然後登出
  public static boolean unregisterGlobalListener(ServiceStateChangeListener l) {
    return globalListeners.remove(l);
  }

  // Package-scoped 方法,測試用:重置全域性監聽器列表
  @VisibleForTesting
  static void resetGlobalListeners() {
    globalListeners.reset();
  }

  // 重寫了toString方法
  @Override
  public String toString() {
    return "Service " + name + " in state " + stateModel;
  }

} 

以上就介紹完了AbstractService,可以看出這個類的主要實現了很多Service介面中的服務狀態和狀態轉移相關的方法,十分重要。我們已經對該套體系有了初步瞭解。比如呼叫某個繼承了AbstractService的類的init方法的套路就是先呼叫AbstractService中的init方法控制狀態轉移,然後呼叫這個類的serviceInit做具體初始化邏輯。腦袋裡記住這個流程,下面很多的service類都是這樣做的。

3.1.5 CompositeService

下面看看ResourceManager的直接父類 CompositeService

/**
 * Composition of services.
 */
@Public
@Evolving
public class CompositeService extends AbstractService

註釋很簡單,我負責組合所有服務。繼承關係也很簡單,我只有一個爹-AbstractService。下面說下他的主要物件和方法:

首先是靜態內部類、Runnable CompositeServiceShutdownHook,他的自我介紹很扯:JVM關閉的時候,我可以優雅地關閉CompositeService。

後面會提到,他會被RM註冊到ShutdownHookManager內。

/**
   * JVM Shutdown hook for CompositeService which will stop the give
   * CompositeService gracefully in case of JVM shutdown.
   */
  public static class CompositeServiceShutdownHook implements Runnable {

    private CompositeService compositeService;

    public CompositeServiceShutdownHook(CompositeService compositeService) {
      this.compositeService = compositeService;
    }

    @Override
    public void run() {
      ServiceOperations.stopQuietly(compositeService);
    }
  }

下面介紹幾個物件和方法:

//下面這個常量表示總是關閉所有,包括STARTED、INITED狀態的服務。
protected static final boolean STOP_ONLY_STARTED_SERVICES = false;

//存各子服務的list,前面提到過這個類是組合了多個服務
private final List<Service> serviceList = new ArrayList<Service>();

//構造方法,傳入服務名
public CompositeService(String name) {
    super(name);
  }

//獲取子服務列表,注意這裡使用了serviceList物件作為物件鎖,
//也就是說獲取的時候如果剛好有service新增將被阻塞不能被獲取到
public List<Service> getServices() {
    synchronized (serviceList) {
      return new ArrayList<Service>(serviceList);
    }
  }

//新增服務到子服務列表
protected void addService(Service service) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding service " + service.getName());
    }
    synchronized (serviceList) {
      serviceList.add(service);
    }
  }

//新增服務,在此之前判斷是否是Service實現類
protected boolean addIfService(Object object) {
    if (object instanceof Service) {
      addService((Service) object);
      return true;
    } else {
      return false;
    }
  }

//刪除service
protected synchronized boolean removeService(Service service) {
    synchronized (serviceList) {
      return serviceList.remove(service);
    }
  }

接下來是幾個特別重要的、並且重寫自AbstractService的狀態轉移操作的方法:

//服務啟動方法,很簡單,獲取本物件中存放的服務的list,挨個呼叫其init方法啟動
protected void serviceInit(Configuration conf) throws Exception {
    List<Service> services = getServices();
    if (LOG.isDebugEnabled()) {
      LOG.debug(getName() + ": initing services, size=" + services.size());
    }
    for (Service service : services) {
      service.init(conf);
    }
    super.serviceInit(conf);
  }

  protected void serviceStart() throws Exception {
    List<Service> services = getServices();
    if (LOG.isDebugEnabled()) {
      LOG.debug(getName() + ": starting services, size=" + services.size());
    }
    for (Service service : services) {
      // start the service. If this fails that service
      // will be stopped and an exception raised
      service.start();
    }
    super.serviceStart();
  }

  protected void serviceStop() throws Exception {
    //stop all services that were started
    int numOfServicesToStop = serviceList.size();
    if (LOG.isDebugEnabled()) {
      LOG.debug(getName() + ": stopping services, size=" + numOfServicesToStop);
    }
    stop(numOfServicesToStop, STOP_ONLY_STARTED_SERVICES);
    super.serviceStop();
  }

CompositeService到這裡我們就講完了,其實他很簡單,就是把多個service組合到一起,重寫了如serviceInit等狀態轉移方法,這裡是去操作多個service的相關狀態轉移方法。

到這裡,我們把ResourceManager的長輩們挨個介紹完了。

下面一節我們會進入這一章的主角-ResourceManager

3.2 ResourceManager詳解

3.2.1 main方法

因為該類程式碼量太大,直接看可能有點暈,我們不妨先看看main方法知道他是在幹嘛:

public static void main(String argv[]) {
    //設定主執行緒出現未定義捕獲處理的異常時的handler
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    //列印啟動日誌
    StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
    try {
      //初始化一個Yarn配置類例項
      Configuration conf = new YarnConfiguration();
      // If -format-state-store, then delete RMStateStore; else startup normally
      if (argv.length >= 1) {
        if (argv[0].equals("-format-state-store")) {
          deleteRMStateStore(conf);
        } else if (argv[0].equals("-remove-application-from-state-store")
            && argv.length == 2) {
          removeApplication(conf, argv[1]);
        } else {
          printUsage(System.err);
        }
      } else {
        // 我們的啟動指令碼引數會走這個分支
        ResourceManager resourceManager = new ResourceManager();
        //把RM的CompositeService的shutDownHook新增到一個統一的ShutdownHookManager,後面有專門章節講 ShutdownHookManager 的機制
        ShutdownHookManager.get().addShutdownHook(
          new CompositeServiceShutdownHook(resourceManager),
          SHUTDOWN_HOOK_PRIORITY);
        // 這裡就是呼叫AbstractService.init,然後呼叫ResourceManager.serviceInit
        resourceManager.init(conf);
        // 和上面類似,呼叫ResourceManager.serviceStart
        resourceManager.start();
      }
    } catch (Throwable t) {
      LOG.fatal("Error starting ResourceManager", t);
      System.exit(-1);
    }
  }

3.2.2 serviceInit

ResourceManager.serviceInit方法程式碼如下:

@Override
  protected void serviceInit(Configuration conf) throws Exception {
    this.conf = conf;
    // RM上下文,存有RM的許多重要成員
    this.rmContext = new RMContextImpl();

    //配置管理初始化
    this.configurationProvider =
        ConfigurationProviderFactory.getConfigurationProvider(conf);
    this.configurationProvider.init(this.conf);
    rmContext.setConfigurationProvider(configurationProvider);

    // 載入core-site.xml
    InputStream coreSiteXMLInputStream =
        this.configurationProvider.getConfigurationInputStream(this.conf,
            YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
    if (coreSiteXMLInputStream != null) {
      this.conf.addResource(coreSiteXMLInputStream,
          YarnConfiguration.CORE_SITE_CONFIGURATION_FILE);
    }

    // 從已載入的 core-site.xml檔案中獲取 使用者<->組 的對映表
    Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(this.conf)
        .refresh();

    // 從已載入的 core-site.xml檔案中獲取 超級使用者<->組 的對映表
    // Or use RM specific configurations to overwrite the common ones first
    // if they exist
    RMServerUtils.processRMProxyUsersConf(conf);
    ProxyUsers.refreshSuperUserGroupsConfiguration(this.conf);

    // 載入 yarn-site.xml
    InputStream yarnSiteXMLInputStream =
        this.configurationProvider.getConfigurationInputStream(this.conf,
            YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
    if (yarnSiteXMLInputStream != null) {
      this.conf.addResource(yarnSiteXMLInputStream,
          YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
    }

     // 驗證config
    validateConfigs(this.conf);

    // 填充是否配置了RM 高可用
    this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
    //如果確認配置了RM高可用,就需要驗證現有配置的引數是否支援高可用,驗證不通過就丟擲異常
    if (this.rmContext.isHAEnabled()) {
      HAUtil.verifyAndSetConfiguration(this.conf);
    }

    // Set UGI and do login
    // If security is enabled, use login user
    // If security is not enabled, use current user
    this.rmLoginUGI = UserGroupInformation.getCurrentUser();
    try {
      doSecureLogin();
    } catch(IOException ie) {
      throw new YarnRuntimeException("Failed to login", ie);
    }

    // 註冊一個非同步Dispatcher,有一個單獨的執行緒來處理所有持續開啟的服務的各種EventType。
    // Yarn中採用了事件驅動的程式設計模型,後面很多不同的事件都用了這個dispatcher來處理。後面會詳細說
    rmDispatcher = setupDispatcher();
    // 將rmDispatcher放到CompositeService的serviceList
    addIfService(rmDispatcher);
    // 並放入RM上下文中
    rmContext.setDispatcher(rmDispatcher);

     // 註冊管理員服務
     // AdminService為管理員提供了一套獨立的服務介面,以防止大量的普通使用者的請求使得管理員傳送的管理命令餓死。
     // 管理員可以通過這些介面命令管理叢集,比如動態更新節點列表,更新ACL列表,更新佇列資訊等
    adminService = createAdminService();
    addService(adminService);
    rmContext.setRMAdminService(adminService);

    // 建立和初始化一批服務
    createAndInitActiveServices();

    webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
                      YarnConfiguration.RM_BIND_HOST,
                      WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));

     // 接著呼叫父類CompositeService的serviceInit方法,將他管理的服務全部初始化
    super.serviceInit(this.conf);
  }

可以看到以上程式碼主要是在做一些RM用到的服務的初始化操作並放入RMcontext中。下面我們看看上面提到的幾個重要方法。

首先看看setupDispatcher這個方法在做啥:

  private Dispatcher setupDispatcher() {
    Dispatcher dispatcher = createDispatcher();
    dispatcher.register(RMFatalEventType.class,
        new ResourceManager.RMFatalEventDispatcher());
    return dispatcher;
  }

好吧,又嵌套了兩個方法,我們挨個看:

  protected Dispatcher createDispatcher() {
    return new AsyncDispatcher();
  }

也就是說建立的實際上是建立了AsyncDispatcher類,可以點選這裡跳轉到該章節瞭解更多。上面我們就分析完了Dispatcher dispatcher = createDispatcher();,一句話看著簡單其實需要理解的不少。

下面接著看dispatcher.register(RMFatalEventType.class,new ResourceManager.RMFatalEventDispatcher())

其實這裡就用的是我們前面分析的AsyncDispatcher.register方法,將RMFatalEventType.class型別的Event處理器指定為RMFatalEventDispatcher。我們可以簡單看一下這個event和eventHandler:

// RMFatalEventType就是一個擁有表示兩種RM的Fatal錯時列舉類
@InterfaceAudience.Private
public enum RMFatalEventType {
  // Source <- Store
  STATE_STORE_OP_FAILED,

  // Source <- Embedded Elector
  EMBEDDED_ELECTOR_FAILED
}

// 這個處理器實現了介面EventHandler的唯一方法handle,定義了該事件處理邏輯
public static class RMFatalEventDispatcher
      implements EventHandler<RMFatalEvent> {

    @Override
    public void handle(RMFatalEvent event) {
      LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
          event.getType().name() + ". Cause:\n" + event.getCause());
        // 簡單粗暴,退出。我們就不再細看了
      ExitUtil.terminate(1, event.getCause());
    }
 }

講完了Dispatcher,接下來說說createAndInitActiveServices這個方法:

 protected void createAndInitActiveServices() throws Exception {
     // 新建RMActiveServices例項
    activeServices = new RMActiveServices(this);
    // 初始化RMActiveServices
    activeServices.init(conf);
  }

RMActiveServices的初始化方法中呼叫了多個重要服務,請點選這裡查閱:

到這裡,RM的serviceInit方法就講完了,主要工作是做很多RM用到的服務的初始化工作。下面開始講RM.serviceStart

3.2.3 serviceStart

 @Override
  protected void serviceStart() throws Exception {
    if (this.rmContext.isHAEnabled()) {
        // 允許ha,就變為StandBy狀態
      transitionToStandby(true);
    } else {
        // 否則,就變為Active狀態
      transitionToActive();
    }
     // 開啟webapp服務,主要是使用者認證服務
    startWepApp();
    if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER,
        false)) {
      int port = webApp.port();
      WebAppUtils.setRMWebAppPort(conf, port);
    }
    // 最後把CompositeService裡面的所有服務全部呼叫start方法
    super.serviceStart();
  }

3.3 RM的重要內部類

3.3.1 RMActiveServices

  /**
   * 這個RMActiveServices是繼承自CompositeService,那麼他應該也是組合了多個Service
   * RMActiveServices 處理所有RM中的活躍的(Active)服務
   */
  @Private
  public class RMActiveServices extends CompositeService {

    private DelegationTokenRenewer delegationTokenRenewer;
    // 排程器對應的EventHandler
    private EventHandler<SchedulerEvent> schedulerDispatcher;
    private ApplicationMasterLauncher applicationMasterLauncher;
    private ContainerAllocationExpirer containerAllocationExpirer;
    private ResourceManager rm;
    private boolean recoveryEnabled;
    // 掌控所有RMActiveService的上下文
    private RMActiveServiceContext activeServiceContext;

    RMActiveServices(ResourceManager rm) {
      super("RMActiveServices");
      this.rm = rm;
    }

    @Override
    protected void serviceInit(Configuration configuration) throws Exception {
      activeServiceContext = new RMActiveServiceContext();
      rmContext.setActiveServiceContext(activeServiceContext);

       // 用來判斷出錯時是否直接退出程式
      conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

       //RMSecretManagerService 主要提供了一些Token相關服務
      rmSecretManagerService = createRMSecretManagerService();
      // 這個地方其實是呼叫父類CompositeService的方法,加入serviceList
      addService(rmSecretManagerService);

        // 監控Container是否過期(提交ApplicationMaster時檢查)
      containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
      addService(containerAllocationExpirer);
      rmContext.setContainerAllocationExpirer(containerAllocationExpirer);

        // AM存活監控,繼承自AbstractLivelinessMonitor,過期發生時會觸發回撥函式
      AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
      addService(amLivelinessMonitor);
      rmContext.setAMLivelinessMonitor(amLivelinessMonitor);

        // AM結束監控
      AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
      addService(amFinishingMonitor);
      rmContext.setAMFinishingMonitor(amFinishingMonitor);

      // RM Node標籤管理者
      RMNodeLabelsManager nlm = createNodeLabelManager();
      addService(nlm);
      rmContext.setNodeLabelManager(nlm);

      boolean isRecoveryEnabled = conf.getBoolean(
          YarnConfiguration.RECOVERY_ENABLED,
          YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);

        // 管理RM狀態的儲存,我們用的是ZKRMStateStore
        // 這個配置是 yarn.resourcemanager.sotre.class
      RMStateStore rmStore = null;
      if (isRecoveryEnabled) {
        recoveryEnabled = true;
        rmStore = RMStateStoreFactory.getStore(conf);
        boolean isWorkPreservingRecoveryEnabled =
            conf.getBoolean(
              YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
              YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
        rmContext
          .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled);
      } else {
        recoveryEnabled = false;
        rmStore = new NullRMStateStore();
      }

      try {
        rmStore.init(conf);
        rmStore.setRMDispatcher(rmDispatcher);
        rmStore.setResourceManager(rm);
      } catch (Exception e) {
        // the Exception from stateStore.init() needs to be handled for
        // HA and we need to give up master status if we got fenced
        LOG.error("Failed to init state store", e);
        throw e;
      }
      rmContext.setStateStore(rmStore);

      if (UserGroupInformation.isSecurityEnabled()) {
        delegationTokenRenewer = createDelegationTokenRenewer();
        rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
      }

        // 持久化RMApp, RMAppAttempt, RMContainer的資訊
      RMApplicationHistoryWriter rmApplicationHistoryWriter =
          createRMApplicationHistoryWriter();
      addService(rmApplicationHistoryWriter);
      rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

        // 生產系統指標資料
      SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
      addService(systemMetricsPublisher);
      rmContext.setSystemMetricsPublisher(systemMetricsPublisher);

      // Node列表管理器,還用rmDispatcher註冊了一個NodesListManagerEventType事件處理(節點可用\不可用)
      nodesListManager = new NodesListManager(rmContext);
      rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
      addService(nodesListManager);
      rmContext.setNodesListManager(nodesListManager);

      // ResourceScheduler 排程器的建立,他的子類之一就是FairScheduler
      scheduler = createScheduler();
      scheduler.setRMContext(rmContext);
      addIfService(scheduler);
      rmContext.setScheduler(scheduler);

        // 用rmDispatcher註冊了一個SchedulerEventType事件處理
      schedulerDispatcher = createSchedulerEventDispatcher();
      addIfService(schedulerDispatcher);
      rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);

      // Register event handler for RmAppEvents(App事件)
      rmDispatcher.register(RMAppEventType.class,
          new ApplicationEventDispatcher(rmContext));

      // Register event handler for RmAppAttemptEvents(App嘗試事件)
      rmDispatcher.register(RMAppAttemptEventType.class,
          new ApplicationAttemptEventDispatcher(rmContext));

      // Register event handler for RmNodes(RM節點事件)
      rmDispatcher.register(
          RMNodeEventType.class, new NodeEventDispatcher(rmContext));

        //NM存活監控
      nmLivelinessMonitor = createNMLivelinessMonitor();
      addService(nmLivelinessMonitor);

      /**
       * 建立資源管理服務。處理來自NodeManager的請求,主要包括兩種請求:註冊和心跳.
       * 其中,註冊是NodeManager啟動時發生的行為,請求包中包含節點ID,可用的資源上限等資訊;
       * 而心跳是週期性行為,包含各個Container執行狀態,執行的Application列表、節點健康狀況(可通過一個指令碼設定),
       * 以上請求呼叫通過hadoop自己實現的一套RPC協議實現,具體看看YarnRPC。
       */
      resourceTracker = createResourceTrackerService();
      addService(resourceTracker);
      rmContext.setResourceTrackerService(resourceTracker);

        // 監控jvm執行狀況,異常就記錄日誌
      DefaultMetricsSystem.initialize("ResourceManager");
      JvmMetrics jm = JvmMetrics.initSingleton("ResourceManager", null);
      pauseMonitor = new JvmPauseMonitor();
      addService(pauseMonitor);
      jm.setPauseMonitor(pauseMonitor);

      // Initialize the Reservation system