1. 程式人生 > >Yarn原始碼分析之MapReduce作業中任務Task排程整體流程(一)

Yarn原始碼分析之MapReduce作業中任務Task排程整體流程(一)

        v2版本的MapReduce作業中,作業JOB_SETUP_COMPLETED事件的發生,即作業SETUP階段完成事件,會觸發作業由SETUP狀態轉換到RUNNING狀態,而作業狀態轉換中涉及作業資訊的處理,是由SetupCompletedTransition來完成的,它主要做了四件事:

        1、通過設定作業Job的成員變數setupProgress為1,標記作業setup已完成;

        2、排程作業Job的Map Task;

        3、排程作業的JobReduce Task;

        4、如果沒有task了,則生成JOB_COMPLETED事件並交由作業的事件處理器eventHandler進行處理。

        本文,我們就將研究作業Job中Task是如何被排程的。

        首先看下SetupCompletedTransition中transition()方法關於作業Job中Task排程的程式碼,如下:

      // 排程作業Job的Map Task
      job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
      // 排程作業Job的Reduce Task
      job.scheduleTasks(job.reduceTasks, true);
        它實際上是通過Job,也就是JobImpl的scheduleTasks()完成的,這個方法需要兩個引數,第一個是作業Job待排程任務的任務ID集合taskIDs,第二個引數是表示是否恢復任務輸出的標誌位recoverTaskOutput,對於Map-Only型作業中Map任務和所有型別作業的Reduce任務,都需要恢復,標誌位recoverTaskOutput為true,具體程式碼如下:
  protected void scheduleTasks(Set<TaskId> taskIDs,
      boolean recoverTaskOutput) {
	  
	// 遍歷傳入的任務集合taskIDs中的每個TaskId,對taskID做以下處理:
    for (TaskId taskID : taskIDs) {
    	
      // 根據taskID從集合completedTasksFromPreviousRun中移除對應元素,並獲取被移除的元素TaskInfo例項taskInfo
      TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
      
      if (taskInfo != null) {// 若存在taskID對應任務資訊TaskInfo例項taskInfo
    	  
    	// 構造T_RECOVER型別任務恢復事件TaskRecoverEvent,交給eventHandler處理,標誌位recoverTaskOutput表示是否恢復任務的輸出,
    	// 對於Map-Only型Map任務和所有的Reduce任務,都需要恢復,標誌位recoverTaskOutput為true
        eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
            committer, recoverTaskOutput));
      } else {
    	  
    	// 否則,構造T_SCHEDULE型別任務排程事件TaskEvent,交給eventHandler處理
        eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
      }
    }
  }
        scheduleTasks()方法遍歷傳入的任務集合taskIDs中的每個TaskId例項taskID,對taskID做以下處理:

       1、根據taskID從集合completedTasksFromPreviousRun中移除對應元素,並獲取被移除的元素TaskInfo例項taskInfo;

       2、若存在taskID對應任務資訊TaskInfo例項taskInfo,構造T_RECOVER型別任務恢復事件TaskRecoverEvent,交給eventHandler處理,標誌位recoverTaskOutput表示是否恢復任務的輸出,對於Map-Only型Map任務和所有的Reduce任務,都需要恢復,標誌位recoverTaskOutput為true;

       3、否則,構造T_SCHEDULE型別任務排程事件TaskEvent,交給eventHandler處理。

        我們先看T_SCHEDULE型別任務排程事件TaskEvent的處理,它是交由Job的eventHandler來處理的,而這個eventHandler是在Job被建立時(即構造JobImpl例項時)由MRAppMaster的dispatcher來賦值的,而在MRAppMaster中,dispatcher被建立後就會註冊任務事件的處理器TaskEventDispatcher例項,程式碼如下:

dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
        而這個任務事件處理器TaskEventDispatcher中處理任務事件TaskEvent的handle()方法定義如下:
  private class TaskEventDispatcher implements EventHandler<TaskEvent> {
    @SuppressWarnings("unchecked")
    @Override
    public void handle(TaskEvent event) {
      Task task = context.getJob(event.getTaskID().getJobId()).getTask(
          event.getTaskID());
      ((EventHandler<TaskEvent>)task).handle(event);
    }
  }
        它實際上是通過作業Job中相關任務Task的handle()方法來處理的,而這個任務Task的實現則是TaskImpl,其中對於各種任務事件的處理,也是類似作業Job,由一個任務Task的狀態機進行處理,關於任務Task的狀態機,我們會有專門的文章進行介紹,這裡,您只需要知道在TaskImpl中,對於上述兩種任務狀態機中任務狀態的轉換、觸發事件及事件處理者定義如下:
  private static final StateMachineFactory
               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent> 
            stateMachineFactory 
           = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
               (TaskStateInternal.NEW)
		
	// 省略部分程式碼
        .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, 
                  TaskEventType.T_SCHEDULE, new InitialScheduleTransition())		
	// 省略部分程式碼
	.addTransition(TaskStateInternal.NEW,
                   EnumSet.of(TaskStateInternal.FAILED,
                   TaskStateInternal.KILLED,
                   TaskStateInternal.RUNNING,
                   TaskStateInternal.SUCCEEDED),
                   TaskEventType.T_RECOVER, new RecoverTransition())
	// 省略部分程式碼
        由此可見,對於T_RECOVER型別任務恢復事件TaskRecoverEvent,Task狀態機指定由RecoverTransition處理,並且任務Task的狀態會由NEW轉換為RUNNING、FAILED、KILLED、SUCCEEDED等,而對於T_SCHEDULE型別任務排程事件TaskEvent,則由Task狀態機指定為InitialScheduleTransition處理,並且任務Task的狀態會由NEW轉換為SCHEDULED。下面,我們挨個進行分析。

        一、T_SCHEDULE型別任務排程事件TaskEvent

        由InitialScheduleTransition進行處理,任務Task的狀態會由NEW轉換為SCHEDULED,InitialScheduleTransition程式碼如下:

  private static class InitialScheduleTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {

    @Override
    public void transition(TaskImpl task, TaskEvent event) {
    	
      // 新增並排程任務執行嘗試TaskAttempt,Avataar.VIRGIN表示它是第一個Attempt,
      // 而剩餘的Avataar.SPECULATIVE表示它是為拖後腿任務開啟的一個Attempt,即推測執行原理
      task.addAndScheduleAttempt(Avataar.VIRGIN);
      // 設定任務的排程時間scheduledTime為當前時間
      task.scheduledTime = task.clock.getTime();
      // 傳送任務啟動事件
      task.sendTaskStartedEvent();
    }
  }
         InitialScheduleTransition的處理邏輯比較簡單,大體如下:

        1、呼叫addAndScheduleAttempt()方法,新增並排程任務執行嘗試TaskAttempt,Avataar.VIRGIN表示它是第一個Attempt,而剩餘的Avataar.SPECULATIVE表示它是為拖後腿任務開啟的一個Attempt,即推測執行原理;

        2、設定任務的排程時間scheduledTime為當前時間;

        3、傳送任務啟動事件。

        其中,1中的addAndScheduleAttempt()方法實現如下:

  // This is always called in the Write Lock
  private void addAndScheduleAttempt(Avataar avataar) {
	  
	// 呼叫addAttempt()方法,建立一個任務執行嘗試TaskAttempt例項attempt,
	// 並將其新增到attempt集合attempts中,還會設定attempt的Avataar屬性
    TaskAttempt attempt = addAttempt(avataar);
    
    // 將attempt的id新增到正在執行的attempt集合inProgressAttempts中
    inProgressAttempts.add(attempt.getID());
    
    //schedule the nextAttemptNumber
    // 排程TaskAttempt
    
    // 如果集合failedAttempts大小大於0,說明該Task之前有TaskAttempt失敗過,此次為重新排程,
    // TaskAttemp事件型別為TA_RESCHEDULE,
    if (failedAttempts.size() > 0) {
      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
          TaskAttemptEventType.TA_RESCHEDULE));
    } else {
      // 否則為TaskAttemp事件型別為TA_SCHEDULE
      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
          TaskAttemptEventType.TA_SCHEDULE));
    }
  }
        addAndScheduleAttempt()方法處理邏輯如下:

        1、呼叫addAttempt()方法,建立一個任務執行嘗試TaskAttempt例項attempt,並將其新增到attempt集合attempts中,還會設定attempt的Avataar屬性;

        2、將attempt的id新增到正在執行的attempt集合inProgressAttempts中;

        3、排程TaskAttempt:如果集合failedAttempts大小大於0,說明該Task之前有TaskAttempt失敗過,此次為重新排程,TaskAttemp事件型別為TA_RESCHEDULE,否則為TaskAttemp事件型別為TA_SCHEDULE。

        而addAttempt()方法實現如下:

  private TaskAttemptImpl addAttempt(Avataar avataar) {
	  
	// 呼叫createAttempt()方法建立任務執行嘗試TaskAttemptImpl例項attempt
    TaskAttemptImpl attempt = createAttempt();
    
    // 設定attempt的Avataar屬性
    attempt.setAvataar(avataar);
    
    // 記錄debug級別日誌資訊:Created attempt ... ...
    if (LOG.isDebugEnabled()) {
      LOG.debug("Created attempt " + attempt.getID());
    }
    
    // 將建立的任務執行嘗試TaskAttemptImpl例項attempt與其ID的對應關係新增到TaskImpl的任務執行嘗試集合attempts中,
    // attempts先被初始化為Collections.emptyMap()
    // this.attempts = Collections.emptyMap();
    switch (attempts.size()) {
      case 0:
    	  
    	// 如果attempts大小為0,即為Collections.emptyMap(),則將其更換為Collections.singletonMap(),並加入該TaskAttemptImpl例項attempt
        attempts = Collections.singletonMap(attempt.getID(),
            (TaskAttempt) attempt);
        break;
        
      case 1:
    	  
    	// 如果attempts大小為1,即為Collections.singletonMap(),則將其替換為LinkedHashMap,並加入之前和現在的TaskAttemptImpl例項attempt
        Map<TaskAttemptId, TaskAttempt> newAttempts
            = new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
        newAttempts.putAll(attempts);
        attempts = newAttempts;
        attempts.put(attempt.getID(), attempt);
        break;

      default:
    	// 如果attempts大小大於1,說明其實一個LinkedHashMap,直接put吧
        attempts.put(attempt.getID(), attempt);
        break;
    }

    // 累加TaskAttempt計數器nextAttemptNumber
    ++nextAttemptNumber;
    
    // 返回TaskAttemptImpl例項attempt
    return attempt;
  }
        其處理邏輯如下:

        1、呼叫createAttempt()方法建立任務執行嘗試TaskAttemptImpl例項attempt;

        2、設定attempt的Avataar屬性;

        3、記錄debug級別日誌資訊:Created attempt ... ...;

        4、將建立的任務執行嘗試TaskAttemptImpl例項attempt與其ID的對應關係新增到TaskImpl的任務執行嘗試集合attempts中,attempts先被初始化為Collections.emptyMap():

              4.1、如果attempts大小為0,即為Collections.emptyMap(),則將其更換為Collections.singletonMap(),並加入該TaskAttemptImpl例項attempt;

              4.2、如果attempts大小為1,即為Collections.singletonMap(),則將其替換為LinkedHashMap,並加入之前和現在的TaskAttemptImpl例項attempt;

              4.3、如果attempts大小大於1,說明其實一個LinkedHashMap,直接put吧;

        5、累加TaskAttempt計數器nextAttemptNumber;

        6、返回TaskAttemptImpl例項attempt。

         繼續往下追蹤createAttempt()方法,其在TaskImpl中程式碼如下:

  protected abstract TaskAttemptImpl createAttempt();
        這是一個抽象方法,由其子類實現,而它的子類有兩個,表示Map任務的MapTaskImpl和表示Reduce任務的ReduceTaskImpl,其createAttempt()方法分別實現如下:

         1、MapTaskImpl.createAttempt()

  @Override
  protected TaskAttemptImpl createAttempt() {
    return new MapTaskAttemptImpl(getID(), nextAttemptNumber,
        eventHandler, jobFile,
        partition, taskSplitMetaInfo, conf, taskAttemptListener,
        jobToken, credentials, clock, appContext);
  }
        生成一個MapTaskAttemptImpl例項,傳入表示Attempt序號的nextAttemptNumber、事件處理器eventHandler、作業檔案jobFile、分割槽資訊partition、分片元資料資訊taskSplitMetaInfo等關鍵變數。

        2、ReduceTaskImpl.createAttempt()

  @Override
  protected TaskAttemptImpl createAttempt() {
    return new ReduceTaskAttemptImpl(getID(), nextAttemptNumber,
        eventHandler, jobFile,
        partition, numMapTasks, conf, taskAttemptListener,
        jobToken, credentials, clock, appContext);
  }

        生成一個ReduceTaskAttemptImpl例項,除不需要分片元資料資訊taskSplitMetaInfo,和需要一個Map任務數numMapTasks外,其他與MapTaskAttemptImpl基本相同。

        TaskAttempt生成了,接下來就應該進行排程執行了。我們再折回去看看addAndScheduleAttempt()方法中,傳送的TA_SCHEDULE或TA_RESCHEDULE型別的TaskAttemptEvent,其與JobImpl、TaskImpl一樣,是由TaskAttempt狀態機負責處理的,如下所示:

     // 在事件TaskAttemptEventType.TA_SCHEDULE的觸發下,經過RequestContainerTransition的處理,
     // TaskAttempt的狀態由NEW轉換成UNASSIGNED
     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))
     
     // 在事件TaskAttemptEventType.TA_SCHEDULE的觸發下,經過RequestContainerTransition的處理,
     // TaskAttempt的狀態由NEW轉換成UNASSIGNED
     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptEventType.TA_RESCHEDULE, new RequestContainerTransition(true))
     // 上述二者的區別是RequestContainerTransition傳入的標誌位rescheduled,前者為false,後者為true
        在事件TaskAttemptEventType.TA_SCHEDULE的觸發下,經過RequestContainerTransition的處理,TaskAttempt的狀態由NEW轉換成UNASSIGNED;在事件TaskAttemptEventType.TA_SCHEDULE的觸發下,經過RequestContainerTransition的處理,TaskAttempt的狀態由NEW轉換成UNASSIGNED;上述二者的區別是RequestContainerTransition傳入的標誌位rescheduled,前者為false,後者為true。

        我們再看下RequestContainerTransition的實現,程式碼如下:

  @SuppressWarnings("unchecked")
    @Override
    public void transition(TaskAttemptImpl taskAttempt, 
        TaskAttemptEvent event) {
      // Tell any speculator that we're requesting a container
      
      // taskAttempt的事件處理器eventHandler處理SpeculatorEvent事件,告訴所有的speculator,此時正在申請一個容器
      taskAttempt.eventHandler.handle
          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
      //request for container
      
      // 申請容器
      if (rescheduled) {// Task的Attempt重新排程
    	  
    	// 構造容器申請事件ContainerRequestEvent,並交由taskAttempt的事件處理器eventHandler處理,
    	// 這個eventHandler實際上是MRAppMaster中的dispatcher,依次經過TaskImpl、TaskAttemptImpl的建立傳遞過來的,
        taskAttempt.eventHandler.handle(
            ContainerRequestEvent.createContainerRequestEventForFailedContainer(
                taskAttempt.attemptId, 
                taskAttempt.resourceCapability));
      } else {// Task的Attempt第一次排程
    	  
    	// 構造容器申請事件ContainerRequestEvent,並交由taskAttempt的事件處理器eventHandler處理,
        taskAttempt.eventHandler.handle(new ContainerRequestEvent(
            taskAttempt.attemptId, taskAttempt.resourceCapability,
            taskAttempt.dataLocalHosts.toArray(
                new String[taskAttempt.dataLocalHosts.size()]),
            taskAttempt.dataLocalRacks.toArray(
                new String[taskAttempt.dataLocalRacks.size()])));
      }
      
      // 兩者建立的ContainerRequestEvent事件的區別是,rescheduled時,不需要考慮Node和Lock位置屬性,因為此時Attempt之前已經失敗過,此時應當能夠以完成Attempt為首要任務,
      // 同時,兩者的事件型別都是ContainerAllocator.EventType.CONTAINER_REQ,
      // MRAppMaster中的dispatcher針對該事件ContainerAllocator.EventType註冊的事件處理器是LocalContainerAllocator或RMContainerAllocator
    }
        RequestContainerTransition的transition()方法處理邏輯如下:

        1、TaskAttempt的事件處理器eventHandler處理SpeculatorEvent事件,告訴所有的speculator,此時正在申請一個容器;

        2、申請容器:

              2.1、如果是Task的Attempt重新排程,構造容器申請事件ContainerRequestEvent,並交由taskAttempt的事件處理器eventHandler處理,這個eventHandler實際上是MRAppMaster中的dispatcher,依次經過TaskImpl、TaskAttemptImpl的建立傳遞過來的;

              2.2、否則如果是Task的Attempt第一次排程,構造容器申請事件ContainerRequestEvent,並交由taskAttempt的事件處理器eventHandler處理。

        兩者建立的ContainerRequestEvent事件的區別是,rescheduled時,不需要考慮Node和Lock位置屬性,因為此時Attempt之前已經失敗過,此時應當能夠以完成Attempt為首要任務,同時,兩者的事件型別都是ContainerAllocator.EventType.CONTAINER_REQ,MRAppMaster中的dispatcher針對該事件ContainerAllocator.EventType註冊的事件處理器是LocalContainerAllocator或RMContainerAllocator。

        關於Yarn容器等資源申請與分配RMContainerAllocator的介紹,我會在以後的文章中為大家講解,這裡,你只需要瞭解其執行的大體流程即可:

        1、RMContainerAllocator首先間接繼承自AbstractService,它是Hadoop中的一種服務,有服務初始化serviceInit()及服務啟動serviceStart()方法要執行;

        2、RMContainerAllocator針對容器請求分配事件,是一個雙重生產者-消費者模式,第一層生產者通過其handle()方法,將容器請求分配ContainerAllocatorEvent加入其內部eventQueue佇列,第一層消費者通過其內部事件處理執行緒eventHandlingThread,不斷的從事件佇列eventQueue中take事件進行消費,而消費的方式是做為第二層生產者,將事件按照任務型別放入排程請求列表scheduledRequests、pendingReduces中,scheduledRequests是一個複雜的區分Map和Reduce任務的會立即被排程的請求列表,而pendingReduces只是儲存等待被排程的Reduce任務請求的列表,其會根據Yarn中資源情況和Map任務完成情況確定是將事件移送至(即rampUp)scheduledRequests,還是從scheduledRequests移回Reduce任務排程請求至pendingReduces(即rampDown),而第二層的消費者則是RMContainerAllocator祖先父類RMCommunicator中的心跳執行緒allocatorThread,它週期性的呼叫heartbeat()方法,從Yarn的RM中獲取可用資源,然後消費scheduledRequests列表中的請求,進行容器分配;

        3、RMContainerAllocator中,對於Map任務來說,它經歷的資料結構,或者生命週期為scheduled->assigned->completed,而Reduce任務則是pending->scheduled->assigned->completed;

        4、經過一些的複雜邏輯後,包括綜合判斷資源情況、任務本地性、優先排程失敗任務、Map任務完成比例、針對拖後退的任務進行推測執行等,無論是Map任務還是Reduce任務,最終在分配到容器Container後,都會發送一個TaskAttemptContainerAssignedEvent事件,交由TaskAttemptImpl的狀態機中ContainerAssignedTransition進行處理,而其方法則最終會構造ContainerRemoteLaunchEvent事件,進行Container遠端載入,在遠端或本機或本程序Container中Launch任務嘗試進行任務的執行。

        關於RMContainerAllocator,因為其結構、處理邏輯比較複雜,我會專門寫文章進行分析,敬請期待!


        二、T_RECOVER型別任務恢復事件TaskRecoverEvent

        未完待續!敬請關注後續文章!