1. 程式人生 > >Yarn原始碼分析之MRAppMaster:作業執行方式Local、Uber、Non-Uber

Yarn原始碼分析之MRAppMaster:作業執行方式Local、Uber、Non-Uber

        基於作業大小因素,MRAppMaster提供了三種作業執行方式:本地Local模式、Uber模式、Non-Uber模式。其中,

        1、本地Local模式:通常用於除錯;

        2、Uber模式:為降低小作業延遲而設計的一種模式,所有任務,不管是Map Task,還是Reduce Task,均在同一個Container中順序執行,這個Container其實也是MRAppMaster所在Container;

        3、Non-Uber模式:對於執行時間較長的大作業,先為Map Task申請資源,當Map Task執行完成數目達到一定比例後再為Reduce Task申請資源。

         在Yarn中,作業執行的資源,統一被抽象為容器Container,在MRAppMaster中關於作業執行時需要的資源的分配與載入程式碼中,容器分配申請服務、容器分配完成後載入服務中,都有關於Uber模式和Non-Uber模式的處理,如下:

        1、容器分配申請路由服務

         容器分配申請路由服務ContainerAllocatorRouter繼承自AbstractService,是Hadoop中一個典型的服務,其正常提供服務需要經歷初始化init、啟動start等過程,而在服務啟動的serviceStart()方法中,存在以下關於Uber模式和Non-Uber模式的處理:

      // 如果Job在Uber模式下執行,啟動構造容器分配器LocalContainerAllocator例項
      if (job.isUber()) {
        MRApps.setupDistributedCacheLocal(getConfig());
        this.containerAllocator = new LocalContainerAllocator(
            this.clientService, this.context, nmHost, nmPort, nmHttpPort
            , containerID);
      } else {
    	  
    	// 否則構造RM容器分配器RMContainerAllocator例項
        this.containerAllocator = new RMContainerAllocator(
            this.clientService, this.context);
      }

        可見,如果Job在Uber模式下執行,啟動構造容器分配器LocalContainerAllocator例項,否則構造RM容器分配器RMContainerAllocator例項。而LocalContainerAllocator代表的是本地容器分配器,其構造過程中傳入的containerID為MRAppMaster的成員變數containerID,什麼意思呢?不就正好說明LocalContainerAllocator所使用的容器,也就是Uber模式下所使用的容器,就是MRAppMaster所在Container,與上面所介紹的Uber模式正好一致,而Non-Uber模式下則需要使用Yarn的RMContainerAllocator,通過與ResourceManager進行通訊來申請容器的分配,總的原則就是:先為Map Task申請資源,當Map Task執行完成數目達到一定比例後再為Reduce Task申請資源。

        2、容器載入路由服務

        容器載入路由服務ContainerLauncherRouter同樣繼承自AbstractService,也是Hadoop中一個典型的服務,我們同樣看下服務啟動serviceStart()方法,如下:

      // 如果Job在Uber模式下執行,啟動構造本地容器載入器LocalContainerLauncher例項
      if (job.isUber()) {
        this.containerLauncher = new LocalContainerLauncher(context,
            (TaskUmbilicalProtocol) taskAttemptListener);
      } else {
    	// 否則,構造容器載入器ContainerLauncherImpl例項
        this.containerLauncher = new ContainerLauncherImpl(context);
      }
        也是針對Uber模式和Non-Uber模式分別處理,如果Job在Uber模式下執行,啟動構造本地容器載入器LocalContainerLauncher例項;否則,構造容器載入器ContainerLauncherImpl例項。

        另外,由於Uber模式下不管是Map Task,還是Reduce Task,均在同一個Container中順序執行,所以MapReduce的推測執行機制對於Uber模式是不適用的,故在MRAppMaster服務啟動的serviceStart()方法中,對於Uber模式,會禁用推測執行機制,相關程式碼如下:

      if (job.isUber()) {
    	// Uber模式下禁用推測執行機制,即Disable Speculation
        speculatorEventDispatcher.disableSpeculation();
        LOG.info("MRAppMaster uberizing job " + job.getID()
            + " in local container (\"uber-AM\") on node "
            + nmHost + ":" + nmPort + ".");
      } else {
        // send init to speculator only for non-uber jobs. 
        // This won't yet start as dispatcher isn't started yet.
    	// Non-Uber模式下發送SpeculatorEvent事件,初始化speculator
        dispatcher.getEventHandler().handle(
            new SpeculatorEvent(job.getID(), clock.getTime()));
        LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
            + "job " + job.getID() + ".");
      }
        可以看到,Uber模式下禁用推測執行機制,即Disable Speculation,Non-Uber模式下發送SpeculatorEvent事件,初始化speculator,因此,對於Uber模式,會禁用推測執行機制。

        在作業的抽象實現JobImpl中,會針對Uber模式進行一些特定引數設定,如下:

    if (isUber) {
      LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
          + numReduceTasks + "r tasks (" + dataInputLength
          + " input bytes) will run sequentially on single node.");

      // make sure reduces are scheduled only after all map are completed
      // mapreduce.job.reduce.slowstart.completedmaps引數設定為1,
      // 即全部Map任務完成後才會為Reduce任務分配資源
      conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
                        1.0f);
      // uber-subtask attempts all get launched on same node; if one fails,
      // probably should retry elsewhere, i.e., move entire uber-AM:  ergo,
      // limit attempts to 1 (or at most 2?  probably not...)
      // 引數mapreduce.map.maxattempts、mapreduce.reduce.maxattempts設定為1,即Map、Reduce任務的最大嘗試次數均為1
      conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
      conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);

      // disable speculation
      // 引數mapreduce.map.speculative、mapreduce.reduce.speculative設定為false,即禁用Map、Reduce任務的推測執行機制
      conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
      conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
    }
        主要包括:

        1、mapreduce.job.reduce.slowstart.completedmaps引數設定為1,即全部Map任務完成後才會為Reduce任務分配資源;

        2、引數mapreduce.map.maxattempts、mapreduce.reduce.maxattempts設定為1,即Map、Reduce任務的最大嘗試次數均為1;

        3、引數mapreduce.map.speculative、mapreduce.reduce.speculative設定為false,即禁用Map、Reduce任務的推測執行機制;