1. 程式人生 > >原始碼走讀-Yarn-ResourceManager03-RM排程之FairScheduler

原始碼走讀-Yarn-ResourceManager03-RM排程之FairScheduler

0x04 RM排程之FairScheduler

RM對NM的排程分為心跳觸發排程持續排程,我們先從心跳排程開始講。因為本文的主題是講RM,這裡就不講NM啟動過程了,放在另一篇文章裡分析。我們直接從AsyncDispatcher講起。

4.1 AsyncDispatcher

AsyncDispatcher的內部類GenericEventHandler會處理一個EventType: NODE_UPDATENodeUpdateSchedulerEvent,然後走我們熟悉的事件處理流程,最後交給ResourceManager的內部類SchedulerEventDispatcher處理,:

4.2 SchedulerEventDispatcher

接收到NODE_UPDATE事件後,會交給排程器FairSchedulerhandle方法處理。

4.3 FairScheduler

因為事件是NODE_UPDATE型別,所以會執行handle方法中以下程式碼:

 case NODE_UPDATE:
      if (!(event instanceof NodeUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      // NodeUpdateSchedulerEvent是SchedulerEvent的子類
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; nodeUpdate(nodeUpdatedEvent.getRMNode()); break;
// 處理從node來的update事件
private synchronized void nodeUpdate(RMNode nm) {
    long start = getClock().getTime();
    if (LOG.isDebugEnabled()) {
      LOG.debug("nodeUpdate: "
+ nm + " cluster capacity: " + clusterResource); } eventLog.log("HEARTBEAT", nm.getHostName()); // 根據NM的NodeId 取出FSSchedulerNode // FSSchedulerNode是配置了FairScheuler策略時 繼承自SchedulerNode,從排程器角度表示的一個節點 FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); // 拿到並清除在NM心跳間隔期間積累的container變更 List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); // 存放新啟動的container List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); // 存放已結束的container List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>(); for(UpdatedContainerInfo containerInfo : containerInfoList) { newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); completedContainers.addAll(containerInfo.getCompletedContainers()); } // 將應該新啟動的Container啟動起來 for (ContainerStatus launchedContainer : newlyLaunchedContainers) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); } // 處理應該結束的Container for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } if (continuousSchedulingEnabled) { if (!completedContainers.isEmpty()) { attemptScheduling(node); } } else { // 嘗試排程該node attemptScheduling(node); } long duration = getClock().getTime() - start; // 統計指標 fsOpDurations.addNodeUpdateDuration(duration); }

下面看看attemptScheduling

synchronized void attemptScheduling(FSSchedulerNode node) {
    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }

    final NodeId nodeID = node.getNodeID();
    if (!nodes.containsKey(nodeID)) {
      // The node might have just been removed while this thread was waiting
      // on the synchronized lock before it entered this synchronized method
      LOG.info("Skipping scheduling as the node " + nodeID +
          " has been removed");
      return;
    }

    // 分配新的conatiner
    // 1. 檢查app預留情況
    // 2. 如果沒有預留就開始地排程

    boolean validReservation = false;
    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
    if (reservedAppSchedulable != null) {
      validReservation = reservedAppSchedulable.assignReservedContainer(node);
    }
    if (!validReservation) {
      // No reservation, schedule at queue which is farthest below fair share
      // 沒有預留時,
      int assignedContainers = 0;
      // 已分配資源,初始為空資源
      Resource assignedResource = Resources.clone(Resources.none());
      // 得到當前可分配的最大資源量
      Resource maxResourcesToAssign =
          Resources.multiply(node.getAvailableResource(), 0.5f);
      while (node.getReservedContainer() == null) {
        boolean assignedContainer = false;
        // 這一步很關鍵,開始嘗試分配該node的container資源
        // 最後得到是這一次準備分配的資源
        Resource assignment = queueMgr.getRootQueue().assignContainer(node);
        if (!assignment.equals(Resources.none())) {
          // 分配成功,就增加統計量
          assignedContainers++;
          assignedContainer = true;
          // 將此次分配的累加到已分配的資源中
          Resources.addTo(assignedResource, assignment);
        }
        // 分配不成功就退出迴圈
        if (!assignedContainer) { break; }
        // 判斷是否應該繼續排程 比如未開啟yarn.scheduler.fair.assignmultiple
        if (!shouldContinueAssigning(assignedContainers,
            maxResourcesToAssign, assignedResource)) {
          break;
        }
      }
    }
    // 更新root佇列指標
    updateRootQueueMetrics();
  }

4.4 FSParentQueue

我們接著看FSParentQueueassignContainer方法:

@Override
  public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = Resources.none();

    // 如果不允許分配比如超出配額限制就直接返回了
    if (!assignContainerPreCheck(node)) {
      return assigned;
    }

    // 在對子佇列排序時需要持有寫鎖
    writeLock.lock();
    try {

      // FairSharePolicy分配策略:
      // 通過加權公平共享份額來實現排程比較。以外,低於其最低份額的排程計劃比剛好達到最低份額的排程計劃優先順序更高。
      // 在比較低於最小份額的那些排程計劃時,參考的標準是低於最小份額的比例。
      // 比如,作業A具有10個任務的最小份額中的8個,而作業B具有100個最小份額中的50個
      // 則接下來排程作業B,因為B擁有50%的最小份額 而A處於佔80%

      // 在比較超過最小份額的排程計劃時,參考的標準是 執行中任務數/權重
      // 如果所有權重都相等,就把槽位給擁有最少任務數量的作業
      // 否則,擁有更高權重的作業分配更多

      // 對子佇列列表按FairSharePolicy來排序
      Collections.sort(childQueues, policy.getComparator());
    } finally {
      writeLock.unlock();
    }

    // 在排序和遍歷排序後的子佇列列表間隙會釋放鎖
    // 那麼這個佇列列表在以下情況可能被修改:
    // 1.將一個子佇列新增到子佇列列表末尾,那麼顯然這不會影響container分配
    // 2.移除一個子佇列,這樣可能還挺好,我們就不分配資源給很快被移除的佇列
    readLock.lock();
    try {
      // 以下開始遍歷子佇列分配資源,只要一次分配成功就退出迴圈
      for (FSQueue child : childQueues) {
        // 這裡有可能是FSParentQueue FSLeafQueue FSAppAttempt,都實現自Schedulable介面
        assigned = child.assignContainer(node);
        if (!Resources.equals(assigned, Resources.none())) {
          break;
        }
      }
    } finally {
      readLock.unlock();
    }
    // 最後返回分配到的資源
    return assigned;
  }

4.5 FSLeafQueue

這個小節我們看看FairScheduelr中的葉子佇列,首先是assignContainer方法:

@Override
  public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = Resources.none();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
          getName() + " fairShare: " + getFairShare());
    }

    if (!assignContainerPreCheck(node)) {
      return assigned;
    }

    Comparator<Schedulable> comparator = policy.getComparator();
    writeLock.lock();
    try {
      Collections.sort(runnableApps, comparator);
    } finally {
      writeLock.unlock();
    }
    // 在這個間隙我們釋放鎖可以有高效的效能、避免死鎖
    // 未排序的、可執行的App可能在這個時候直接新增
    // 但我們可以接受,因為這種可能性極低
    readLock.lock();
    try {
      // 遍歷app列表
      for (FSAppAttempt sched : runnableApps) {
        // 判斷待分配的app黑名單中是否包含該node,如果包含就分配下一個app
        if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
          continue;
        }

         // node嘗試分配container資源給該app
        assigned = sched.assignContainer(node);
        // 如果分配成功就跳出迴圈
        if (!assigned.equals(Resources.none())) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Assigned container in queue:" + getName() + " " +
                "container:" + assigned);
          }
          break;
        }
      }
    } finally {
      readLock.unlock();
    }
    // 返回分配到的資源
    return assigned;
  }