
YARN Source Code Analysis: ApplicationMaster Allocation Strategy

A conversation with a friend turned to how the ApplicationMaster's container gets allocated. My impression was that it is placed randomly, but he said it is assigned according to each node's free resources.
I had never paid attention to this logic when reading the code, so now that there is a real question, let's go look at the source.

The execution log of an MR job shows when the AM's container is allocated:

2017-04-09 03:26:17,113 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1491729774382_0001_000001 State change from SUBMITTED to SCHEDULED
2017-04-09 03:26:17,415 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1491729774382_0001_01_000001 Container Transitioned from NEW to ALLOCATED

The AM container is set up when the appattempt's state changes from SUBMITTED to SCHEDULED.
The transition handler for SUBMITTED -> SCHEDULED is:

// RMAppAttemptImpl.java
public static final class ScheduleTransition
    implements MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
  @Override
  public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
      RMAppAttemptEvent event) {
    ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
    if (!subCtx.getUnmanagedAM()) {
      // Need reset #containers before create new attempt, because this request
      // will be passed to scheduler, and scheduler will deduct the number after
      // AM container allocated
      // Build the resource request for the AM container
      appAttempt.amReq.setNumContainers(1);
      appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
      // ResourceName ANY means any machine on any rack
      appAttempt.amReq.setResourceName(ResourceRequest.ANY);
      appAttempt.amReq.setRelaxLocality(true);
      // Hand the request to the scheduler
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
              Collections.singletonList(appAttempt.amReq),
              EMPTY_CONTAINER_RELEASE_LIST, null, null);
      ...
      return RMAppAttemptState.SCHEDULED;
    } else {
      ...
    }
  }
}

The first step is building the container request for the AM. From appAttempt.amReq.setResourceName(ResourceRequest.ANY) alone we can already see that AM placement carries no locality constraint: the request names no particular node or rack, so any node may satisfy it. Still, let's follow the code to verify.
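
Before diving deeper, here is a minimal sketch of a request with the same shape, built with YARN's public records API (the 2048 MB / 1 vcore capability is only an illustrative value; the real one comes from the application submission context):

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class AmRequestSketch {
  public static void main(String[] args) {
    // Same shape as the request ScheduleTransition builds above:
    // one container, ResourceName ANY, relaxed locality.
    ResourceRequest amReq = ResourceRequest.newInstance(
        Priority.newInstance(0),        // AM_CONTAINER_PRIORITY is 0
        ResourceRequest.ANY,            // the literal "*": any host, any rack
        Resource.newInstance(2048, 1),  // illustrative: 2048 MB, 1 vcore
        1);                             // exactly one AM container
    amReq.setRelaxLocality(true);
    System.out.println("AM ask: " + Collections.singletonList(amReq));
  }
}
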
Once the request is built, the scheduler takes over. The default scheduler here is the CapacityScheduler; its allocate method:

// CapacityScheduler.java
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release, 
    List<String> blacklistAdditions, List<String> blacklistRemovals) {

  FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
  ...
  // Release containers
  releaseContainers(release, application);

  synchronized (application) {
    ...
    if (!ask.isEmpty()) {
      ...
      application.showRequests();
      // Store the asks in this application attempt's request map
      // Update application requests
      application.updateResourceRequests(ask);
      application.showRequests();
    }

    application.updateBlacklist(blacklistAdditions, blacklistRemovals);
    return application.getAllocation(getResourceCalculator(),
                 clusterResource, getMinimumResourceCapability());
  }
}

In allocate, the CapacityScheduler calls application.updateResourceRequests(ask) to record the requests in a map, where they wait to be picked up on an NM heartbeat.
Here application is a FiCaSchedulerApp object, and a FiCaSchedulerApp corresponds to one application attempt. The updateResourceRequests code:

public synchronized void updateResourceRequests(
    List<ResourceRequest> requests) {
  if (!isStopped) {
    // AppSchedulingInfo.updateResourceRequests
    appSchedulingInfo.updateResourceRequests(requests, false);
  }
}

AppSchedulingInfo keeps track of everything the application consumes, including the containers that are running or have already finished for it.

synchronized public void updateResourceRequests(
    List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
  // Update resource requests
  for (ResourceRequest request : requests) {
    Priority priority = request.getPriority();
    String resourceName = request.getResourceName();
    boolean updatePendingResources = false;
    ResourceRequest lastRequest = null;
    // If the request's ResourceName is ResourceRequest.ANY. (Not only AM
    // containers use ANY: a complete request set always carries an ANY
    // entry with the aggregate container count.)
    if (resourceName.equals(ResourceRequest.ANY)) {
      ...
      // Pending resources are only tracked on the ANY request, since the
      // ANY entry holds the aggregate count
      updatePendingResources = true;

      // Premature optimization?
      // Assumes that we won't see more than one priority request updated
      // in one call, reasonable assumption... however, it's totally safe
      // to activate same application more than once.
      // Thus we don't need another loop ala the one in decrementOutstanding()  
      // which is needed during deactivate.
      if (request.getNumContainers() > 0) {
        activeUsersManager.activateApplication(user, applicationId);
      }
    }
    // this.requests is a map keyed by priority that holds all of this
    // application's requests; look up the asks recorded at this priority
    Map<String, ResourceRequest> asks = this.requests.get(priority);
    // No requests at this priority yet: create a map for it
    if (asks == null) {
      asks = new HashMap<String, ResourceRequest>();
      this.requests.put(priority, asks);
      this.priorities.add(priority);
    }
    // See whether asks already holds a request with the same ResourceName
    lastRequest = asks.get(resourceName);

    if (recoverPreemptedRequest && lastRequest != null) {
      // Increment the number of containers to 1, as it is recovering a
      // single container.
      request.setNumContainers(lastRequest.getNumContainers() + 1);
    }
    // The previous request for this ResourceName was saved in lastRequest
    // above; the new request simply replaces it in asks. lastRequest is
    // only needed below to compute the pending-metrics delta.
    asks.put(resourceName, request);
    if (updatePendingResources) {

      // Similarly, deactivate application?
      if (request.getNumContainers() <= 0) {
        LOG.info("checking for deactivate... ");
        checkForDeactivation();
      }

      int lastRequestContainers = lastRequest != null ? lastRequest
          .getNumContainers() : 0;
      Resource lastRequestCapability = lastRequest != null ? lastRequest
          .getCapability() : Resources.none();
      metrics.incrPendingResources(user, request.getNumContainers(),
          request.getCapability());
      metrics.decrPendingResources(user, lastRequestContainers,
          lastRequestCapability);
    }
  }
}

updateResourceRequests mainly stores the requests into this.requests to await an NM heartbeat. One point that looks fuzzy at first: the existing request for the same ResourceName is saved into lastRequest before the new one is put into asks, so what happens to it? Nothing needs to: resource requests are absolute rather than incremental, so the new request replaces the old one outright, and lastRequest is only used to adjust the pending-resource metrics by the difference.
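
A toy model makes the replace-not-accumulate semantics explicit. The names below are hypothetical simplifications, not the real YARN classes, and only the ANY entry feeds the pending counter, mirroring updatePendingResources above:

import java.util.HashMap;
import java.util.Map;

// Toy model of AppSchedulingInfo's request table (hypothetical names).
class RequestTable {
  // priority -> (resourceName -> numContainers); the real table stores
  // whole ResourceRequest objects, we keep just the count for brevity.
  private final Map<Integer, Map<String, Integer>> requests = new HashMap<>();
  private int pendingContainers; // stands in for the QueueMetrics counters

  void update(int priority, String resourceName, int numContainers) {
    Map<String, Integer> asks =
        requests.computeIfAbsent(priority, p -> new HashMap<>());
    // Requests are absolute, not incremental: the new ask replaces the old
    // one, and the old value is only needed to adjust the metrics delta.
    Integer last = asks.put(resourceName, numContainers);
    if ("*".equals(resourceName)) { // ResourceRequest.ANY
      pendingContainers += numContainers - (last == null ? 0 : last);
    }
  }

  int pending() { return pendingContainers; }

  public static void main(String[] args) {
    RequestTable t = new RequestTable();
    t.update(0, "*", 1);  // the AM ask
    t.update(0, "*", 1);  // re-sending the same ask does not double-count
    System.out.println(t.pending()); // 1
  }
}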


After the requests are updated, control returns to CapacityScheduler.allocate, whose return statement calls application.getAllocation to build an Allocation object. This is where container tokens are created, and the containers that get tokens are ones already assigned to an NM, i.e. already-instantiated RMContainers. In other words, allocate appears to be asynchronous: each call records the new asks, then returns the results of earlier asks that have been satisfied since the previous call, pulled from the newlyAllocatedContainers list.
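
That request/response decoupling can be sketched as a minimal model (hypothetical names; the real pull happens in getAllocation via the attempt's newly-allocated container list):

import java.util.ArrayList;
import java.util.List;

// Toy model of the asynchronous allocate() contract (hypothetical names).
class ToyScheduler {
  private final List<String> pendingAsks = new ArrayList<>();
  private final List<String> newlyAllocated = new ArrayList<>();

  // Called by the attempt: records new asks and returns whatever the
  // heartbeat-driven loop has produced since the previous call.
  synchronized List<String> allocate(List<String> asks) {
    pendingAsks.addAll(asks);
    List<String> response = new ArrayList<>(newlyAllocated);
    newlyAllocated.clear(); // drained, like the newlyAllocatedContainers pull
    return response;        // empty on the very first call
  }

  // Called on each NM heartbeat: satisfies one pending ask, if any.
  synchronized void nodeHeartbeat(String nodeId) {
    if (!pendingAsks.isEmpty()) {
      newlyAllocated.add(pendingAsks.remove(0) + " -> " + nodeId);
    }
  }

  public static void main(String[] args) {
    ToyScheduler s = new ToyScheduler();
    System.out.println(s.allocate(List.of("amContainer"))); // []
    s.nodeHeartbeat("nm1");
    System.out.println(s.allocate(List.of())); // [amContainer -> nm1]
  }
}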

The AM container request now sits in the map, waiting for an NM heartbeat to pick it up. When some NM heartbeats in, the CapacityScheduler handles a NODE_UPDATE event:

// CapacityScheduler.handle, NODE_UPDATE event
case NODE_UPDATE:
{
  NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
  RMNode node = nodeUpdatedEvent.getRMNode();
  // Update the state of the containers already on this node: launch the
  // ones newly assigned to it, transition the ones that have completed
  nodeUpdate(node);
  if (!scheduleAsynchronously) {
    // Let this node pull pending container requests
    allocateContainersToNode(getNode(node.getNodeID()));
  }
}

After the NM heartbeat reaches the CapacityScheduler, nodeUpdate(node) refreshes the state of the containers already on that node, and then allocateContainersToNode tries to place new containers there.

private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
  ...
  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations
  // If a container is reserved on this node, fulfill the reservation first
  ...
  // Try to schedule more if there are no reservations to fulfill
  if (node.getReservedContainer() == null) {
    // Check whether the node still has free resources for at least one container
    if (calculator.computeAvailableContainers(node.getAvailableResource(),
      minimumAllocation) > 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to schedule on node: " + node.getNodeName() +
            ", available: " + node.getAvailableResource());
      }
      // Walk the queue hierarchy to assign a container
      root.assignContainers(clusterResource, node, false);
    }
  } else {
    LOG.info("Skipping scheduling since node " + node.getNodeID() + 
        " is reserved by application " + 
        node.getReservedContainer().getContainerId().getApplicationAttemptId()
        );
  }
}
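
The gate calculator.computeAvailableContainers(...) > 0 is plain arithmetic. A simplified sketch of what the two stock calculators compute, assuming memory in MB (the real classes are DefaultResourceCalculator and DominantResourceCalculator):

// Simplified versions of the two stock resource calculators.
class Calc {
  // DefaultResourceCalculator: memory is the only dimension.
  static int availableContainersDefault(int availMemMb, int reqMemMb) {
    return availMemMb / reqMemMb;
  }

  // DominantResourceCalculator: every resource type constrains the count.
  static int availableContainersDominant(int availMemMb, int availVcores,
                                         int reqMemMb, int reqVcores) {
    return Math.min(availMemMb / reqMemMb, availVcores / reqVcores);
  }

  public static void main(String[] args) {
    // A node with 3 GB free fits one 2 GB minimum-allocation container...
    System.out.println(availableContainersDefault(3072, 2048));        // 1
    // ...but not if the request also needs 2 vcores and only 1 is free.
    System.out.println(availableContainersDominant(3072, 1, 2048, 2)); // 0
  }
}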

When scheduling for this NM, the scheduler first checks whether the node holds a reserved container: a reservation is fulfilled first, and only when there is none does it call root.assignContainers.
root is a CSQueue object. CSQueue is an interface implemented by the abstract class AbstractCSQueue, which is in turn extended by ParentQueue and LeafQueue; the call here lands in ParentQueue.assignContainers:

public synchronized CSAssignment assignContainers(
    Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
  CSAssignment assignment = 
      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);

  // if our queue cannot access this node, just return
  if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
      labelManager.getLabelsOnNode(node.getNodeID()))) {
    return assignment;
  }

  while (canAssign(clusterResource, node)) {
    ...
    boolean localNeedToUnreserve = false;
    Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID()); 
    // Are we over maximum-capacity for this queue?
    if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
      // check to see if we could if we unreserve first
      localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
      if (!localNeedToUnreserve) {
        break;
      }
    }
    // Schedule
    CSAssignment assignedToChild = 
        assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
    assignment.setType(assignedToChild.getType());
    ...
    // Do not assign more than one container if this isn't the root queue
    // or if we've already assigned an off-switch container
    if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
      if (LOG.isDebugEnabled()) {
        if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
          LOG.debug("Not assigning more than one off-switch container," +
              " assignments so far: " + assignment);
        }
      }
      break;
    }
  } 

  return assignment;
}

When assigning, the queue first checks that it may access this node (via the node's labels) and that it is not over its maximum capacity; once those checks pass, assignContainersToChildQueues hands the node down to the child queues:

private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
    FiCaSchedulerNode node, boolean needToUnreserve) {
  ...
  // Try to assign to most 'under-served' sub-queue
  for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
    CSQueue childQueue = iter.next();
    ...
    assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
    ...
    // If we do assign, remove the queue and re-insert in-order to re-sort
    if (Resources.greaterThan(
            resourceCalculator, cluster, 
            assignment.getResource(), Resources.none())) {
      // Remove and re-insert to sort
      iter.remove();
      LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() + 
          " stats: " + childQueue);
      childQueues.add(childQueue);
      if (LOG.isDebugEnabled()) {
        printChildQueues();
      }
      break;
    }
  }

  return assignment;
}

assignContainersToChildQueues iterates over the child queues, most under-served first, and calls each child's assignContainers. After a successful assignment the child queue is removed from the set and re-inserted, so that the ordering is re-sorted.
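
The remove-and-re-insert trick is what keeps the ordering fresh. A toy model, with a used-capacity ratio standing in for the real queue comparator (hypothetical names):

import java.util.Comparator;
import java.util.TreeSet;

// Toy model of the "most under-served first" child queue ordering.
class QueueSort {
  static final class Q {
    final String name;
    double usedRatio; // usedCapacity / configured capacity
    Q(String name, double usedRatio) { this.name = name; this.usedRatio = usedRatio; }
  }

  public static void main(String[] args) {
    TreeSet<Q> childQueues = new TreeSet<>(
        Comparator.comparingDouble((Q q) -> q.usedRatio).thenComparing(q -> q.name));
    childQueues.add(new Q("a", 0.10));
    childQueues.add(new Q("b", 0.30));

    // "a" is offered the node first; suppose it receives a container.
    Q first = childQueues.first();
    childQueues.remove(first);  // remove and re-insert to re-sort,
    first.usedRatio = 0.45;     // mirroring the iter.remove()/add() above
    childQueues.add(first);

    System.out.println(childQueues.first().name); // now "b" goes first
  }
}
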
For a leaf queue, childQueue.assignContainers is LeafQueue.assignContainers:

public synchronized CSAssignment assignContainers(Resource clusterResource,
    FiCaSchedulerNode node, boolean needToUnreserve) {
  ...
  // if our queue cannot access this node, just return
  if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
      labelManager.getLabelsOnNode(node.getNodeID()))) {
    return NULL_ASSIGNMENT;
  }

  // Check for reserved resources
  RMContainer reservedContainer = node.getReservedContainer();
  if (reservedContainer != null) {
    FiCaSchedulerApp application = 
        getApplication(reservedContainer.getApplicationAttemptId());
    synchronized (application) {
      return assignReservedContainer(application, node, reservedContainer,
          clusterResource);
    }
  }

  // Try to assign containers to applications in order
  for (FiCaSchedulerApp application : activeApplications) {
    ...
    // Lock the application while scheduling for it
    synchronized (application) {
      // Check if this resource is on the blacklist
      if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
        continue;
      }

      // Schedule in priority order
      for (Priority priority : application.getPriorities()) {
        // Look up the ANY request first: the ANY entry carries the
        // aggregate outstanding count for this priority, so if it is
        // absent there is nothing left to allocate here.
        ResourceRequest anyRequest =
            application.getResourceRequest(priority, ResourceRequest.ANY);
        if (null == anyRequest) {
          continue;
        }

        // Required resource
        Resource required = anyRequest.getCapability();

        // Do we need containers at this 'priority'?
        if (application.getTotalRequiredResources(priority) <= 0) {
          continue;
        }
        if (!this.reservationsContinueLooking) {
          if (!needContainers(application, priority, required)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("doesn't need containers based on reservation algo!");
            }
            continue;
          }
        }

        Set<String> requestedNodeLabels =
            getRequestLabelSetByExpression(anyRequest
                .getNodeLabelExpression());

        // Compute user-limit & set headroom
        // Note: We compute both user-limit & headroom with the highest 
        //       priority request as the target. 
        //       This works since we never assign lower priority requests
        //       before all higher priority ones are serviced.
        Resource userLimit = 
            computeUserLimitAndSetHeadroom(application, clusterResource, 
                required, requestedNodeLabels);          

        // Check queue max-capacity limit
        if (!canAssignToThisQueue(clusterResource, required,
            labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
          return NULL_ASSIGNMENT;
        }

        // Check user limit
        if (!assignToUser(clusterResource, application.getUser(), userLimit,
            application, true, requestedNodeLabels)) {
          break;
        }

        // Inform the application it is about to get a scheduling opportunity
        // Scheduling opportunities drive delay scheduling: every node this
        // app is offered counts as one opportunity (sketched after this
        // method's listing).
        application.addSchedulingOpportunity(priority);

        // Try to schedule
        // Try to schedule on this node
        CSAssignment assignment =  
          assignContainersOnNode(clusterResource, node, application, priority, 
              null, needToUnreserve);

        // Did the application skip this node?
        if (assignment.getSkipped()) {
          // Don't count 'skipped nodes' as a scheduling opportunity!
          application.subtractSchedulingOpportunity(priority);
          continue;
        }

        // Did we schedule or reserve a container?
        Resource assigned = assignment.getResource();
        if (Resources.greaterThan(
            resourceCalculator, clusterResource, assigned, Resources.none())) {

          // Book-keeping 
          // Note: Update headroom to account for current allocation too...
          allocateResource(clusterResource, application, assigned,
              labelManager.getLabelsOnNode(node.getNodeID()));

          // Don't reset scheduling opportunities for non-local assignments
          // otherwise the app will be delayed for each non-local assignment.
          // This helps apps with many off-cluster requests schedule faster.
          if (assignment.getType() != NodeType.OFF_SWITCH) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Resetting scheduling opportunities");
            }
            application.resetSchedulingOpportunities(priority);
          }

          // Done
          return assignment;
        } else {
          // Do not assign out of order w.r.t priorities
          break;
        }
      }
    }

    if(LOG.isDebugEnabled()) {
      LOG.debug("post-assignContainers for application "
        + application.getApplicationId());
    }
    application.showRequests();
  }

  return NULL_ASSIGNMENT;
}
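
The addSchedulingOpportunity / resetSchedulingOpportunities pair above implements delay scheduling. A toy model of the idea; in the real code the counter lives per application and priority, and the threshold comes from configuration such as yarn.scheduler.capacity.node-locality-delay:

// Toy model of delay scheduling via "scheduling opportunities".
class DelayScheduling {
  private int missedOpportunities;  // addSchedulingOpportunity increments this
  private final int localityDelay;  // how many offers the app must pass up

  DelayScheduling(int localityDelay) { this.localityDelay = localityDelay; }

  // Called each time this app is offered a node that cannot satisfy a
  // node-local request: passing up enough offers unlocks rack-local
  // (and, later, off-switch) placement.
  boolean canAssignRackLocal() {
    missedOpportunities++;
    return missedOpportunities > localityDelay;
  }

  // After a local assignment the counter is reset, so the app must wait
  // again before it may relax locality (resetSchedulingOpportunities).
  void reset() { missedOpportunities = 0; }
}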

LeafQueue.assignContainers iterates over the queue's active applications in order and, after the series of checks above, calls assignContainersOnNode to do the actual scheduling:

private CSAssignment assignContainersOnNode(Resource clusterResource,
    FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
    RMContainer reservedContainer, boolean needToUnreserve) {
  Resource assigned = Resources.none();
  // Node-local: a request whose ResourceName is this node's hostname
  ResourceRequest nodeLocalResourceRequest =
      application.getResourceRequest(priority, node.getNodeName());
  if (nodeLocalResourceRequest != null) {
    assigned = 
        assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
            node, application, priority, reservedContainer, needToUnreserve); 
    if (Resources.greaterThan(resourceCalculator, clusterResource, 
        assigned, Resources.none())) {
      return new CSAssignment(assigned, NodeType.NODE_LOCAL);
    }
  }
  // Rack-local: a request whose ResourceName is this node's rack
  ResourceRequest rackLocalResourceRequest =
      application.getResourceRequest(priority, node.getRackName());
  if (rackLocalResourceRequest != null) {
    if (!rackLocalResourceRequest.getRelaxLocality()) {
      return SKIP_ASSIGNMENT;
    }

    assigned = 
        assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
            node, application, priority, reservedContainer, needToUnreserve);
    if (Resources.greaterThan(resourceCalculator, clusterResource, 
        assigned, Resources.none())) {
      return new CSAssignment(assigned, NodeType.RACK_LOCAL);
    }
  }

  // Off-switch: a request whose ResourceName is ANY
  ResourceRequest offSwitchResourceRequest =
      application.getResourceRequest(priority, ResourceRequest.ANY);
  if (offSwitchResourceRequest != null) {
    if (!offSwitchResourceRequest.getRelaxLocality()) {
      return SKIP_ASSIGNMENT;
    }

    return new CSAssignment(
        assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
            node, application, priority, reservedContainer, needToUnreserve), 
            NodeType.OFF_SWITCH);
  }
  return SKIP_ASSIGNMENT;
}
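
The node -> rack -> ANY cascade above is the core of locality handling. In miniature, with plain counts standing in for full ResourceRequests (hypothetical names):

import java.util.Map;

// Toy model of the node -> rack -> ANY cascade (hypothetical names).
class LocalityCascade {
  static String place(Map<String, Integer> asks, String nodeName, String rackName) {
    // Most specific request wins: this node, then its rack, then ANY ("*").
    if (asks.getOrDefault(nodeName, 0) > 0) return "NODE_LOCAL";
    if (asks.getOrDefault(rackName, 0) > 0) return "RACK_LOCAL";
    if (asks.getOrDefault("*", 0) > 0)      return "OFF_SWITCH";
    return "SKIP";
  }

  public static void main(String[] args) {
    // The AM ask contains only the ANY entry, so whichever node is
    // offered first takes the off-switch branch.
    System.out.println(place(Map.of("*", 1), "host1", "/rack1")); // OFF_SWITCH
  }
}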

assignContainersOnNode branches on the locality of the pending requests. Since the AM container's ResourceRequest is ANY, we only need to follow the off-switch branch:

private Resource assignOffSwitchContainers(
    Resource clusterResource, ResourceRequest offSwitchResourceRequest,
    FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, 
    RMContainer reservedContainer, boolean needToUnreserve) {
  if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
      reservedContainer)) {
    return assignContainer(clusterResource, node, application, priority,
        offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
        needToUnreserve);
  }

  return Resources.none();
}

assignOffSwitchContainers in turn calls assignContainer; let's keep following:

private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, 
    FiCaSchedulerApp application, Priority priority, 
    ResourceRequest request, NodeType type, RMContainer rmContainer,
    boolean needToUnreserve) {
  ...
  // Resources requested per container
  Resource capability = request.getCapability();
  // Resources currently free on this node
  Resource available = node.getAvailableResource();
  // Total resources of this node
  Resource totalResource = node.getTotalResource();
  // Reject the request outright if it exceeds the node's total capacity
  if (!Resources.fitsIn(capability, totalResource)) {
    LOG.warn("Node : " + node.getNodeID()
        + " does not have sufficient resource for request : " + request
        + " node total capability : " + node.getTotalResource());
    return Resources.none();
  }
  assert Resources.greaterThan(
      resourceCalculator, clusterResource, available, Resources.none());

  // Create the container if necessary
  // Reuse the container reserved on this node if any; otherwise create a
  // new one, assigning its containerId
  Container container = 
      getContainer(rmContainer, application, node, capability, priority);
  ...
  // Can we allocate a container on this node? Reservations were handled
  // above; a normal assignment requires the free resources to fit at
  // least one container of the requested size.
  int availableContainers = 
      resourceCalculator.computeAvailableContainers(available, capability);
  if (availableContainers > 0) {
    // Allocate...
    ...
    // Inform the application
    RMContainer allocatedContainer = 
        application.allocate(type, node, priority, request, container);

    // Does the application need this resource?
    if (allocatedContainer == null) {
      return Resources.none();
    }
    // Inform the node: the container is recorded in its launchedContainers map
    node.allocateContainer(allocatedContainer);

    LOG.info("assignedContainer" +
        " application attempt=" + application.getApplicationAttemptId() +
        " container=" + container + 
        " queue=" + this + 
        " clusterResource=" + clusterResource);

    return container.getResource();
  } else {
    // if we are allowed to allocate but this node doesn't have space, reserve it or
    // if this was an already a reserved container, reserve it again
    ...
    return Resources.none();
  }
}

assignContainer first checks that the per-container request does not exceed the node's total capacity. If it fits, getContainer either reuses a container already reserved on this node or calls createContainer to build a new one, assigning its containerId. Next it checks whether the node's currently free resources can actually hold the container; if so, application.allocate performs the allocation (application again being the FiCaSchedulerApp), and node.allocateContainer records the container in the node's launchedContainers map, from which it will be handed to the NM on a later heartbeat. The allocate code:

synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
    Priority priority, ResourceRequest request, 
    Container container) {
  ...
  // On the RM side a container is represented as an RMContainer
  // Create RMContainer
  RMContainer rmContainer = new RMContainerImpl(container, this
      .getApplicationAttemptId(), node.getNodeID(),
      appSchedulingInfo.getUser(), this.rmContext);

  // Add it to the allocation bookkeeping: newlyAllocatedContainers feeds
  // the next allocate()/getAllocation pull, liveContainers tracks it by id
  newlyAllocatedContainers.add(rmContainer);
  liveContainers.put(container.getId(), rmContainer);    

  // Update consumption and track allocations
  List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
      type, node, priority, request, container);
  Resources.addTo(currentConsumption, container.getResource());

  // Update resource requests related to "request" and store in RMContainer 
  ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);

  // Inform the container
  // Fire a START event at the container's state machine; this is the
  // NEW -> ALLOCATED transition seen in the log at the top
  rmContainer.handle(
      new RMContainerEvent(container.getId(), RMContainerEventType.START));
  ...
  return rmContainer;
}

allocate creates an RMContainer and adds it to the newlyAllocatedContainers list; the next getAllocation call pulls containers out of this list, creates their tokens, and returns them.
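
The START event drives the container state machine whose first transition we saw in the log at the top. In miniature (a hypothetical simplification; RMContainerImpl has many more states and events):

// Toy model of the first RMContainer transition seen in the log.
class ToyRMContainer {
  enum State { NEW, ALLOCATED, ACQUIRED, RUNNING }

  private State state = State.NEW;

  // RMContainerEventType.START drives NEW -> ALLOCATED, which is the
  // "Container Transitioned from NEW to ALLOCATED" line in the log.
  void handle(String event) {
    if (state == State.NEW && "START".equals(event)) {
      state = State.ALLOCATED;
    }
  }

  State state() { return state; }

  public static void main(String[] args) {
    ToyRMContainer c = new ToyRMContainer();
    c.handle("START");
    System.out.println(c.state()); // ALLOCATED
  }
}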

Summary

I have followed the overall flow through the code, though a few details are still not fully clear; I will debug them later and update this post.
My current understanding: allocate() first records the container request, and in the same call pulls any already-scheduled containers out of newlyAllocatedContainers and creates their tokens. When an NM heartbeats in, the scheduler takes a matching request out of requests, instantiates an RMContainer for it, and puts it into newlyAllocatedContainers to be picked up by the next allocate() call. So, back to the opening question: the AM container's request is ANY with no locality constraint, and it lands on whichever heartbeating node first passes the queue, user-limit, and free-resource checks, not necessarily the node with the most free resources.