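YARN Source Code Analysis: The AM-RM Communication Protocol and Obtaining Resources

The previous posts analyzed the source code of SLS, the YARN scheduler simulator, focusing on the communication protocol between the AM and the RM.

This post looks at how AM-RM communication is implemented in the YARN project itself.

Note: in YARN, only the RM and the NM are fully implemented. For the AM and the client, YARN only provides APIs, and users have to implement them themselves.

The main job of the AM is to request resources from the RM according to business needs and to use those resources to carry out the business logic, so the AM has to communicate with both the RM and the NM.

Communication protocols:

AM-RM: ApplicationMasterProtocol

AM-NM: ContainerManagementProtocol

Both protocols are defined in hadoop-yarn-api.

AM-RM protocol:

The AM and the RM communicate through ApplicationMasterProtocol, which provides the following methods:

1. Register the AM with the RM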

public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request) 
  throws YarnException, IOException;

2. Tell the RM that the application has finished

 public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request) 
  throws YarnException, IOException;

3. Request resources from the RM

  public AllocateResponse allocate(AllocateRequest request) 
  throws YarnException, IOException;

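After the client submits an application to the RM, the RM allocates some resources to launch the AM. Once started, the AM calls registerApplicationMaster of ApplicationMasterProtocol to register itself with the RM. After registering, it calls allocate to request the resources needed to run its tasks. Once it has the resources, it starts the containers through ContainerManagementProtocol, the protocol for talking to the NM, and runs the tasks. When they finish, it calls finishApplicationMaster of ApplicationMasterProtocol to report to the RM that the application has ended and to unregister the AM.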
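The call order described above (register, allocate, finish) can be sketched with plain Java. This is only an illustration: AmRmCalls, RecordingRm and the container names are hypothetical stand-ins invented here, not the real ApplicationMasterProtocol types, and the fake RM grants every request immediately.

```java
import java.util.ArrayList;
import java.util.List;

final class AmLifecycleSketch {

    /** Stand-in for the three AM-RM calls; NOT the real Hadoop interface. */
    interface AmRmCalls {
        void registerApplicationMaster();
        List<String> allocate(int containersWanted);
        void finishApplicationMaster();
    }

    /** In-memory fake RM that records call order and hands out container names. */
    static final class RecordingRm implements AmRmCalls {
        final List<String> calls = new ArrayList<>();
        private int nextId = 1;

        @Override
        public void registerApplicationMaster() {
            calls.add("register");
        }

        @Override
        public List<String> allocate(int containersWanted) {
            calls.add("allocate");
            List<String> granted = new ArrayList<>();
            for (int i = 0; i < containersWanted; i++) {
                granted.add("container_" + nextId++);
            }
            return granted;
        }

        @Override
        public void finishApplicationMaster() {
            calls.add("finish");
        }
    }

    /** Runs register -> allocate -> (work) -> finish and returns the granted containers. */
    static List<String> run(AmRmCalls rm, int containersWanted) {
        rm.registerApplicationMaster();
        List<String> containers = rm.allocate(containersWanted);
        // A real AM would launch these containers through ContainerManagementProtocol here.
        rm.finishApplicationMaster();
        return containers;
    }

    public static void main(String[] args) {
        RecordingRm rm = new RecordingRm();
        System.out.println(run(rm, 2) + " via " + rm.calls);
    }
}
```

In a real AM the allocate call is repeated on every heartbeat rather than made once, which is what the rest of this post walks through.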

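Next, a closer look at how the AM requests resources from the RM:

1. The AM requests resources from the RM

The AM requests or releases resources through the allocate method. The information is wrapped in an AllocateRequest.

As an example, Hadoop ships an implementation for MR:

org.apache.hadoop.mapreduce.v2.app.MRAppMaster.java

In the serviceInit function, the job (that is, the task to run) is created first.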

@Override
  protected void serviceInit(final Configuration conf) throws Exception {
    // create the job classloader if enabled
    createJobClassLoader(conf);

    initJobCredentialsAndUGI(conf);
......
      if(!stagingExists) {
        isLastAMRetry = true;
        LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
            " is last retry: " + isLastAMRetry +
            " because the staging dir doesn't exist.");
        errorHappenedShutDown = true;
        forcedState = JobStateInternal.ERROR;
        shutDownMessage = "Staging dir does not exist " + stagingDir;
        LOG.error(shutDownMessage);
      } else if (commitStarted) {
        //A commit was started so this is the last time, we just need to know
        // what result we will use to notify, and how we will unregister
        errorHappenedShutDown = true;
        isLastAMRetry = true;
        LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
            " is last retry: " + isLastAMRetry +
            " because a commit was started.");
        copyHistory = true;
        if (commitSuccess) {
          shutDownMessage =
              "Job commit succeeded in a prior MRAppMaster attempt " +
              "before it crashed. Recovering.";
          forcedState = JobStateInternal.SUCCEEDED;
        } else if (commitFailure) {
          shutDownMessage =
              "Job commit failed in a prior MRAppMaster attempt " +
              "before it crashed. Not retrying.";
          forcedState = JobStateInternal.FAILED;
        } else {
          if (isCommitJobRepeatable()) {
            // cleanup previous half done commits if committer supports
            // repeatable job commit.
            errorHappenedShutDown = false;
            cleanupInterruptedCommit(conf, fs, startCommitFile);
          } else {
            //The commit is still pending, commit error
            shutDownMessage =
                "Job commit from a prior MRAppMaster attempt is " +
                "potentially in progress. Preventing multiple commit executions";
            forcedState = JobStateInternal.ERROR;
          }
        }
      }
......
      // service to allocate containers from RM (if non-uber) or to fake it (uber)
      containerAllocator = createContainerAllocator(null, context);
      addIfService(containerAllocator);
      dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);

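Finally, container resources are allocated from the RM. This is done by calling the createContainerAllocator function.

The function is implemented as follows: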

  protected ContainerAllocator createContainerAllocator(
      final ClientService clientService, final AppContext context) {
    return new ContainerAllocatorRouter(clientService, context);
  }

It creates a new ContainerAllocatorRouter and returns it.

ContainerAllocatorRouter is implemented as follows:

  private final class ContainerAllocatorRouter extends AbstractService
      implements ContainerAllocator, RMHeartbeatHandler {
    private final ClientService clientService;
    private final AppContext context;
    private ContainerAllocator containerAllocator;

    ContainerAllocatorRouter(ClientService clientService,
        AppContext context) {
      super(ContainerAllocatorRouter.class.getName());
      this.clientService = clientService;
      this.context = context;
    }

    @Override
    protected void serviceStart() throws Exception {
      if (job.isUber()) {
        MRApps.setupDistributedCacheLocal(getConfig());
        this.containerAllocator = new LocalContainerAllocator(
            this.clientService, this.context, nmHost, nmPort, nmHttpPort
            , containerID);
      } else {
        this.containerAllocator = new RMContainerAllocator(
            this.clientService, this.context, preemptionPolicy);
      }
      ((Service)this.containerAllocator).init(getConfig());
      ((Service)this.containerAllocator).start();
      super.serviceStart();
    }

    @Override
    protected void serviceStop() throws Exception {
      ServiceOperations.stop((Service) this.containerAllocator);
      super.serviceStop();
    }

    @Override
    public void handle(ContainerAllocatorEvent event) {
      this.containerAllocator.handle(event);
    }

    public void setSignalled(boolean isSignalled) {
      ((RMCommunicator) containerAllocator).setSignalled(isSignalled);
    }
    
    public void setShouldUnregister(boolean shouldUnregister) {
      ((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
    }

    @Override
    public long getLastHeartbeatTime() {
      return ((RMCommunicator) containerAllocator).getLastHeartbeatTime();
    }

    @Override
    public void runOnNextHeartbeat(Runnable callback) {
      ((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback);
    }
  }

After the ContainerAllocatorRouter is returned, serviceInit continues and adds the returned service to the AM's list of child services:

 addIfService(containerAllocator);

The service is then started through its serviceStart:

  @Override
    protected void serviceStart() throws Exception {
      if (job.isUber()) {
        MRApps.setupDistributedCacheLocal(getConfig());
        this.containerAllocator = new LocalContainerAllocator(
            this.clientService, this.context, nmHost, nmPort, nmHttpPort
            , containerID);
      } else {
        this.containerAllocator = new RMContainerAllocator(
            this.clientService, this.context, preemptionPolicy);
      }
      ((Service)this.containerAllocator).init(getConfig());
      ((Service)this.containerAllocator).start();
      super.serviceStart();
    }

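As shown, the service checks the job type: uber or not. What is uber? It is Hadoop's local mode for small MR jobs, in which everything runs in a single JVM.

If the job is not uber, a new RMContainerAllocator is created, and init and start are then called on it.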
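The router's behavior (choose a delegate at start time, then forward every event to it) can be sketched as below. All names here are hypothetical stand-ins: the two allocators only tag the event string so the chosen path is visible, and nothing in this block is the real MRAppMaster code.

```java
final class AllocatorRouterSketch {

    /** Stand-in for ContainerAllocator: handles one event and reports who handled it. */
    interface Allocator {
        String handle(String event);
    }

    /** Plays the role of the local (uber) allocator. */
    static final class LocalAllocator implements Allocator {
        @Override
        public String handle(String event) {
            return "local:" + event;
        }
    }

    /** Plays the role of the RM-backed allocator. */
    static final class RemoteAllocator implements Allocator {
        @Override
        public String handle(String event) {
            return "rm:" + event;
        }
    }

    /** Picks the delegate when started, then forwards every event to it. */
    static final class Router implements Allocator {
        private final boolean uber;
        private Allocator delegate;

        Router(boolean uber) {
            this.uber = uber;
        }

        void start() {
            delegate = uber ? new LocalAllocator() : new RemoteAllocator();
        }

        @Override
        public String handle(String event) {
            if (delegate == null) {
                throw new IllegalStateException("router not started");
            }
            return delegate.handle(event);
        }
    }

    public static void main(String[] args) {
        Router router = new Router(false);
        router.start();
        System.out.println(router.handle("CONTAINER_REQ"));
    }
}
```

Deferring the choice to start time matters because whether the job is uber is only known after the job has been created during initialization.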

Take a look at this class:

org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator.java

Constructor: initializes the member variables from the arguments passed in

  public RMContainerAllocator(ClientService clientService, AppContext context,
      AMPreemptionPolicy preemptionPolicy) {
    super(clientService, context);
    this.preemptionPolicy = preemptionPolicy;
    this.stopped = new AtomicBoolean(false);
    this.clock = context.getClock();
    this.assignedRequests = createAssignedRequests();
  }

The init function invoked by ((Service)this.containerAllocator).init(getConfig()):

@Override
  protected void serviceInit(Configuration conf) throws Exception {
    super.serviceInit(conf);
    reduceSlowStart = conf.getFloat(
        MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
        DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART);
    maxReduceRampupLimit = conf.getFloat(
        MRJobConfig.MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT, 
        MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT);
    maxReducePreemptionLimit = conf.getFloat(
        MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
        MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
    reducerUnconditionalPreemptionDelayMs = 1000 * conf.getInt(
        MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
        MRJobConfig.DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC);
    reducerNoHeadroomPreemptionDelayMs = conf.getInt(
        MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
        MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
    maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
        MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);
    maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT,
        MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT);
    RackResolver.init(conf);
    retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
                                MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
    mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP);
    reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP);
    // Init startTime to current time. If all goes well, it will be reset after
    // first attempt to contact RM.
    retrystartTime = System.currentTimeMillis();
    this.scheduledRequests.setNumOpportunisticMapsPercent(
        conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PERCENT,
            MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT));
    LOG.info(this.scheduledRequests.getNumOpportunisticMapsPercent() +
        "% of the mappers will be scheduled using OPPORTUNISTIC containers");
  }

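This configures the parameters used for container allocation, such as the retry interval for contacting the RM and the limits on running maps and reduces.

((Service)this.containerAllocator).start(): the start function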

  @Override
  protected void serviceStart() throws Exception {
    this.eventHandlingThread = new Thread() {
      @SuppressWarnings("unchecked")
      @Override
      public void run() {

        ContainerAllocatorEvent event;

        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
          try {
            event = RMContainerAllocator.this.eventQueue.take();
          } catch (InterruptedException e) {
            if (!stopped.get()) {
              LOG.error("Returning, interrupted : " + e);
            }
            return;
          }

          try {
            handleEvent(event);
          } catch (Throwable t) {
            LOG.error("Error in handling event type " + event.getType()
                + " to the ContainreAllocator", t);
            // Kill the AM
            eventHandler.handle(new JobEvent(getJob().getID(),
              JobEventType.INTERNAL_ERROR));
            return;
          }
        }
      }
    };
    this.eventHandlingThread.start();
    super.serviceStart();
  }
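The pattern in serviceStart, a single thread draining a blocking queue, can be sketched in isolation as follows. This is a simplified illustration rather than the Hadoop code: events are plain strings, and shutdown uses a hypothetical sentinel event instead of the stopped flag plus interrupt used above, so that every event queued before stop() is guaranteed to be handled.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class EventLoopSketch {
    private static final String STOP = "__STOP__"; // hypothetical sentinel event

    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();
    private Thread worker;

    /** Starts the single consumer thread. */
    void start() {
        worker = new Thread(() -> {
            while (true) {
                String event;
                try {
                    event = eventQueue.take(); // blocks until an event is queued
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (STOP.equals(event)) {
                    return;
                }
                handled.add(event); // stands in for handleEvent(event)
            }
        });
        worker.start();
    }

    /** Producer side: callers only enqueue and return immediately. */
    void handle(String event) {
        eventQueue.add(event);
    }

    /** Queues the sentinel behind pending events and waits for the worker to exit. */
    void stop() {
        eventQueue.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        EventLoopSketch loop = new EventLoopSketch();
        loop.start();
        loop.handle("CONTAINER_REQ");
        loop.handle("CONTAINER_DEALLOCATE");
        loop.stop();
        System.out.println(loop.handled);
    }
}
```

Decoupling producers from the consumer this way keeps the dispatcher thread from blocking while an allocation event is being processed.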

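Once the service has started, it enters heartbeat mode.

The heartbeat method of this class is then executed in a loop by the parent class RMCommunicator: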

 @Override
  protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    List<Container> allocatedContainers = getResources();
    if (allocatedContainers != null && allocatedContainers.size() > 0) {
      scheduledRequests.assign(allocatedContainers);
    }

    int completedMaps = getJob().getCompletedMaps();
    int completedTasks = completedMaps + getJob().getCompletedReduces();
    if ((lastCompletedTasks != completedTasks) ||
          (scheduledRequests.maps.size() > 0)) {
      lastCompletedTasks = completedTasks;
      recalculateReduceSchedule = true;
    }

    if (recalculateReduceSchedule) {
      boolean reducerPreempted = preemptReducesIfNeeded();

      if (!reducerPreempted) {
        // Only schedule new reducers if no reducer preemption happens for
        // this heartbeat
        scheduleReduces(getJob().getTotalMaps(), completedMaps,
            scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
            assignedRequests.maps.size(), assignedRequests.reduces.size(),
            mapResourceRequest, reduceResourceRequest, pendingReduces.size(),
            maxReduceRampupLimit, reduceSlowStart);
      }

      recalculateReduceSchedule = false;
    }

    scheduleStats.updateAndLogIfChanged("After Scheduling: ");
  }

In this method, the most important step is the one near the top that obtains resources:

List<Container> allocatedContainers = getResources();

After the resources are obtained, they are handed to scheduledRequests.assign so they can be assigned to pending requests:

   if (allocatedContainers != null && allocatedContainers.size() > 0) {
      scheduledRequests.assign(allocatedContainers);
    }

Next, the getResources function is analyzed piece by piece:

 @SuppressWarnings("unchecked")
  private List<Container> getResources() throws Exception {
    applyConcurrentTaskLimits();

    // will be null the first time
    Resource headRoom = Resources.clone(getAvailableResources());
    AllocateResponse response;
    /*
     * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
     * milliseconds before aborting. During this interval, AM will still try
     * to contact the RM.
     */
    try {
      response = makeRemoteRequest();
      // Reset retry count if no exception occurred.
      retrystartTime = System.currentTimeMillis();

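The most important part here is response = makeRemoteRequest(), the key method through which the AM contacts the RM to ask for resources.

The try is followed by catch blocks that handle exceptions: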

  } catch (ApplicationAttemptNotFoundException e ) {
      // This can happen if the RM has been restarted. If it is in that state,
      // this application must clean itself up.
      eventHandler.handle(new JobEvent(this.getJob().getID(),
        JobEventType.JOB_AM_REBOOT));
      throw new RMContainerAllocationException(
        "Resource Manager doesn't recognize AttemptId: "
            + this.getContext().getApplicationAttemptId(), e);
    } catch (ApplicationMasterNotRegisteredException e) {
      LOG.info("ApplicationMaster is out of sync with ResourceManager,"
          + " hence resync and send outstanding requests.");
      // RM may have restarted, re-register with RM.
      lastResponseID = 0;
      register();
      addOutstandingRequestOnResync();
      return null;
    } catch (InvalidLabelResourceRequestException e) {
      // If Invalid label exception is received means the requested label doesnt
      // have access so killing job in this case.
      String diagMsg = "Requested node-label-expression is invalid: "
          + StringUtils.stringifyException(e);
      LOG.info(diagMsg);
      JobId jobId = this.getJob().getID();
      eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
      eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
      throw e;
    } catch (Exception e) {
      // This can happen when the connection to the RM has gone down. Keep
      // re-trying until the retryInterval has expired.
      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
        eventHandler.handle(new JobEvent(this.getJob().getID(),
                                         JobEventType.JOB_AM_REBOOT));
        throw new RMContainerAllocationException("Could not contact RM after " +
                                retryInterval + " milliseconds.");
      }
      // Throw this up to the caller, which may decide to ignore it and
      // continue to attempt to contact the RM.
      throw e;
    }

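These blocks catch the exceptions thrown by response = makeRemoteRequest(): the RM no longer recognizing the application attempt (for example after an RM restart), the AM no longer being registered, an invalid node label expression, or a lost connection to the RM.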
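The retry window in the last catch block is easy to misread, so here is a minimal sketch of just that logic. RetryWindowSketch and its method names are hypothetical, and time is passed in explicitly (instead of calling System.currentTimeMillis()) so the behavior is deterministic.

```java
final class RetryWindowSketch {
    private final long retryIntervalMs;
    private long retryStartMs;

    RetryWindowSketch(long retryIntervalMs, long nowMs) {
        this.retryIntervalMs = retryIntervalMs;
        this.retryStartMs = nowMs;
    }

    /** Call after every successful RM contact: the window restarts from now. */
    void onSuccess(long nowMs) {
        retryStartMs = nowMs;
    }

    /**
     * Call after a failed RM contact. Returns true once failures have lasted
     * for the whole retry interval, meaning the caller should stop retrying.
     */
    boolean shouldGiveUp(long nowMs) {
        return nowMs - retryStartMs >= retryIntervalMs;
    }

    public static void main(String[] args) {
        RetryWindowSketch window = new RetryWindowSketch(1000, 0);
        System.out.println(window.shouldGiveUp(999));   // still inside the window
        System.out.println(window.shouldGiveUp(1000));  // window has expired
    }
}
```

The point is that the window measures time since the last success, not the number of failed attempts, which is why retrystartTime is reset right after makeRemoteRequest() returns.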

Next:

 Resource newHeadRoom = getAvailableResources();
 List<Container> newContainers = response.getAllocatedContainers();

This pulls the already-allocated container resources out of the response and assigns them to List<Container> newContainers. These are the resources the AM needs.

Next:

   // Setting NMTokens
    if (response.getNMTokens() != null) {
      for (NMToken nmToken : response.getNMTokens()) {
        NMTokenCache.setNMToken(nmToken.getNodeId().toString(),
            nmToken.getToken());
      }
    }

    // Setting AMRMToken
    if (response.getAMRMToken() != null) {
      updateAMRMToken(response.getAMRMToken());
    }

    List<ContainerStatus> finishedContainers =
        response.getCompletedContainersStatuses();

    // propagate preemption requests
    final PreemptionMessage preemptReq = response.getPreemptionMessage();
    if (preemptReq != null) {
      preemptionPolicy.preempt(
          new PreemptionContext(assignedRequests), preemptReq);
    }

    if (newContainers.size() + finishedContainers.size() > 0
        || !headRoom.equals(newHeadRoom)) {
      //something changed
      recalculateReduceSchedule = true;
      if (LOG.isDebugEnabled() && !headRoom.equals(newHeadRoom)) {
        LOG.debug("headroom=" + newHeadRoom);
      }
    }

    if (LOG.isDebugEnabled()) {
      for (Container cont : newContainers) {
        LOG.debug("Received new Container :" + cont);
      }
    }

    //Called on each allocation. Will know about newly blacklisted/added hosts.
    computeIgnoreBlacklisting();

    handleUpdatedNodes(response);
    handleJobPriorityChange(response);
    // Handle receiving the timeline collector address and token for this app.
    MRAppMaster.RunningAppContext appContext =
        (MRAppMaster.RunningAppContext)this.getContext();
    if (appContext.getTimelineV2Client() != null) {
      appContext.getTimelineV2Client().
          setTimelineCollectorInfo(response.getCollectorInfo());
    }
    for (ContainerStatus cont : finishedContainers) {
      processFinishedContainer(cont);
    }
    return newContainers;
  }

After some further processing, newContainers is returned.

Next, the makeRemoteRequest method called from getResources is analyzed:

org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.java

The makeRemoteRequest method:

  protected AllocateResponse makeRemoteRequest() throws YarnException,
      IOException {
    applyRequestLimits();
    ResourceBlacklistRequest blacklistRequest =
        ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
            new ArrayList<String>(blacklistRemovals));
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
    lastResponseID = allocateResponse.getResponseId();
    availableResources = allocateResponse.getAvailableResources();
    lastClusterNmCount = clusterNmCount;
    clusterNmCount = allocateResponse.getNumClusterNodes();
    int numCompletedContainers =
        allocateResponse.getCompletedContainersStatuses().size();

    if (ask.size() > 0 || release.size() > 0) {
      LOG.info("getResources() for " + applicationId + ":" + " ask="
          + ask.size() + " release= " + release.size() + " newContainers="
          + allocateResponse.getAllocatedContainers().size()
          + " finishedContainers=" + numCompletedContainers
          + " resourcelimit=" + availableResources + " knownNMs="
          + clusterNmCount);
    }

    ask.clear();
    release.clear();

    if (numCompletedContainers > 0) {
      // re-send limited requests when a container completes to trigger asking
      // for more containers
      requestLimitsToUpdate.addAll(requestLimits.keySet());
    }

    if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
      LOG.info("Update the blacklist for " + applicationId +
          ": blacklistAdditions=" + blacklistAdditions.size() +
          " blacklistRemovals=" +  blacklistRemovals.size());
    }
    blacklistAdditions.clear();
    blacklistRemovals.clear();
    return allocateResponse;
  }

An AllocateRequest object is defined and instantiated with newInstance. The value returned by getApplicationProgress is passed in as one of its arguments.

    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);

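This completes a standard allocateRequest object, which can now be sent to the RM.

Next, scheduler.allocate is called to send the request to the RM. The return value is an AllocateResponse, the standard format of an RM reply.

Doesn't scheduler.allocate(allocateRequest) look familiar?

It is the third method of the ApplicationMasterProtocol protocol mentioned at the top: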

 public AllocateResponse allocate(AllocateRequest request) 

AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);

This completes the round trip in which the AM requests resources from the RM and the RM replies.

allocateResponse is then simply returned.
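The bookkeeping in makeRemoteRequest (snapshot the pending ask and release sets, send them, remember the response ID, then clear the sets) can be sketched without Hadoop on the classpath. Request, Response and Scheduler below are hypothetical stand-ins for AllocateRequest, AllocateResponse and the RM proxy, and the fake RM in main simply grants everything that was asked for.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class AllocateRoundSketch {

    /** Stand-in for AllocateRequest. */
    static final class Request {
        final int responseId;
        final List<String> ask;
        final List<String> release;

        Request(int responseId, List<String> ask, List<String> release) {
            this.responseId = responseId;
            this.ask = ask;
            this.release = release;
        }
    }

    /** Stand-in for AllocateResponse. */
    static final class Response {
        final int responseId;
        final List<String> allocated;

        Response(int responseId, List<String> allocated) {
            this.responseId = responseId;
            this.allocated = allocated;
        }
    }

    /** Stand-in for the RM side of the allocate call. */
    interface Scheduler {
        Response allocate(Request request);
    }

    final Set<String> ask = new LinkedHashSet<>();
    final Set<String> release = new LinkedHashSet<>();
    int lastResponseId = 0;

    Response makeRemoteRequest(Scheduler scheduler) {
        // Snapshot the pending sets so clearing them below cannot alter the request.
        Request request = new Request(
            lastResponseId, new ArrayList<>(ask), new ArrayList<>(release));
        Response response = scheduler.allocate(request);
        lastResponseId = response.responseId; // echoed back on the next call
        ask.clear();
        release.clear();
        return response;
    }

    public static void main(String[] args) {
        AllocateRoundSketch am = new AllocateRoundSketch();
        am.ask.add("map-1");
        am.ask.add("map-2");
        Scheduler fakeRm = req -> new Response(req.responseId + 1, req.ask);
        Response response = am.makeRemoteRequest(fakeRm);
        System.out.println(response.allocated + " lastResponseId=" + am.lastResponseId);
    }
}
```

Clearing ask and release after each call means each heartbeat only carries what changed since the previous one.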

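Finally, a detailed look at the AllocateRequest and AllocateResponse classes.

AllocateRequest: the standard packet with which the AM requests resources from the RM

org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.java

The AllocateRequest format is as follows:

1. responseID: the response ID, used to detect duplicate responses

2. appProgress: the progress of the application

3. askList (List<ResourceRequest> resourceAsk): the list of resources the AM requests from the RM, a List<ResourceRequest> object. Each ResourceRequest holds the detailed parameters of one resource request, including the number of containers, the container capacity, and the allocation policy.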

hadoop-yarn-api/src/main/proto/yarn_protos.proto

ResourceRequest

message ResourceRequestProto {
  optional PriorityProto priority = 1;
  optional string resource_name = 2;
  optional ResourceProto capability = 3;
  optional int32 num_containers = 4;
  optional bool relax_locality = 5 [default = true];
  optional string node_label_expression = 6;
  optional ExecutionTypeRequestProto execution_type_request = 7;
  optional int64 allocation_request_id = 8 [default = -1];
}

4. resourceBlacklistRequest: the resource blacklist entries to add or remove
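To make the ResourceRequest fields more concrete, here is a small sketch of a plain Java class mirroring part of ResourceRequestProto. It is not the real org.apache.hadoop.yarn.api.records.ResourceRequest: the class, the askFor helper, and the host and rack names are hypothetical, and it assumes the common YARN convention of expressing one ask at node, rack and any-node ("*") granularity.

```java
import java.util.ArrayList;
import java.util.List;

final class ResourceRequestSketch {
    static final String ANY = "*"; // wildcard resource name, meaning "any node"

    final int priority;
    final String resourceName;   // host name, rack name, or ANY
    final int memoryMb;          // part of "capability"
    final int vcores;            // part of "capability"
    final int numContainers;
    final boolean relaxLocality = true;    // same default as the proto
    final long allocationRequestId = -1L;  // same default as the proto

    ResourceRequestSketch(int priority, String resourceName,
                          int memoryMb, int vcores, int numContainers) {
        this.priority = priority;
        this.resourceName = resourceName;
        this.memoryMb = memoryMb;
        this.vcores = vcores;
        this.numContainers = numContainers;
    }

    /** Builds one ask at three locality levels: node, rack, then any node. */
    static List<ResourceRequestSketch> askFor(int priority, String host, String rack,
                                              int memoryMb, int vcores, int numContainers) {
        List<ResourceRequestSketch> asks = new ArrayList<>();
        for (String name : new String[] {host, rack, ANY}) {
            asks.add(new ResourceRequestSketch(priority, name, memoryMb, vcores, numContainers));
        }
        return asks;
    }

    public static void main(String[] args) {
        for (ResourceRequestSketch r : askFor(1, "host-a", "/rack-1", 1024, 1, 2)) {
            System.out.println(r.resourceName + " x" + r.numContainers
                + " <" + r.memoryMb + "MB, " + r.vcores + " vcores>");
        }
    }
}
```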

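AllocateResponse class: the resource packet with which the RM replies to the AM

org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse.java

Its contents include:

1. responseId: the ID of the reply, used to avoid duplicate responses

2. numClusterNodes: the size of the cluster (number of nodes)

3. completedContainersStatuses: the list of statuses of completed containers

4. allocatedContainers: the resources newly allocated to the AM by the RM. These resources are wrapped in the Container class, so the return type is List<Container>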

Structure of a Container:

org.apache.hadoop.yarn.api.records.Container.java

  public static Container newInstance(ContainerId containerId, NodeId nodeId,
      String nodeHttpAddress, Resource resource, Priority priority,
      Token containerToken, ExecutionType executionType) {
    Container container = Records.newRecord(Container.class);
    container.setId(containerId);
    container.setNodeId(nodeId);
    container.setNodeHttpAddress(nodeHttpAddress);
    container.setResource(resource);
    container.setPriority(priority);
    container.setContainerToken(containerToken);
    container.setExecutionType(executionType);
    return container;
  }

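As shown, a Container consists of:

a. container ID

b. Node ID

c. nodeHttpAddress: the HTTP address of the node

d. resource: a Resource object, in the form <mem, vcores>

e. priority: the priority

f. containerToken: the container token

g. executionType: the execution type of the container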

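5. updatedNodes: the list of all nodes whose status has been updated. The update for each node is stored in a NodeReport, so the return type is List<NodeReport>

6. amCommand: a control command sent from the RM to the AM, either resync (reconnect) or shutdown.

7. preemptionMessage: resource preemption information, in two parts: a part that will be forcibly reclaimed and a part that the AM may decide how to satisfy

8. nmTokens: the tokens for communication between the AM and the NMs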
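As a rough illustration of how an AM might consume these fields on each heartbeat, the sketch below caches NM tokens, records finished containers, and returns the newly allocated ones, loosely following the order used in getResources. The Response class is a hypothetical stand-in that carries only a subset of the fields listed above, with strings in place of the real record types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ResponseHandlingSketch {

    /** Stand-in for AllocateResponse with a subset of its fields. */
    static final class Response {
        final int responseId;
        final int numClusterNodes;
        final List<String> allocatedContainers;
        final List<String> completedContainers;
        final Map<String, String> nmTokens; // nodeId -> token

        Response(int responseId, int numClusterNodes, List<String> allocatedContainers,
                 List<String> completedContainers, Map<String, String> nmTokens) {
            this.responseId = responseId;
            this.numClusterNodes = numClusterNodes;
            this.allocatedContainers = allocatedContainers;
            this.completedContainers = completedContainers;
            this.nmTokens = nmTokens;
        }
    }

    final Map<String, String> nmTokenCache = new HashMap<>();
    final List<String> finished = new ArrayList<>();
    int lastResponseId;
    int clusterNmCount;

    /** Applies one response to local state and returns the new containers. */
    List<String> process(Response response) {
        lastResponseId = response.responseId;
        clusterNmCount = response.numClusterNodes;
        nmTokenCache.putAll(response.nmTokens);        // needed later to talk to each NM
        finished.addAll(response.completedContainers); // tasks that ended since last call
        return response.allocatedContainers;
    }

    public static void main(String[] args) {
        Map<String, String> tokens = new HashMap<>();
        tokens.put("node-1:8041", "token-abc");
        List<String> allocated = new ArrayList<>();
        allocated.add("container_7");
        Response response = new Response(3, 10, allocated, new ArrayList<>(), tokens);

        ResponseHandlingSketch am = new ResponseHandlingSketch();
        System.out.println(am.process(response) + " tokens=" + am.nmTokenCache.keySet());
    }
}
```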

over