1. 程式人生 > >MapReduce多使用者任務排程器——容量排程器(Capacity Scheduler)原理和原始碼研究

MapReduce多使用者任務排程器——容量排程器(Capacity Scheduler)原理和原始碼研究

前言:為了研究需要,將Capacity Scheduler和Fair Scheduler的原理和程式碼進行學習,用兩篇文章作為記錄。如有理解錯誤之處,歡迎批評指正。

容量排程器(Capacity Scheduler)是Yahoo公司開發的多使用者排程器。多使用者排程器的使用場景很多,根據資料1的說法,Hadoop叢集的使用者量越來越大,不同使用者提交的應用程式具有不同的服務質量要求(QoS):

1. 批處理作業:耗時較長,對完成時間沒有嚴格要求。如資料探勘、機器學習等應用。

2. 互動式作業:期望及時返回結果。如Hive等應用。

3. 生產性作業:要求一定量的的資源保證。如統計值計算、垃圾資料分析等。

傳統的FIFO排程器不能滿足應用對響應時間和資源的多樣化要求,多使用者多佇列排程器應運而生。容量排程器即是其中被廣泛應用的一種。

一、基本思想

容量排程器以佇列為單位劃分資源,每個佇列都有資源使用的下限和上限。每個使用者也可以設定資源使用上限。一個佇列的剩餘資源可以共享給另一個佇列,其他佇列使用後還可以歸還。管理員可以約束單個佇列、使用者或作業的資源使用。支援資源密集型作業,可以給某些作業分配多個slot(這是比較特殊的一點)。支援作業優先順序,但不支援資源搶佔。

這裡明確一下使用者、佇列和作業之間的關係。Hadoop以佇列為單位管理資源,每個佇列分配到一定的資源,使用者只能向一個或幾個佇列提交作業。佇列管理體現為兩方面:1. 使用者許可權管理:Hadoop使用者管理模組建立在作業系統使用者和使用者組之間的對映之上,允許一個作業系統使用者或者使用者組對應一個或者多個佇列。

同時可以配置每個佇列的管理員使用者。佇列資訊配置在mapred-site.xml檔案中,包括佇列的名稱,是否啟用許可權管理功能等資訊,且不支援動態載入。佇列許可權選項配置在mapred-queue-acls.xml檔案中,可以配置某個使用者或使用者組在某個佇列中的某種許可權。許可權包括作業提交許可權和作業管理許可權。2. 系統資源管理:管理員可以配置每個佇列和每個使用者的可用資源量資訊,為排程器提供排程依據。這些資訊配置在排程器自己的配置檔案(如Capacity-Scheduler.xml)中。關於每個配置檔案的常見內容見附錄。

二、整體架構

總體來說,容量排程器的工作流程分5個步驟:

1. 使用者提交作業到JobTracker。

2. JobTracker將提交的作業交給Capacity Scheduler的監聽器JobQueuesManager,並將作業加入等待佇列,由JobInitializationPoller執行緒初始化。

3. TaskTracker通過心跳資訊要求JobTracker為其分配任務。

4. JobTracker呼叫Capacity Scheduler的assignTasks方法為其分配任務。

5. JobTracker將分配到的任務返回給TaskTracker。

接下,我們結合原始碼依次研究上述過程。

三、實現細節

1. 排程器的啟動

回憶一下,前面談到排程器啟動是由JobTracker呼叫排程器的start方法實現的,首先來看start方法:

    // initialize our queues from the config settings
    if (null == schedConf) {
      schedConf = new CapacitySchedulerConf();
    }
首先生成配置物件,容量排程器定義了自己的配置物件,構造時會載入排程器自己的配置檔案作為資源,並初始化一些預設的配置選項:
public CapacitySchedulerConf() {
    rmConf = new Configuration(false);
    rmConf.addResource(SCHEDULER_CONF_FILE);
    initializeDefaults();
  }

private void initializeDefaults() {
    defaultUlimitMinimum = 
      rmConf.getInt(
          "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
    defaultUserLimitFactor = 
      rmConf.getFloat("mapred.capacity-scheduler.default-user-limit-factor", 
          1.0f);
    defaultSupportPriority = rmConf.getBoolean(
        "mapred.capacity-scheduler.default-supports-priority", false);
    defaultMaxActiveTasksPerQueueToInitialize = 
      rmConf.getInt(
          "mapred.capacity-scheduler.default-maximum-active-tasks-per-queue", 
          200000);
    defaultMaxActiveTasksPerUserToInitialize = 
      rmConf.getInt(
          "mapred.capacity-scheduler.default-maximum-active-tasks-per-user", 
          100000);
    defaultInitToAcceptJobsFactor =
      rmConf.getInt("mapred.capacity-scheduler.default-init-accept-jobs-factor", 
          10);
  }
例如,第一個預設值表示每個使用者的最低資源保障,預設為100%。第三個預設值表示是否考慮作業優先順序,預設是不考慮。其他配置可以參考資料1中的講解。接下來,初始化佇列資訊,佇列資訊由QueueManager物件獲得,該物件的構造過程如下:
  public QueueManager(Configuration conf) {
    checkDeprecation(conf);
    conf.addResource(QUEUE_ACLS_FILE_NAME);
    
    // Get configured ACLs and state for each queue
    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);

    queues.putAll(parseQueues(conf)); 
  }
  
  synchronized private Map<String, Queue> parseQueues(Configuration conf) {
    Map<String, Queue> queues = new HashMap<String, Queue>();
    // First get the queue names
    String[] queueNameValues = conf.getStrings("mapred.queue.names",
        new String[]{JobConf.DEFAULT_QUEUE_NAME});
    for (String name : queueNameValues) {
      Map queueACLs = getQueueAcls(name, conf);
      if (queueACLs == null) {
        LOG.error("The queue, " + name + " does not have a configured ACL list");
      }
      queues.put(name, new Queue(name, getQueueAcls(name, conf),
          getQueueState(name, conf), QueueMetrics.create(name, conf)));
    }
    
    return queues;
  }
首先,獲取使用者許可權配置檔案mapred-queue-acls.xml。然後通過mapred-site.xml中的配置解析並生成佇列的列表queues。解析的過程是,先獲取每個佇列的名字,再通過名字獲取佇列的許可權配置,最後依據這些資訊以及佇列狀態和佇列度量物件構造一個佇列並加入結果列表。如上面程式碼。在初始化佇列之前還有構造出每個佇列對應的CapacitySchedulerQueue物件:
  Map<String, CapacitySchedulerQueue> 
  parseQueues(Collection<String> queueNames, CapacitySchedulerConf schedConf) 
  throws IOException {
    Map<String, CapacitySchedulerQueue> queueInfoMap = 
      new HashMap<String, CapacitySchedulerQueue>();
    
    // Sanity check: there should be at least one queue. 
    if (0 == queueNames.size()) {
      throw new IllegalStateException("System has no queue configured");
    }

    float totalCapacityPercent = 0.0f;
    for (String queueName: queueNames) {
      float capacityPercent = schedConf.getCapacity(queueName);
      if (capacityPercent == -1.0) {
        throw new IOException("Queue '" + queueName + 
            "' doesn't have configured capacity!");
      } 
      
      totalCapacityPercent += capacityPercent;

      // create our Queue and add to our hashmap
      CapacitySchedulerQueue queue = 
        new CapacitySchedulerQueue(queueName, schedConf);
      queueInfoMap.put(queueName, queue);
    }
    
    if (Math.floor(totalCapacityPercent) != 100) {
      throw new IllegalArgumentException(
        "Sum of queue capacities not 100% at "
          + totalCapacityPercent);
    }    

    return queueInfoMap;
  }
容量排程器佇列物件被裝入一個以佇列名為鍵的map中返回並用於初始化。獲取佇列後要進行初始化,由函式initialize完成:
    void initialize(QueueManager queueManager,
      Map<String, CapacitySchedulerQueue> newQueues,
      Configuration conf, CapacitySchedulerConf schedConf) {
    // Memory related configs
    initializeMemoryRelatedConf(conf);

    // Setup queues
    for (Map.Entry<String, CapacitySchedulerQueue> e : newQueues.entrySet()) {
      String newQueueName = e.getKey();
      CapacitySchedulerQueue newQueue = e.getValue();
      CapacitySchedulerQueue currentQueue = queueInfoMap.get(newQueueName);
      if (currentQueue != null) {
        currentQueue.initializeQueue(newQueue);
        LOG.info("Updated queue configs for " + newQueueName);
      } else {
        queueInfoMap.put(newQueueName, newQueue);
        LOG.info("Added new queue: " + newQueueName);
      }
    }

    // Set SchedulingDisplayInfo
    for (String queueName : queueInfoMap.keySet()) {
      SchedulingDisplayInfo schedulingInfo = 
        new SchedulingDisplayInfo(queueName, this);
      queueManager.setSchedulerInfo(queueName, schedulingInfo);
    }

    // Inform the queue manager 
    jobQueuesManager.setQueues(queueInfoMap);
    
    // let our mgr objects know about the queues
    mapScheduler.initialize(queueInfoMap);
    reduceScheduler.initialize(queueInfoMap);
    
    // scheduling tunables
    maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
    maxTasksToAssignAfterOffSwitch = 
      schedConf.getMaxTasksToAssignAfterOffSwitch();
  }
具體過程如下:首先根據配置物件初始化跟記憶體相關的一些變數;然後檢查某個佇列是否在queueInfoMap資料結構中,若在,就更新佇列資訊,若不在,則加入其中,該資料結構提供了一個快速通過佇列名訪問佇列的途徑;接下來設定每個佇列的排程資訊用於展示或日誌;然後將佇列map交給監聽器物件JobQueuesMananger;接著,將佇列資訊再交給map和reduce排程器物件,每個排程器物件維護了可以獲取任務的佇列列表,用於排程時的佇列選擇;最後設定批量任務分配的最大數量。

上述過程中,用於不同任務排程的mapScheduler和reduceScheduler值得進一步研究。佇列會被加入到map或reduce排程器的優先順序佇列中:

     queuesForAssigningTasks.clear();
     queuesForAssigningTasks.addAll(queues.values());
     Collections.sort(queuesForAssigningTasks, queueComparator);
佇列的優先順序由queueComparator定義,map和reduce的比較器實現基本相同,只是任務型別不同:
     public int compare(CapacitySchedulerQueue q1, CapacitySchedulerQueue q2) {
        // look at how much capacity they've filled. Treat a queue with
        // capacity=0 equivalent to a queue running at capacity
        TaskType taskType = getTaskType();
        double r1 = (0 == q1.getCapacity(taskType))? 1.0f:
          (double)q1.getNumSlotsOccupied(taskType)/(double) q1.getCapacity(taskType);
        double r2 = (0 == q2.getCapacity(taskType))? 1.0f:
          (double)q2.getNumSlotsOccupied(taskType)/(double) q2.getCapacity(taskType);
        if (r1<r2) return -1;
        else if (r1>r2) return 1;
        else return 0;
      }
上述compare方法的實現表明,佇列的資源使用率越高,在佇列列表中的順序越靠後,優先順序越低。也就是說,Capacity Scheduler總是選擇資源利用率最低的佇列。至此,佇列初始化分析完畢。

接下來,排程器將監聽器物件註冊到JobTracker:

    // listen to job changes
    taskTrackerManager.addJobInProgressListener(jobQueuesManager);
然後啟動初始化執行緒:
    //Start thread for initialization
    if (initializationPoller == null) {
      this.initializationPoller = new JobInitializationPoller(
          jobQueuesManager, schedConf, queueNames, taskTrackerManager);
    }
    initializationPoller.init(queueNames.size(), schedConf);
    initializationPoller.setDaemon(true);
    initializationPoller.start();
初始化執行緒initializationPoller是個後臺執行緒。init方法為每個佇列指定一個初始化執行緒,執行緒總數總是小於或等於佇列的數量。然後啟動每個初始化執行緒。
最後設定用於顯示排程器資訊的Servlet:
    if (taskTrackerManager instanceof JobTracker) {
      JobTracker jobTracker = (JobTracker) taskTrackerManager;
      HttpServer infoServer = jobTracker.infoServer;
      infoServer.setAttribute("scheduler", this);
      infoServer.addServlet("scheduler", "/scheduler",
          CapacitySchedulerServlet.class);
    }
至此,排程器啟動完畢。

2. 作業初始化

由於初始化的作業不能得到排程會佔用過多記憶體,容量排程器通過兩種策略初始化作業:1. 優先初始化最可能被排程器排程的作業;2. 限制使用者初始化作業數目。詳細過程如下:作業被提交到JobTracker後,排程器的監聽器呼叫jobAdded方法:

    // add job to the right queue
    CapacitySchedulerQueue queue = getQueue(job.getProfile().getQueueName());
這條語句將作業加入對應的佇列中。接下來呼叫佇列的addWaitingJob方法:
    synchronized void addWaitingJob(JobInProgress job) throws IOException {
    JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job);
    
    String user = job.getProfile().getUser();

    // Check acceptance limits
    checkJobSubmissionLimits(job, user);
    
    waitingJobs.put(jobSchedInfo, job);
    
    // Update user stats
    UserInfo userInfo = users.get(user);
    if (userInfo == null) {
      userInfo = new UserInfo(comparator);
      users.put(user, userInfo);
    }
    userInfo.jobAdded(jobSchedInfo, job);
  }
在該方法中,首先生成排程資訊物件,此物件與預設的FIFO排程器的排程資訊物件一樣。然後檢查三個約束:

1. 作業的任務數是否超過每個使用者最大任務數

    if (job.desiredTasks() > maxActiveTasksPerUser) {
      throw new IOException(
          "Job '" + job.getJobID() + "' from user '" + user  +
          "' rejected since it has " + job.desiredTasks() + " tasks which" +
          " exceeds the limit of " + maxActiveTasksPerUser + 
          " tasks per-user which can be initialized for queue '" + 
          queueName + "'"
          );
    }
2. 佇列中等待初始化、已經初始化的作業數目和在執行的作業數不能超過可接受值
// Across all jobs in queue
    int queueWaitingJobs = getNumWaitingJobs();
    int queueInitializingJobs = getNumInitializingJobs();
    int queueRunningJobs = getNumRunningJobs();
    if ((queueWaitingJobs + queueInitializingJobs + queueRunningJobs) >= 
      maxJobsToAccept) {
      throw new IOException(
          "Job '" + job.getJobID() + "' from user '" + user  + 
          "' rejected since queue '" + queueName + 
          "' already has " + queueWaitingJobs + " waiting jobs, " + 
          queueInitializingJobs + " initializing jobs and " + 
          queueRunningJobs + " running jobs - Exceeds limit of " +
          maxJobsToAccept + " jobs to accept");
    }
3. 使用者等待初始化、已經初始化和在執行的作業數不能超過可接受值
    // Across all jobs of the user
    int userWaitingJobs = getNumWaitingJobsByUser(user);
    int userInitializingJobs = getNumInitializingJobsByUser(user);
    int userRunningJobs = getNumRunningJobsByUser(user);
    if ((userWaitingJobs + userInitializingJobs + userRunningJobs) >= 
        maxJobsPerUserToAccept) {
      throw new IOException(
          "Job '" + job.getJobID() + "' rejected since user '" + user +  
          "' already has " + userWaitingJobs + " waiting jobs, " +
          userInitializingJobs + " initializing jobs and " +
          userRunningJobs + " running jobs - " +
          " Exceeds limit of " + maxJobsPerUserToAccept + " jobs to accept" +
          " in queue '" + queueName + "' per user");
    }

若有一個約束不滿足,則丟擲異常。否則將作業加入等待初始化佇列。最後呼叫排程器的jobAdded方法通知排程器:
  // called when a job is added
  synchronized void jobAdded(JobInProgress job) throws IOException {
    CapacitySchedulerQueue queue = 
      queueInfoMap.get(job.getProfile().getQueueName());
    
    // Inform the queue
    queue.jobAdded(job);
    
    // setup scheduler specific job information
    preInitializeJob(job);
  }
首先獲取佇列,然後告訴佇列有作業加入,並將佇列中提交該作業的使用者的作業數更新。最後依據配置計算每個任務需要的slot數目(容量排程器支援大記憶體作業)。

作業初始化執行緒的入口在JobInitializationPoller(以下簡稱poller)的run方法。

  public void run() {
    while (running) {
      try {
        cleanUpInitializedJobsList();
        selectJobsToInitialize();
        if (!this.isInterrupted()) {
          Thread.sleep(sleepInterval);
        }
      } catch (InterruptedException e) {
        LOG.error("Job Initialization poller interrupted"
            + StringUtils.stringifyException(e));
      }
    }
  }

在該方法中,首先從initializedJobs資料結構中清除一些作業,這些作業是正在執行且獲得排程的作業或者是執行完成的作業。接著,呼叫selectJobsToInitialize方法來選擇等待初始化的作業。具體過程如下,對於每個佇列,首先選擇該佇列中處於waitingJobs列表中的作業:

ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
選擇的原則是:一看該作業是否已經初始化;若不是,二檢查佇列中作業總數(正在執行和正在初始化)和允許的活動任務數是否超過上限;若沒有,檢查提交該作業的使用者是不是有過多的作業(正在執行和正在初始化)或活動的任務;若仍不是,則進一步檢查作業是否處於PREP狀態(沒有被kill掉),然後放入篩選結果列表,並通知所在佇列,將其放入initializingJobs列表。以上過程詳見getJobsToInitialized方法的實現,這裡不贅述。

下面獲取一個分配給該佇列的初始化執行緒,並將選擇初始化的作業加入屬於相應佇列的排程列表中:

    JobInitializationThread t = threadsToQueueMap.get(queue);
      for (JobInProgress job : jobsToInitialize) {
        t.addJobsToQueue(queue, job);
    }
每個初始化執行緒維護了一個map(jobsPerQueue),通過佇列名字可以找到由該執行緒初始化的佇列的作業排程列表。
最後,我們來看初始化執行緒JobInitializationThread的run方法,該方法中不停地呼叫initializeJobs方法:
     void initializeJobs() {
      // while there are more jobs to initialize...
      while (currentJobCount.get() > 0) {
        Set<String> queues = jobsPerQueue.keySet();
        for (String queue : queues) {
          JobInProgress job = getFirstJobInQueue(queue);
          if (job == null) {
            continue;
          }
          LOG.info("Initializing job : " + job.getJobID() + " in Queue "
              + job.getProfile().getQueueName() + " For user : "
              + job.getProfile().getUser());
          if (startIniting) {
            setInitializingJob(job);
            ttm.initJob(job);
            setInitializingJob(null);
          } else {
            break;
          }
        }
      }
    }

從程式碼可見,獲取佇列中第一個作業,將其交給JobTracker的initJob初始化,初始化詳細過程見前面的一系列文章。至此,容量排程器的作業初始化過程分析完畢。

作為該小節的結束,這裡說一下每個佇列中維護的幾個資料結構:

    this.waitingJobs = 
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
    this.initializingJobs =
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
    this.runningJobs = 
      new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
正如名稱暗示的那樣,三個列表分別持有等待初始化的作業、正在初始化的作業和正在執行的作業。它們共有的引數為一個Comparator物件,用於定義列表中作業的順序。
它的初始化如下:
    if (supportsPriorities) {
      // use the default priority-aware comparator
      comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
    }
    else {
      comparator = STARTTIME_JOB_COMPARATOR;
    }
如果排程器支援優先順序,則比較器物件初始化為FIFO排程器中的FIFO比較器,原則是首先比較優先順序,再比較開始時間,最後比較作業ID。如果排程器不支援優先順序,則初始化為開始時間比較器,即先來先服務。 初始化執行緒會從waitingJobs列表中選擇要初始化的作業,被選擇的作業會放入initializingJobs列表,初始化後得到排程的作業會進入runningJobs列表。有關作業的排程見下一小節。

3. 任務排程

容量排程器採用三層排程模型:首先選擇一個佇列,其次選擇一個作業,最後選擇作業的任務。任務選擇由排程器的assignTasks方法完成,下面詳述該方法。

首先呼叫下面方法更新各個佇列的資源使用資訊:

updateAllQueues(mapClusterCapacity, reduceClusterCapacity);
具體到每個佇列中,呼叫佇列的updateAll方法。

首先更新佇列最新的map和reduce資源量:

    // Compute new capacities for maps and reduces
    mapSlots.updateCapacities(capacityPercent, maxCapacityPercent, 
        mapClusterCapacity);
    reduceSlots.updateCapacities(capacityPercent, maxCapacityPercent, 
        reduceClusterCapacity);
接下來將以下資訊更新到每個作業的排程資訊物件中:
    j.setSchedulingInfo(
          CapacityTaskScheduler.getJobQueueSchedInfo(numMapsRunningForThisJob, 
              numRunningMapSlots,
              numReservedMapSlotsForThisJob,
              numReducesRunningForThisJob, 
              numRunningReduceSlots,
              numReservedReduceSlotsForThisJob));
包括:作業正在執行的map和reduce作業數,作業正在使用的map和reduce資源數和為這個作業保留的map和reduce資源數。

然後將每個作業的資源使用資訊反映到該作業所在佇列的相關資訊中:

    update(TaskType.MAP, j, j.getProfile().getUser(), 
          numMapsRunningForThisJob, numMapSlotsForThisJob);
    update(TaskType.REDUCE, j, j.getProfile().getUser(), 
          numReducesRunningForThisJob, numReduceSlotsForThisJob);
包括佇列中正在執行的任務數,正在使用的資源量和使用者使用的資源量等資訊。

更新後,通過addMapTasks和addReduceTask兩個方法排程任務:

    // schedule tasks
    List<Task> result = new ArrayList<Task>();
    addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
    addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
關於佇列和作業的優先順序前面已經提到,這裡關注任務的優先順序。在addMapTasks方法中,呼叫CapacityScheduler的assignTasks方法:
    JobInProgress job = taskTracker.getJobForFallowSlot(type);
      if (job != null) {
        if (availableSlots >= job.getNumSlotsPerTask(type)) {
          // Unreserve 
          taskTracker.unreserveSlots(type, job);
          
          // We found a suitable job. Get task from it.
          if (type == TaskType.MAP) {
            // Don't care about locality!
            job.overrideSchedulingOpportunities();
          }
          return obtainNewTask(taskTrackerStatus, job, true);
        } else {
          // Re-reserve the current tasktracker
          taskTracker.reserveSlots(type, job, availableSlots);

          return TaskLookupResult.getMemFailedResult(); 
        }
      }
首先,判斷TaskTracker是否正為某個作業預留資源(該作業為記憶體密集型,一個任務可能需要多個slot,上次排程沒有足夠的slot分配,故將其預留給該作業用於下次排程。這是容量排程器的大記憶體任務排程機制),若有預留,則判斷當前可用的資源是否能滿足該作業,若能則不再預留資源,並呼叫obtainNewTask方法將資源分配給該作業執行;若不能,繼續將當前資源預留給該作業,並返回記憶體失敗的結果。

如果TaskTracker沒有為某個作業預留資源,對於佇列集合中的每個隊裡,從中選擇一個作業,並呼叫obtainNewTask方法獲得一個任務。當遇到當前可用資源不能滿足一個任務時,也要預留資源。注意,每次獲取一個任務都會返回獲取的狀態,程式碼如下:

    for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
        //This call is for optimization if we are already over the
        //maximum-capacity we avoid traversing the queues.
        if (!queue.assignSlotsToQueue(type, 1)) {
          continue;
        }
        
        TaskLookupResult tlr = 
          getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch);
        TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();

        if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
          continue; // Look in other queues.
        }

        // if we find a task, return
        if (lookUpStatus == TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND ||
            lookUpStatus == TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
          return tlr;
        }
        // if there was a memory mismatch, return
        else if (lookUpStatus == 
          TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {
            return tlr;
        }
      }
最後來分析一下,獲取任務的核心方法obtainNewTask:
    TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
                                   JobInProgress job, boolean assignOffSwitch) 
    throws IOException {
      ClusterStatus clusterStatus = 
        scheduler.taskTrackerManager.getClusterStatus();
      int numTaskTrackers = clusterStatus.getTaskTrackers();
      int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts();
      
      // Inform the job it is about to get a scheduling opportunity
      job.schedulingOpportunity();
      
      // First, try to get a 'local' task
      Task t = job.obtainNewNodeOrRackLocalMapTask(taskTracker,
                                                   numTaskTrackers,
                                                   numUniqueHosts);
      
      if (t != null) {
        return TaskLookupResult.getTaskFoundResult(t, job); 
      }
      
      // Next, try to get an 'off-switch' task if appropriate
      // Do not bother as much about locality for High-RAM jobs
      if (job.getNumSlotsPerMap() > 1 || 
          (assignOffSwitch && 
              job.scheduleOffSwitch(numTaskTrackers))) {
        t = 
          job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
      }
      
      return (t != null) ? 
          TaskLookupResult.getOffSwitchTaskFoundResult(t, job) :
          TaskLookupResult.getNoTaskFoundResult();
    }
與FIFO排程器的實現類似,首先也要試圖找到一個具有資料本地新的任務。若沒找到,則分配一個記憶體密集型任務或off-switch的任務。具體的分配過程參見前面的文章對FIFO任務排程的分析(MapReduce任務排程與執行原理之任務排程)。若仍然沒有找到,則返回沒有找到結果。

如果獲取到的任務數達到一次心跳返回的任務最大數量,則返回:

    if (tasks.size() >= maxTasksPerHeartbeat) {
        return;
      }

為了儘量提高任務的資料本地性,容量排程器採用了作業延遲排程機制:如果一個作業中未找到滿足資料本地性的任務,則會讓該作業跳過一定數目的機會,直到找到一個滿足資料本地性的任務或到達跳過次數上限。

    if (job.getNumSlotsPerMap() > 1 || 
          (assignOffSwitch && 
              job.scheduleOffSwitch(numTaskTrackers))) {
        t = 
          job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
      }
assignOffSwitch為true表示還未分配過不具有資料本地性的任務,scheduleOffSwitch用於判斷作業是否到達跳過次數上限:
  public boolean scheduleOffSwitch(int numTaskTrackers) {
    long missedTaskTrackers = getNumSchedulingOpportunities();
    long requiredSlots = 
      Math.min((desiredMaps() - finishedMaps()), numTaskTrackers);
    
    return (requiredSlots  * localityWaitFactor) < missedTaskTrackers;
  }
localityWaitFactor表示作業輸入資料所在結點數佔結點總數的比例,requiredSlots表示作業還需要的資源數,二者的乘積來衡量跳過次數的上限,而missedTaskTrackers即為跳過次數。missedTaskTrackers每次分配任務時都會增加,如果分配到本地任務,則返回任務,該變數會重置為0;若沒有分配到,則表示跳過一次。在分配到非本地性任務後跳過次數也會重置為0。
reduce任務的分配機制相對簡單,只採用了大記憶體任務排程策略,排程器只要找到一個合適的reduce任務即返回,且沒有延遲排程。至此,容量排程器任務排程分析結束。

下一篇文章計劃學習Fair Scheduler。如有錯誤和問題,歡迎批評指正。

參考資料

【1】《Hadoop技術內幕--深入解析MapReduce架構設計與實現原理》董西成 【2】  Hadoop 1.0.0 原始碼

2013年10月7日