1. 程式人生 > >MapReduce剖析筆記之五:Map與Reduce任務分配過程

MapReduce剖析筆記之五:Map與Reduce任務分配過程

轉載:https://www.cnblogs.com/esingchan/p/3940565.html

在上一節分析了TaskTracker和JobTracker之間通過週期的心跳訊息獲取任務分配結果的過程。中間留了一個問題,就是任務到底是怎麼分配的。任務的分配自然是由JobTracker做出來的,具體來說,存在一個抽象類:TaskScheduler,主要負責分配任務,繼承該類的有幾個類:

CapacityTaskScheduler、FairScheduler、JobQueueTaskScheduler(LimitTasksPerJobTaskScheduler又繼承於該類)。

從名字大致可以看出,CapacityTaskScheduler應該是根據容量進行分配,FairScheduler實現相對公平的一些分配,JobQueueTaskScheduler似乎是按照佇列分配,難道是根據佇列的任務按照順序來分配?那豈不是沒有優先順序、搶佔等等的概念了,微觀來看,多執行緒併發執行的時候,執行緒還有個搶佔機制呢,Map或Reduce任務最終實際上就是個JAVA程序,只是一個分散式的,其分配策略應該與作業系統中的執行緒分配有類似的思想,不過沒研究過作業系統裡執行緒是咋分配的,所以這裡也無法對比,這裡只是提一句,任務分配作為一個大到社會資源,小到CPU資源,都有類似的分配過程,理解其核心策略才是最重要的,Hadoop裡也不可能突破人類認知範圍,出現十全十美的分配策略,只是一種適應某種場景的分配策略。

MapReduce裡預設是使用JobQueueTaskScheduler分配任務的,下文進行分析。

 

任務分配過程在JobQueueTaskScheduler的assignTasks方法中。

這個方法的輸入引數是TaskTracker taskTracker,也就是對某個TaskTracker進行分配。

首先來看它第一步做什麼。

複製程式碼

    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); 
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

複製程式碼

首先,獲得該TaskTracker的狀態,這個狀態是通過心跳響應從TaskTracker傳給JobTracker的,這個也好理解,只有TaskTracker知道自己處於什麼狀態,通過心跳把自身的狀態上報。

之後,獲得叢集狀態。ClusterStatus是一個描述叢集狀態的類,用哪些引數來描述叢集的狀態呢?下面是他的主要變數:

複製程式碼

public class ClusterStatus implements Writable {

  private int numActiveTrackers;
  private Collection<String> activeTrackers = new ArrayList<String>();
  private Collection<String> blacklistedTrackers = new ArrayList<String>();
  private Collection<String> graylistedTrackers = new ArrayList<String>();
  private int numBlacklistedTrackers;
  private int numGraylistedTrackers;
  private int numExcludedNodes;
  private long ttExpiryInterval;
  private int map_tasks;
  private int reduce_tasks;
  private int max_map_tasks;
  private int max_reduce_tasks;
  private JobTracker.State state;

複製程式碼

可以看出,主要有當前活動的TaskTracker(numActiveTrackers),處於黑名單和灰名單的TaskTracker,這個概念在上一節分析時提過,主要是指一些TaskTracker在應該心跳的時候可能沒出現心跳,可能原因是機器出現一些故障了,效能降低了等等,總之,在JobTracker這個總管看來,凡是沒有在該出現的時間出現的小弟,就被認為是不信任的小弟,於是加入黑名單。

另外,還有當前Map,Reduce的任務數量,以及叢集最大的Map,Reduce任務數量。看起來也比較簡單。

之後,獲取當前的作業佇列。

複製程式碼

    //
    // Get map + reduce counts for the current tracker.
    //
    final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
    final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
    final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
    final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

    // Assigned tasks
    List<Task> assignedTasks = new ArrayList<Task>();

複製程式碼

之後,獲取TaskTracker的最大容量,即Map最大數量、Reduce最大數量、以及正在執行著的Map、Reduce數量。

接著,計算當前佇列裡所有需要分配的任務數量(與TaskTracker無關,理論上,所有任務都有可能分配到這個TaskTracker)。

複製程式碼

    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
          if (job.scheduleReduces()) {
            remainingReduceLoad += 
              (job.desiredReduces() - job.finishedReduces());
          }
        }
      }
    }

複製程式碼

根據這個總數,除以叢集的全部容量(就是全部TaskTracker的容量之和),可以得到目前Map和Reduce的負載因子,這實際上是評估目前叢集的負載壓力:

複製程式碼

    // Compute the 'load factor' for maps and reduces
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }

複製程式碼

整個叢集的負載壓力跟對某個TaskTracker分配任務有什麼關係呢?試想一下,如果我們要給某個TaskTracker分配任務,總不能把很多工都分給他把,應該要使他的負載壓力跟整個叢集的負載情況大致差不多,這樣才算做到了負載均衡把?所以先得到叢集的負載情況,可以為後面TaskTracker的負載情況進行一個對比。接著往下看:

    final int trackerCurrentMapCapacity = 
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;

這句話是計算要分配任務這個TaskTracker的當前容量,獲得目前可以分配的Map數量。可以想見,一個TaskTracker的容量實際上事先是指定好的,是靜態的,比如可以執行多少個Map,多少個Reduce,但當前的容量,實際上是根據叢集負載情況計算得到的,就是前面說的那個原則,雖然某個機器的最大容量很大,但因為當前負載可能比較低,所以也不能拼命給他分配任務,否則後面如果有大量任務來了(叢集整體負載上去了)這個機器就分配不了多少任務,但有可能上面有不少資料,這些資料要計算,豈不是要搬到其他機器上去算?IO消耗必然帶來計算耗時加大。

好了,在得到TaskTracker中可以分配的Map數量後,是不是就可以從Job佇列裡取出Map隨意進行分配了呢?還不是,還需要進行一個檢測,Hadoop認為需要為一些任務預留一些資源,哪些任務需要預留資源呢?從原始碼的解釋來看,主要是:failures and speculative executions,直觀上看,其中文含義是失敗的任務和預測執行,或者投機任務。失敗的任務好理解,就是一個任務在某個機器上執行失敗了,應該重新排程執行,而不應該不管了;預測執行到底是什麼意思呢?要理解這句話,需要理解Hadoop裡面的一個優化方式。我們知道,一個作業會分為很多Map任務併發執行,當執行完了Map後才開始Reduce任務的執行,因此,一個作業的執行時間取決於Map的時間和Reduce的時間之和。對於Map或Reduce階段,因為有很多工同時執行,這個階段的執行時間就取決於執行最慢的那個Map或Reduce任務,為了加速執行,對於很慢的那些Map或Reduce任務,Hadoop會啟動多份,執行相同的程式碼,那麼Hadoop怎麼知道哪個任務執行最慢呢?自然是靠心跳的時候TaskTracker把各個任務的進度彙報上來,當大部分都執行完了,就剩幾個進度很慢的任務,就可能會啟動推測執行,實際上個人覺得叫冗餘執行更好理解。在Hadoop裡一個在執行的任務稱為Task Attemt,Task就像一個類,而Task Attemt就如同一個例項化的物件,是任務的一次實現。這種實現可以有多個,因為啟動了多個,那麼,最先完的那個就代表這個任務執行完了,其它還沒執行完的就停了就行了。是一種用空間換時間的策略,如果不保留一點資源,當出現任務很慢的時候,想換都沒得換了。失敗的任務也類似,一個失敗的任務,需要排程到其它機器上重新執行一遍,可能就成功了。所以叢集內的機器需要保留一小部分資源,防止出現這些情況時需要利用資源。

這種預留資源稱為Padding,不能把一臺叢集內的資源全部都分完了,如果出現這種狀況,我稱之為到達了預警狀態,此時就得慢慢分才行,一次心跳只給一個機器分配一個任務就返回,否則可以分配多個任務返回。那麼, 到底要預留多少資源呢?從程式碼來看,最多預留當前正在執行任務的1%的資源,也就是由下面的變數控制:

    padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 0.01f);

比如,整個叢集正在執行1000個Map任務,此時最大容量是多少算到達了預警狀態呢?Hadoop裡認為跟要分配任務的機器有關,比如目前的機器如果最大容量是4個Map任務,而1000x1%=10個,兩者取其小值,即4,如果當前容量是1004,即到達了預警狀態,此次心跳就只能為這個機器分配1個Map任務。而如果這臺機器最大容量是20,大於10,則如果當前容量是1010,則到達了預警狀態。為什麼這種預警狀態的判斷跟具體分配任務的機器有關呢?1%當然是一個上限,說白了預留的資源也不能太多了。最多預留10個,最少預留多少呢?跟機器容量有關,極端一點,假如每臺機器最多就能執行1個Map任務,那麼最少預留1個。這個似乎也可以理解,如果機器效能都很好,那就多預留點,如果機器效能都不咋地,那就少預留點,但是也不能不預留。判斷是否到達預警狀態的方法如下,其中maxTaskTrackerSlots就是指當前需要分配任務的機器的最大容量:

複製程式碼

  private boolean exceededPadding(boolean isMapTask, 
                                  ClusterStatus clusterStatus, 
                                  int maxTaskTrackerSlots) { 
    int numTaskTrackers = clusterStatus.getTaskTrackers();
    int totalTasks = 
      (isMapTask) ? clusterStatus.getMapTasks() : 
        clusterStatus.getReduceTasks();
    int totalTaskCapacity = 
      isMapTask ? clusterStatus.getMaxMapTasks() : 
                  clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

    boolean exceededPadding = false;
    synchronized (jobQueue) {
      int totalNeededTasks = 0;
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }

        //
        // Beyond the highest-priority task, reserve a little 
        // room for failures and speculative executions; don't 
        // schedule tasks to the hilt.
        //
        totalNeededTasks += 
          isMapTask ? job.desiredMaps() : job.desiredReduces();
        int padding = 0;
        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
          padding = 
            Math.min(maxTaskTrackerSlots,
                     (int) (totalNeededTasks * padFraction));
        }
        if (totalTasks + padding >= totalTaskCapacity) {
          exceededPadding = true;
          break;
        }
      }
    }

    return exceededPadding;
  }

複製程式碼

接下來,JobQueueTaskScheduler會對該機器上所有剩餘容量(也就是還能執行多少個Map或Reduce)進行逐一分配,呼叫的方法是:

          // Try to schedule a Map task with locality between node-local 
          // and rack-local
          t = job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());

即從佇列裡迴圈取出一個JobInProgress物件,執行該方法。當前心跳的TaskTracker是某個伺服器上的一個JAVA程序,這臺伺服器稱為S1,一個Job可能涉及到很多待處理的資料,這些資料可能位於S1的磁碟上,也可能在其他伺服器上,而其他伺服器,可能與S1位於同一機架內,也可能位於其他機架。這個方法的目的就是獲取一個Map任務,這個Map任務要處理的資料要麼在S1的磁碟上,要麼在與S1位於同一機架內的伺服器的磁碟上。因為把這樣的任務排程到S1上,資料就不需要跨機架傳輸了,甚至就在本地。這種原則是Hadoop裡面最為核心的一個原則,稱為資料本地性。

obtainNewNodeOrRackLocalMapTask呼叫了該JobInProgress物件的另一個方法obtainNewMapTaskCommon。
在前面分析Job初始化的時候,我們知道Job初始化過程中根據Split的個數建立了一個map物件陣列:

複製程式碼

    maps = new TaskInProgress[numMapTasks];
    for(int i=0; i < numMapTasks; ++i) {
      inputLength += splits[i].getInputDataLength();
      maps[i] = new TaskInProgress(jobId, jobFile, 
                                   splits[i], 
                                   jobtracker, conf, this, i, numSlotsPerMap);
    }

複製程式碼

可以想見,分配過程應該就是從作業的這個Map陣列中取出Map任務返回。不過取哪些Map呢?難道又是按照佇列來取?因為不同的Map對應於不同的Split,而不同的Split位於不同的機器,考慮到上面說的本地化策略,則不會那麼簡單,需要考慮資料本地性。

obtainNewMapTaskCommon方法裡呼叫了另一個方法findNewMapTask,這個方法是Map任務分配中最核心的方法,理解了這個方法,也就理解了Map任務分配過程。其宣告為:

  private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
                                          final int clusterSize,
                                          final int numUniqueHosts,
                                          final int maxCacheLevel,
                                          final double avgProgress)

其中,tts也就是跟JobTracker保持心跳的那個TaskTracker的狀態,ClusterSize就是當前叢集中所有TaskTracker的數量,numUniqueHosts是指叢集中機器的數量,從這裡可以看出,機器數量和TaskTracker數量還不一樣,一個機器也可以有多個TaskTracker,因為TaskTracker本質上就是一個JAVA程序。maxCacheLevel表示本地化策略,這個變數十分關鍵,控制了findNewMapTask方法中應該執行哪些程式碼。該變數是前面由方法obtainNewNodeOrRackLocalMapTask傳過來的,具體程式碼為:

  public synchronized Task obtainNewNodeOrRackLocalMapTask(
      TaskTrackerStatus tts, int clusterSize, int numUniqueHosts)
  throws IOException {
    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, maxLevel);
  }

也就是maxLevel,該變數是JobInProgress類中的一個固定變數。預設值是:

    this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;

DEFAULT_HOST_LEVEL的值等於2,即兩個層次,Hadoop將網路拓撲看做一顆樹,主機作為一層,其上層是機架,兩層的意思就是在主機本身和機架內部尋找任務。比如如果這個值是anyCacheLevel:

    this.anyCacheLevel = this.maxLevel+1;

那就意味著,對於某個要分配任務的節點來說,某個任務的處理資料只要是在節點本地的,或者跟這個節點在一個機架內的(還是在一個交換機下面),甚至是隔著一個交換機的(第三層),都可以分配給這個節點,這就失去了本地化的意義了,所以obtainNewNodeOrRackLocalMapTask這個方法顧名思義就是限定在主機和機架內,而通過maxLevel引數設定為2,即可限定查詢範圍。

另外,avgProgress是一個0-1之間的數,表示Map任務的進度,這個引數後面會用來評估哪些Map任務執行進度較慢,以啟動推測執行,後面分析中會看到。avgProgress屬於JobStatus這個類,指的是一個作業中所有Map任務的平均進度。

下面我們仔細分析findNewMapTask這個方法的具體流程。

1,執行shouldRunOnTaskTracker方法,該方法判斷是否應該在這個TaskTracker上分配任務,其程式碼為:

複製程式碼

  private boolean shouldRunOnTaskTracker(String taskTracker) {
    //
    // Check if too many tasks of this job have failed on this
    // tasktracker prior to assigning it a new one.
    //
    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
        taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
      if (LOG.isDebugEnabled()) {
        String flakyTracker = convertTrackerNameToHostName(taskTracker); 
        LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
                  + "' for assigning a new task");
      }
      return false;
    }
    return true;
  }

複製程式碼

簡單點說,如果該TaskTracker上執行失敗的任務太多的話,就不適合繼續分配任務了。那麼, 到底什麼叫失敗任務過多呢?即:

taskTrackerFailedTasks >= maxTaskFailuresPerTracker,表明失敗任務過多了。

maxTaskFailuresPerTracker是這麼來的:

  public int getMaxTaskFailuresPerTracker() {
    return getInt("mapred.max.tracker.failures", 4); 
  }

也就是由預設4個以上任務失敗了,就不適合分配任務。當然,這裡還有一個條件:

flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT))

flakyTaskTrackers是超過4個失敗任務的TaskTracker的數量,如果這個數量少於整個叢集大小的25%:

  private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;

那麼就不合適分配,去其它TaskTracker上分配。當然,如果每臺機器都失敗了很多工,那麼可能就不是機器的問題,可能是在某些時候出現了一些有問題的任務,不應該把問題歸結為機器,機器仍然可以分配任務。

 

2,檢查TaskTracker是否有足夠資源來執行任務:

複製程式碼

    // Check to ensure this TaskTracker has enough resources to 
    // run tasks from this job
    long outSize = resourceEstimator.getEstimatedMapOutputSize();
    long availSpace = tts.getResourceStatus().getAvailableSpace();
    if(availSpace < outSize) {
      LOG.warn("No room for map task. Node " + tts.getHost() + 
               " has " + availSpace + 
               " bytes free; but we expect map to take " + outSize);

      return -1; //see if a different TIP might work better. 
    }

複製程式碼

getEstimatedMapOutputSize方法對Map任務的輸出需要佔的空間進行估計,這裡的空間是指磁碟大小,如果一個機器磁碟空間都不夠,而Map任務執行後的中間結果不存入HDFS,而是存入本地檔案系統(MapReduce的設計原則之一),就會導致Map任務失敗。不過這個任務還沒執行,怎麼知道要輸出多少中間資料呢?Hadoop是根據歷史經驗來判斷的,以前的Map任務輸出了多少內容,在TaskStatus這個類裡面會記錄,而ResourceEstimator這個類就是進行空間估計的,裡面有一個方法:

複製程式碼

  protected synchronized void updateWithCompletedTask(TaskStatus ts, 
      TaskInProgress tip) {

    //-1 indicates error, which we don't average in.
    if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
      completedMapsUpdates++;

      completedMapsInputSize+=(tip.getMapInputSize()+1);
      completedMapsOutputSize+=ts.getOutputSize();

      if(LOG.isDebugEnabled()) {
        LOG.debug("completedMapsUpdates:"+completedMapsUpdates+"  "+
                  "completedMapsInputSize:"+completedMapsInputSize+"  " +
                  "completedMapsOutputSize:"+completedMapsOutputSize);
      }
    }
  }

複製程式碼

可以看出,這個類對以前任務執行的結果進行統計,就得到了平均一個Map任務到底會佔用多少本地空間。而估計新Map任務所佔空間的方法也就依賴於此,就不細講了。

另外,TaskTracker會把自己的剩餘空間通過心跳傳遞過來,兩個比一下,如果不夠,自然就不合適分配任務。

 進行了一些預先檢測後,要開始排程任務了,任務的排程順序是怎樣的呢?是不是從JobInProgress佇列裡面逐一取出Job,然後再從該Job裡面的TaskInProgress佇列裡逐一取出Map任務,分給這臺機器呢?並不是,Hadoop把佇列裡的任務分為四類進行排程,首先對執行失敗的任務進行排程,如果一個任務執行失敗了,按理來說應該馬上再次執行,因為這類任務時間比較緊迫,而且Hadoop的JobTracker在排程這類任務的時候並不區分資料本地性,也就是不管這類任務要處理的資料是不是在跟自己心跳的這臺機器(或者是不是一個機架內等等),總之都排程給他。換句話說,Map階段實際上也是有可能存在資料從一臺機器拷貝到另一臺機器的可能,並不是嚴格的資料本地性。

不過有一個問題,如果一個任務在機器A上已經執行失敗了,再排程給它很可能還會失敗,所以排程的都是其它機器執行失敗的任務。

失敗的任務優先順序最高。之後是排程那些還沒執行的任務,也就是一般的正在等待中的任務,之後是推測任務,也就是那些執行得很慢的任務,需要冗餘執行。在排程還沒執行的任務、以及正在執行的但是很慢的任務(正在執行的速度也不錯的任務當然就不需要排程了)的過程中,會遵從先本地、再機架、再機架外、最後無位置資訊的任務查詢順序,這就是資料本地性的實現方式,按照優先順序逐步查詢任務。簡單點說,心跳節點知道IP地址了,一個任務與Split緊密對應,而Split的位置資訊可以由HDFS提供出來,兩者一比較就知道這個任務執行在心跳節點上是否划算了。

 

3,先找失敗的任務,分配給這個節點。

 其程式碼為:

複製程式碼

    // 0) Schedule the task with the most failures, unless failure was on this
    //    machine
    tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
    if (tip != null) {
      // Add to the running list
      scheduleMap(tip);
      LOG.info("Choosing a failed task " + tip.getTIPId());
      return tip.getIdWithinJob();
    }

複製程式碼

可見,利用了findTaskFromList這個方法,從failedMaps也就是失敗的Map任務佇列裡取出任務。failedMaps是一個SortedSet<TaskInProgress> 物件,存在一個自定義的排序準則,就是任務失敗的次數:this.failedMaps = new TreeSet<TaskInProgress>(failComparator),failComparator的定義如下:

複製程式碼

  // keep failedMaps, nonRunningReduces ordered by failure count to bias
  // scheduling toward failing tasks
  private static final Comparator<TaskInProgress> failComparator =
    new Comparator<TaskInProgress>() {
      @Override
      public int compare(TaskInProgress t1, TaskInProgress t2) {
        if (t1 == null) return -1;
        if (t2 == null) return 1;
        
        int failures = t2.numTaskFailures() - t1.numTaskFailures();
        return (failures == 0) ? (t1.getTIPId().getId() - t2.getTIPId().getId())
            : failures;
      }
    };

複製程式碼

也就是說,失敗次數最多的那些Map任務被先排程。

後面我們可以看到,後面的幾類任務,都是使用了findTaskFromList這個方法,所以對這個方法進行分析。這個方法輸入引數的說明為:

複製程式碼

  /**
   * Find a non-running task in the passed list of TIPs
   * @param tips a collection of TIPs
   * @param ttStatus the status of tracker that has requested a task to run
   * @param numUniqueHosts number of unique hosts that run trask trackers
   * @param removeFailedTip whether to remove the failed tips
   */

複製程式碼

即從一個任務集合裡面找出一個任務。整個過程就是迴圈地對集合裡面的任務進行判斷。其程式碼為:

複製程式碼

    Iterator<TaskInProgress> iter = tips.iterator();
    while (iter.hasNext()) {
      TaskInProgress tip = iter.next();

      // Select a tip if
      //   1. runnable   : still needs to be run and is not completed
      //   2. ~running   : no other node is running it
      //   3. earlier attempt failed : has not failed on this host
      //                               and has failed on all the other hosts
      // A TIP is removed from the list if 
      // (1) this tip is scheduled
      // (2) if the passed list is a level 0 (host) cache
      // (3) when the TIP is non-schedulable (running, killed, complete)
      if (tip.isRunnable() && !tip.isRunning()) {
        // check if the tip has failed on this host
        if (!tip.hasFailedOnMachine(ttStatus.getHost()) || 
             tip.getNumberOfFailedMachines() >= numUniqueHosts) {
          // check if the tip has failed on all the nodes
          iter.remove();
          return tip;
        } else if (removeFailedTip) { 
          // the case where we want to remove a failed tip from the host cache
          // point#3 in the TIP removal logic above
          iter.remove();
        }
      } else {
        // see point#3 in the comment above for TIP removal logic
        iter.remove();
      }
    }
    return null;

複製程式碼

可見,如果一個任務可以執行,並且沒有在執行,那麼再判斷兩個條件,如果沒有在這個機器上失敗過,那麼表明可以排程;或者已經在所有機器上都失敗了,那也可以排程。因為所有機器上都失敗了,應該不是機器的問題了,可能是任務本身存在什麼問題,不管怎樣,既然失敗了,還是繼續排程,如果多執行幾次都失敗了,那當然就沒法了。可能就停止了,向用戶報錯。

 

4,因為findNewMapTask這個方法返回的是某個Job的Map任務的編號,所以如果沒有找到失敗的任務,就接著排程那些未執行的普通任務。從前面傳進來的引數來看,只查詢那些資料位於本地或者同一機架內的任務,排程到心跳TaskTracker執行。

在這類任務的排程裡,考慮了資料本地性,我們對資料本地性再進行一些解釋,這個概念在Hadoop是如此重要,值得大書特書。Hadoop裡的資料本地性反應的是要執行任務的機器與這個任務處理的資料所在機器之間的距離,這兩臺機器如果是一臺機器當然最好,就省得跨機器搬移資料了,這稱為local;如果兩臺機器不是一臺,不過位於一個機架,那麼就稱為Rack-Local,熟悉資料中心機房的同學都知道,目前機房的一般方案是有很多機架,每個機架放著很多1U、2U(不知道1U是多高的建議百度,這是基本概念)的伺服器,一個機架可以放多達幾十臺伺服器,一般有一個交換機,稱為Top-Of-Rack,交換機都放在一個機架上面,這個機架的所有伺服器都用乙太網線或者光纜往上走線連線到這個交換機(比如24口等等),也就是說,在一個機架內部兩臺伺服器要通訊,需要經過這個交換機一次,如果兩個機架的兩臺伺服器要通訊,則至少需要經過兩個交換機,交換機之間的走線在機架上面。所以,除了Rack-Local,稍微遠點的稱為Off-Switch,如果繼續細分,還可以分為兩臺伺服器是否在一個機房,還是跨越了機房等等。因此,Hadoop把這種呈現樹狀的結構稱為網路拓撲(NetworkTopology),網路拓撲中的伺服器或者機架稱為節點(Node),需要注意的是Hadoop所說的Node並不一定是一臺伺服器,可以是一個機架,甚至是一個數據中心,都抽象為一個節點。節點存在一個名稱,比如"/default-rack"是一個節點的名字,可以表示為一個機架,機架下面的伺服器Server1可以表示為"/default-rack/Server1",而這個伺服器的父節點即為這個機架。這樣的好處是如果知道了伺服器擁有哪些本地任務,那麼通過計算就得到機架擁有哪些任務,其實就是機架下面這些伺服器所有任務的並集。

說到這裡,可能會有一個疑問,Hadoop怎麼知道哪些伺服器在同一個機架下面呢?實際上這是需要使用者按照一個格式寫個配置指令碼,Hadoop通過解析這個指令碼知道的,於是知道了伺服器之間的距離,比如XML裡,一個機架標籤裡面把伺服器的IP地址全部填進去即可。

有了上面的背景知識,需要著重理解Hadoop裡面的關鍵物件。在JobTracker中有幾個比較重要的物件,首先一個是: 

// NetworkTopology Node to the set of TIPs
  Map<Node, List<TaskInProgress>> nonRunningMapCache;

這個變數記錄了該節點對應的任務。比如對於一個伺服器而言,其對應的Map任務就是那些處理資料位於該伺服器的任務,也就是本地任務;舉例而言,假如一個作業需要對一個大檔案進行處理,該大檔案分為5份:F1,F2,F3,F4,F5,分別儲存於3臺伺服器:S1、S2、S3,對應儲存關係(依賴於HDFS告知MapReduce)為:

S1:F1、F2、F3、F4

S2:F2、F3、F4、F5

S3:F3、F4、F5、F1

那麼,S1這個節點對應的任務也就是處理F1、F2、F3、F4的這四個任務;S2這個節點對應的任務也就是處理F2、F3、F4、F5的這四個任務。

Job在初始化的時候,會根據Splits(即=5)分別建立5個任務,即5個TaskInProgress物件,然後會執行一個createCache方法,將這些任務分別與節點對應起來:

  private Map<Node, List<TaskInProgress>> createCache(
                                 TaskSplitMetaInfo[] splits, int maxLevel)
                                 throws UnknownHostException {
    Map<Node, List<TaskInProgress>> cache = 
      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);

其中maxLevel也就是記錄到哪個級別,預設是    this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL,即為2,就是我們前面提到的,本機和本機架兩個層次。也就是記錄了Local和Rack-Local這兩個級別。對應於上面的例子,第一級就是記錄S1等與各個任務的關係,第二級會記錄S1、S2、S3這些伺服器的所有父節點(也就是機架)對應的任務,顯然,因為一個機架會有很多伺服器,所以一個機架會對應很多工。比如S1、S2,S3屬於兩個機架C1、C2,那麼,會有以下對應關係:

C1:對應於S1和S2任務的並集;

C2:對應於S3的任務。

最終,nonRunningMapCache這個Map就記錄了以下內容:

S1、S2、S3對應的任務,以及C1、C2對應的任務。

為什麼要這麼記錄下來呢?理論來說,在任務分配的時候,逐一取出Job,然後逐一取出Job裡面事先建立好的TaskInProgress,通過查詢其Split可以獲知該任務位於哪個機器,然後再根據網路拓撲計算心跳機器與該機器之間的距離,不過這樣做豈不是耗時較長,本來,任務分配就是當心跳發生時JobTracker執行的,速度當然越快越好,所以Hadoop的策略是當Job初始化時,就建立好任務,並且將這些任務與機器的對應關係快取起來,所以命名為nonRunningMapCache,即還未執行的Map任務的快取資訊。當某個機器與JobTracker心跳時,因為這個對映表的索引就是Node,如果要查詢本地任務,則可以以TaskTracker對應的Node為索引,就快速獲得了本地任務;而如果要查詢機架內任務,則獲得其父節點直接查詢即可,進而快速完成滿足資料本地性的任務分配。
所以,在我們分析任務分配的時候,一定要有以下基本認識:

1)一個JobTracker有一個JobInProgress佇列,所有Job在初始化的時候,都會建立TaskInProgress物件陣列(作業是佇列,存在到達順序;任務是陣列,無先後順序),每個JobInProgress會有以下物件陣列:

  TaskInProgress maps[] = new TaskInProgress[0];
  TaskInProgress reduces[] = new TaskInProgress[0];
  TaskInProgress cleanup[] = new TaskInProgress[0];
  TaskInProgress setup[] = new TaskInProgress[0];

其中,cleanup和setup用於任務執行的開始和結束時候的初始化和清理等工作。

2)雖然上面的任務陣列已經記錄了該作業的所有任務,但是還存在一些資料結構對這些任務進行了另外形式的記錄,首先是目前還未執行的Map任務的對映表,以節點Node為索引,將所有Map任務打亂了重新記錄,以提升後面任務分配的速度,如果已經分配了,這些任務就會從這個表裡面移除掉;以及正在執行的Map對映表;以及非本地的Map,這些Map任務的執行比較倒黴,並不是以資料本地化進行執行的,其原因有可能是那臺伺服器已經沒有計算資源了,或者執行失敗了被排程到其它機器了等等;另外還會記錄失敗的任務。這些所有的資料結構幾乎都是為了更好地對任務進行合理、高效分配而建立的。仔細想想,假如我們要來寫這一段任務分配程式碼,我們自然而然也會加入一些額外的資料結構來記錄這些任務。

複製程式碼

  // NetworkTopology Node to the set of TIPs
  Map<Node, List<TaskInProgress>> nonRunningMapCache;
  // Map of NetworkTopology Node to set of running TIPs
  Map<Node, Set<TaskInProgress>> runningMapCache;
  // A list of non-local, non-running maps
  final List<TaskInProgress> nonLocalMaps;
  // Set of failed, non-running maps sorted by #failures
  final SortedSet<TaskInProgress> failedMaps;
  // A set of non-local running maps
  Set<TaskInProgress> nonLocalRunningMaps;
  // A list of non-running reduce TIPs
  Set<TaskInProgress> nonRunningReduces;
  // A set of running reduce TIPs
  Set<TaskInProgress> runningReduces;

複製程式碼

有了上面的認識,我們來看分配一個普通任務的過程。繼續分析findNewMapTask方法。

首先獲得心跳機器對應的節點,該節點顯然是一個伺服器節點(不是機架節點):

    Node node = jobtracker.getNode(tts.getHost());

另外,注意tts表示TaskTrackerStatus,是心跳的那臺TaskTracker主動發過來的。

獲得了該節點後,就以該Node為索引,去剛才分析過的那個對映表nonRunningMapCache裡面查詢該節點對應的任務,這些任務是該節點的本地任務,所以應該說是排程的首選,如果不存在這類任務,那麼就取出該節點的父節點對應的任務列表,也就是說,如果有TaskTracker對應的本地任務,那就最好;如果沒有,那就找出那些與TaskTracker位於同一機架內的任務,也行,其程式碼為:

複製程式碼

      for (level = 0;level < maxLevelToSchedule; ++level) {
        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
        if (cacheForLevel != null) {
          tip = findTaskFromList(cacheForLevel, tts, 
              numUniqueHosts,level == 0);
          if (tip != null) {
            // Add to running cache
            scheduleMap(tip);

            // remove the cache if its empty
            if (cacheForLevel.size() == 0) {
              nonRunningMapCache.remove(key);
            }

            return tip.getIdWithinJob();
          }
        }
        key = key.getParent();
      }

複製程式碼

其中,level從0開始到maxLevelToSchedule表示按照本地、機架的順序依次取出任務來進行分配。因為findNewMapTask每次返回一個任務號,所以如果存在本地任務,自然優先返回這類任務。

上面的maxLevelToSchedule是方法引數maxCacheLevel和maxLevel的較小者。從程式碼來看,預設都是2。也就是會獲取Level=0,1兩種情況下的任務。也就是本地或同一個機架內的任務。

當獲得了一個任務後,會把它從nonRunningMapCache中移除掉,這個程式碼在findTaskFromList這個方法裡,前面已經看過。

獲得了任務,就執行scheduleMap這個方法,這個方法顧名思義就是排程任務的意思,實際上,所謂的排程任務,就是把這個任務加入到正在執行任務的那個資料結構中,沒有特別的東西,完全只是記錄改變了。前面看到,除了nonRunningMapCache這個對映表,還有runningMapCache和nonLocalRunningMaps兩個資料結構,分別記錄當前有位置資訊的正在執行的Map任務,以及無位置資訊的正在執行的Map任務。

任務排程的部分程式碼為(存在位置資訊時):

複製程式碼

    for(String host: splitLocations) {
      Node node = jobtracker.getNode(host);

      for (int j = 0; j < maxLevel; ++j) {
        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
        if (hostMaps == null) {
          // create a cache if needed
          hostMaps = new LinkedHashSet<TaskInProgress>();
          runningMapCache.put(node, hostMaps);
        }
        hostMaps.add(tip);
        node = node.getParent();
      }
    }

複製程式碼

可見, 是對於所有Split的主機集合,獲得該主機的節點,之後從runningMapCache中去找找有沒有(比如已經處於執行狀態了,有的任務可能排程多次,執行多個),如果沒有就加進去。

另外,從上面的程式碼key = key.getParent()可以看出,如果沒有本地性的任務,就按照距離由近及遠尋找其他任務。這樣,排程本地化(但不算完全資料本地性)的任務就結束了。

另外,從下面最後一行比較關鍵的程式碼可以看出,尋找本地任務和機架內節點任務的工作就結束了,程式不會往下執行:

複製程式碼

    if (node != null) {
      Node key = node;
      int level = 0;
      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
      // called to schedule any task (local, rack-local, off-switch or
      // speculative) tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
      // findNewMapTask is (i.e. -1) if findNewMapTask is to only schedule
      // off-switch/speculative tasks
      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
      for (level = 0;level < maxLevelToSchedule; ++level) {
。。。。。。。。。。。
      // Check if we need to only schedule a local task (node-local/rack-local)
      if (level == maxCacheLevel) {
        return -1;
      }
    }

複製程式碼

因為level從0到達maxLevelToSchedule的迴圈後,level的值等於maxLevelToSchedule,而因為傳入的引數maxCacheLevel=maxLevel,maxLevel=2,也就是兩者相同,level最後的值就等於maxCacheLevel,所以如果在本地和機架內沒有找到任務,就返回-1,也就是沒有找到。至於後面的程式碼,下面的分析可以看到也會得到執行,只不過是賦了新的引數,又重新呼叫了這個方法,執行後面的程式碼。

 

5,findNewMapTask這個核心方法暫時告一段落,實際上我們分析了前半部分,前半部分的功能是在本地和機架內尋找任務。該方法會返回一個任務號(return tip.getIdWithinJob(); 沒有則返回-1,見上面的程式碼)。我們假設已經在本地或機架內找到了任務(如果沒找到下面再分析),之後回到JobInProgress的方法obtainNewMapTaskCommon中。

接下來的程式碼為:

複製程式碼

    Task result = maps[target].getTaskToRun(tts.getTrackerName());
    if (result != null) {
      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
      // DO NOT reset for off-switch!
      if (maxCacheLevel != NON_LOCAL_CACHE_LEVEL) {
        resetSchedulingOpportunities();
      }
    }
    return result;

複製程式碼

也就是建立要執行的任務。getTaskToRun這個方法會建立一個TaskAttemptID,並建立任務物件:

複製程式碼

    Task t = null;
    if (isMapTask()) {
      if(LOG.isDebugEnabled()) {
        LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
          + failedRanges.getIndicesCount());
      }
      t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
                      numSlotsNeeded);
    } else {
      t = new ReduceTask(jobFile, taskid, partition, numMaps, 
                         numSlotsNeeded);
    }

複製程式碼

這裡需要注意不同的任務類。TaskInProgress是Job在初始化過程中就建立好了的,而MapTask是在分配任務時建立的,是可以序列化並最後會傳到TaskTracker端的,而TaskInProgress只是JobTracker維護的。基本上可以這麼理解,TaskInProgress用於任務在佇列中等待的階段;MapTask,ReduceTask則記錄一個要執行的任務的主要引數,比如Job檔案位置,任務ID,Split的位置資訊等等。

接下來是JobInProgress對任務的各類狀態、引數進行記錄,在方法addRunningTaskToTIP中,比如任務ID啊,任務名啊,任務的本地特性,有下面一些型別:

enum Locality {
  NODE_LOCAL,
  GROUP_LOCAL,
  RACK_LOCAL,
  OFF_SWITCH
}

從上面可以看到,Hadoop這一版本還不支援資料中心之間的排程策略,不支援跨機房排程,它將跨越了一個機架的那些機器認為都一樣。另外,任務是第一次執行還是推測執行(也就是太慢了被重新排程執行)有下面的列舉變數:

enum Avataar {
  VIRGIN,
  SPECULATIVE
}

Avataar中文是阿凡達,天神下凡的意思,主要有兩種型別,碼農的英文比較詭異,VIRGIN用來表示這是任務的第一次執行,SPECULATIVE表示二次排程執行,這是哪個碼農取的名字?

此時,將建立的Task返回至JobQueueTaskScheduler的assignTasks方法。

 

6,上面我們假設在本地或機架內找到了任務,如果沒有找到任務呢?那麼,上面的getTaskToRun等等程式碼不會得到執行,而會直接返回null:

    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxCacheLevel, 
                                status.mapProgress());
    if (target == -1) {
      return null;
    }

此時,直接返回到JobQueueTaskScheduler的assignTasks方法。可以看出,如果經過obtainNewNodeOrRackLocalMapTask的執行返回null,也就是在本地和機架內沒有找到任務,則會往下執行另一個方法:obtainNewNonLocalMapTask

 

複製程式碼

          t =  job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;
            。。。。。。。。。。。
          }          
          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());

複製程式碼

 

我們前面提過,任務有幾種排程順序,再來小結一下,首先是排程那些失敗的任務,因為這些任務比較緊急;其次是未執行的任務以及執行得很慢的任務,在尋找未執行任務的過程中,首先查詢本地任務,之後查詢機架內的任務。obtainNewNodeOrRackLocalMapTask限定了只找兩層節點,本地或父節點,父節點即機架內)這兩種任務的查詢我們已經分析過了,如果沒找到就返回null,此時obtainNewNonLocalMapTask的執行實際上就是去尋找後面兩種任務:執行較慢的任務,以及沒有位置資訊的任務。

這部分程式碼的不好理解之處就在於obtainNewNonLocalMapTask和obtainNewNodeOrRackLocalMapTask一樣,都是呼叫了obtainNewMapTaskCommon這個方法,唯一的區別就是引數不同:

複製程式碼

  public synchronized Task obtainNewNodeOrRackLocalMapTask(
      TaskTrackerStatus tts, int clusterSize, int numUniqueHosts)
  throws IOException {
    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, maxLevel);
  }
  
  public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
                                                    int clusterSize, 
                                                    int numUniqueHosts)
      throws IOException {
    return obtainNewMapTaskCommon(tts, clusterSize, numUniqueHosts, 
        NON_LOCAL_CACHE_LEVEL);
  }

複製程式碼

可以看出,上一個傳的是maxLevel,預設是2,在本地和機架內查詢;後一個傳的引數是NON_LOCAL_CACHE_LEVEL,預設是-1,也就是不關心位置資訊了。其實這個引數輸入後,前面我們分析過的從本地開始逐層往上尋找任務的程式碼就不會得到執行,只會執行無位置資訊和推測執行的部分程式碼。具體可見:

 

複製程式碼

    if (node != null) {
      Node key = node;
      int level = 0;
      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
      for (level = 0;level < maxLevelToSchedule; ++level) {
。。。。。。。。。。。。。。。。。
      }
      
      // Check if we need to only schedule a local task (node-local/rack-local)
      if (level == maxCacheLevel) {
        return -1;
      }
    }

複製程式碼

 

因為此時輸入的引數maxCacheLevel=-1,所以maxLevelToSchedule=-1,因此就不會執行這個查詢本地和機架內任務的迴圈體程式碼。而是直接跳出。執行接下來的程式碼。

 

 

7,如果在本地和機架內找不到任務,則需要擴大範圍去找,應該按照什麼順序去找呢?這就是我們需要分析的findNewMapTask這個方法的接下來的程式碼。首先獲得所有最高級別的節點,也就是說,直接獲取最高層的那些節點(不管有幾層了,最高層總是含有更多工):

    // collection of node at max level in the cache structure
    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();

這些節點比如就是機架等等層次較高的節點。這些節點可能是心跳TaskTracker所對應主機的父節點,或父節點的父節點等等。對於這些父節點,逐一去快取中查詢。不過因為在上面的查詢過程(obtainNewNodeOrRackLocalMapTask)中,已經找了該節點對應的任務以及該節點所在機架對應的任務,所以需要跳過這些已經查詢過的節點,去其它父節點中去找。查詢方式和前面一樣。這種找的策略Hadoop稱為breadth-wise across parents at max level,是一種類似廣度優先遍歷的思想,比如其節點的叔叔節點、伯父節點(如果只有兩層)、爺爺節點(如果有三層)去找。此時的尋找仍然是在快取nonRunningMapCache中尋找的。

 

8、如果利用廣度優先遍歷的方法在nonRunningMapCache中都找不到呢?看起來更高層的節點都找了,會是什麼原因?從程式碼來看主要原因是Split沒有帶著位置資訊。按理來說Split是有位置資訊的,但如果底層不是執行在HDFS上,或者其他一些原因(待考慮),也就無法得到Split的位置資訊,此時nonRunningMapCache甚至可能會為空集。前面在分析createCache的時候暫時忽略了一點,就是如果作業上傳了Split的位置資訊,任務會加入到對映表nonRunningMapCache中,該對映表的索引是Node,值是任務列表;而如果沒有Split的位置資訊呢?會加入到另外一個任務列表中,也就是nonLocalMaps,表示缺少位置資訊條件下的那些還未執行的Map任務的列表,自然不是一個對映表了,因為無法獲知Map任務要處理的Split到底在哪臺機器上,也就是沒有Node節點資訊,只是一個List物件:

    this.nonLocalMaps = new LinkedList<TaskInProgress>();

在createCache方法中,其程式碼為:

    for (int i = 0; i < splits.length; i++) {
      String[] splitLocations = splits[i].getLocations();
      if (splitLocations == null || splitLocations.length == 0) {
        nonLocalMaps.add(maps[i]);
        continue;
      }

即對於所有Split,如果缺乏位置資訊,就加入到這個列表中。因此,在分配任務時,按照先nonRunningMapCache後nonLocalMaps的順序分配,其程式碼與上面一樣:

複製程式碼

    // 3. Search non-local tips for a new task
    tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
    if (tip != null) {
      // Add to the running list
      scheduleMap(tip);

      LOG.info("Choosing a non-local task " + tip.getTIPId());
      return tip.getIdWithinJob();
    }

複製程式碼

 

9,終於完成了最複雜,也是最普通的未執行任務分配過程的分析了,如果上面的任務都沒找到,那麼就進入下一類任務的分配,也就是推測執行的任務,前面已經提到,如果一個任務在執行,但是進度很慢,Hadoop會考慮將其多次排程,這樣的話,可能可以降低最慢那個任務的影響。

與前面通用的方法findTaskFromList類似,查詢推測任務的方法是findSpeculativeTask。如果明白了前面查詢普通任務的順序,這裡也就簡單了,和前面一樣,只是要到正在執行的任務快取中查詢,這種快取分為兩類,即存在位置資訊的和不存在位置資訊的。首先在runningMapCache中尋找(前面是nonRunningMapCache),如果找到這種任務,就返回;如果沒找到,就在nonLocalRunningMaps(前面是nonLocalMaps,我認為這個名字應該改為nonLocalnonRunningMaps,否則容易引起歧義)中尋找。而在runningMapCache中尋找的過程中,也是有兩個階段,首先是從TaskTracker對應的節點開始找,然後擴充套件到機架等等,如果沒有,就廣度優先,在父節點的兄弟節點、爺爺節點等去找。具體程式碼流程與尋找普通任務幾乎一樣。因此,唯一有較大區別的就是findSpeculativeTask這個方法。

比較一下這兩個方法,可以看出,findSpeculativeTask多了avgProgress這個引數,表示當前這個Job的所有Map任務的平均執行進度,這個引數我們前面提過。currentTime表示當前時間。

複製程式碼

  private synchronized TaskInProgress findTaskFromList(
      Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
      int numUniqueHosts, boolean removeFailedTip)

  protected synchronized TaskInProgress findSpeculativeTask(
      Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
      double avgProgress, long currentTime, boolean shouldRemove) 

複製程式碼

還是依次獲取list中的任務,如果正在執行,則不在考慮之列:

      // should never be true! (since we delete completed/failed tasks)
      if (!tip.isRunning() || !tip.isRunnable()) {
        iter.remove();
        continue;
      }

否則檢測TaskInProgress這個任務是否需要推測執行,採用的方法是TaskInProgress這個類中的hasSpeculativeTask方法:

複製程式碼

  /**
   * Return whether the TIP has a speculative task to run.  We
   * only launch a speculative task if the current TIP is really
   * far behind, and has been behind for a non-trivial amount of 
   * time.
   */
  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
    //
    // REMIND - mjc - these constants should be examined
    // in more depth eventually...
    //

    if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
        (averageProgress - progress >= SPECULATIVE_GAP) &&
        (currentTime - startTime >= SPECULATIVE_LAG) 
        && completes == 0 && !isOnlyCommitPending()) {
      return true;
    }
    return false;
  }

複製程式碼

其程式碼比較簡單,主要是在幾個條件下判斷是否應該推測執行。首先看skipping,該標誌記錄了該任務是否不應該進行推測執行,其程式碼為:

複製程式碼

  /**
   * Get whether to start skipping mode. 
   */
  private boolean startSkipping() {
    if(maxSkipRecords>0 && 
        numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
      return true;
    }
    return false;
  }

  public static int getAttemptsToStartSkipping(Configuration conf) {
    return conf.getInt(ATTEMPTS_TO_START_SKIPPING, 2);
  }

複製程式碼

可以看出,如果失敗的次數已經大於預設的2次,就應該跳過這個任務,因為可能是任務本身有問題,繼續推測執行也沒用。

另外一個條件是activeTasks.size() <= MAX_TASK_EXECS,其中MAX_TASK_EXECS=1,也就是activeTasks.size()<=1。

activeTasks記錄了當前這個任務(類似一個類)正在執行(類似例項化物件)的數量,其定義為:

  // Map from task Id -> TaskTracker Id, contains tasks that are
  // currently runnings
  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();

是一個對映表,索引是TaskAttemptID,值是TaskTracker的ID。

前面已經分析過,一個任務可能有多個執行例項。每個例項用TaskAttemptID表示,一個例子是(從其它部落格拷過來的):

”attempt_201108091551_0001_m_000000_0表示2011年8月9日15時51分啟動的JobTracker中第0001號作業的第000000號map task的第0號task attempt“

從前面可以看出,如果activeTasks.size()>1,也就是說,已經有多個執行例項了,那就別再排程了,這個很容易理解。

另外一個條件是averageProgress - progress >= SPECULATIVE_GAP,SPECULATIVE_GAP等於0.2,averageProgress是整個Job的所有Map任務的平均執行進度,是作為引數傳到這個方法中的,progress則表示某個任務的進度。一個任務的進度是怎麼算出來的呢?是TaskInProgress類中的recomputeProgress這個方法計算出來的,從程式碼來看,主要過程為(省略了一些其他程式碼):

複製程式碼

  void recomputeProgress() {
    if (isComplete()) {
      this.progress = 1;
    } else if (failed) {
      this.progress = 0;
    } else {
      double bestProgress = 0;
      String bestState = "";
      Counters bestCounters = new Counters();
      for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) {
        TaskAttemptID taskid = it.next();
        TaskStatus status = taskStatuses.get(taskid);
        if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
          bestProgress = 1;
          break;
        } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
          //for COMMIT_PENDING, we take the last state that we recorded
          //when the task was RUNNING
          bestProgress = this.progress;
        } else if (status.getRunState() == TaskStatus.State.RUNNING) {
          if (status.getProgress() >= bestProgress) {
            bestProgress = status.getProgress();
          }
        }
      }
      this.progress = bestProgress;
    }
  }

複製程式碼

實際上也就是說,如果完成了,進度就是1;如果失敗了,進度就是0;否則,對於該任務的所有TaskAttemptID,通過其status.getProgress()得到TaskAttemptID的進度,status是一個TaskStatus物件,描述了一個TaskAttempt的狀態,Hadoop裡面命名不是特別規範,