
Source Code Walkthrough - Yarn - ResourceManager04 - MR Job Submission - Client-Side Analysis

0x05 RM Scheduling - MR Job Submission - Client-Side Analysis

5.1 mapreduce.Job

org.apache.hadoop.mapreduce.Job

As we all know, an MR program usually ends with the line job.waitForCompletion(true), which submits the job and waits for it to finish. Our analysis starts here:

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == Job.JobState.DEFINE) {
      // Submit the job
      submit();
    }
    if (verbose) {
      // Monitor the job and keep printing progress until it completes (succeeds or fails)
      this.monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          getCompletionPollInterval(this.cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
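The non-verbose branch above is just a poll loop. Here is a rough, self-contained illustration of the same pattern. It is not Hadoop code: `PollUntilComplete` and `waitUntil` are hypothetical names, and a `BooleanSupplier` stands in for `isComplete()`.

```java
import java.util.function.BooleanSupplier;

public class PollUntilComplete {

    // Checks isComplete, sleeping pollMillis between checks; returns how many checks were made.
    static int waitUntil(BooleanSupplier isComplete, long pollMillis) {
        int checks = 1;
        while (!isComplete.getAsBoolean()) {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException ie) {
                // Unlike the Hadoop loop, keep the interrupt status and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", ie);
            }
            checks++;
        }
        return checks;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake job whose "isComplete()" becomes true on the third check.
        int checks = waitUntil(() -> ++calls[0] >= 3, 10);
        System.out.println("completed after " + checks + " checks"); // completed after 3 checks
    }
}
```

One deliberate difference: the Hadoop loop silently swallows `InterruptedException` and keeps polling. The sketch restores the interrupt flag and stops, which is the more common Java idiom.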

Next, let's look at the submit method:

// Submit the job to the cluster and return immediately
public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    // Create the JobSubmitter
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        // Submit the job
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}
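The `ensureState(JobState.DEFINE)` guard, together with the final `state = JobState.RUNNING`, means a job can be submitted only once. This is also why waitForCompletion calls submit() only while the state is still DEFINE.

Below is a minimal sketch of that guard using a toy class. `JobStateSketch` is hypothetical, and a counter replaces both the privileged `doAs` call and the real submission. The exception message is illustrative and not claimed to match Hadoop's.

```java
public class JobStateSketch {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private int submissions = 0;

    // Rejects the call unless the job is in the expected state.
    private void ensureState(JobState expected) {
        if (state != expected) {
            throw new IllegalStateException("Job in state " + state + " instead of " + expected);
        }
    }

    // Same shape as Job.submit(): check the state, "submit", then mark the job RUNNING.
    void submit() {
        ensureState(JobState.DEFINE);
        submissions++; // stands in for submitter.submitJobInternal(...)
        state = JobState.RUNNING;
    }

    JobState getState() { return state; }
    int getSubmissions() { return submissions; }

    public static void main(String[] args) {
        JobStateSketch job = new JobStateSketch();
        job.submit();
        System.out.println(job.getState()); // RUNNING
        try {
            job.submit(); // a second submit is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Job in state RUNNING instead of DEFINE
        }
    }
}
```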

As you can see, it uses the JobSubmitter class, which we look at next.

5.2 JobSubmitter

org.apache.hadoop.mapreduce.JobSubmitter

Next, look at the submitter.submitJobInternal method. It contains a lot of code, so I only show the most critical lines here:

  // Get the jobId
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);

  // Submit the job
  status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

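As you can see, the code first obtains a jobID and then submits the job to submitClient, which is an implementation of the ClientProtocol interface. The main work therefore consists of two steps: obtain a JobId, then submit the Job.

We cover these two flows separately below. Because we are submitting the job to run on YARN, the implementation actually used is YARNRunner.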
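The client depends only on this two-step contract: get an ID, then submit. Which implementation it receives is decided when Cluster is initialized. To my understanding, with mapreduce.framework.name=yarn a ClientProtocolProvider creates a YARNRunner, while local mode uses LocalJobRunner.

The sketch below models just that contract with hypothetical types (`MiniClientProtocol`, `InMemoryRunner`, `TwoStepSubmit`). Keep two limits in mind:
- The real ClientProtocol has many more methods.
- Naming the submit directory after the job ID is only an approximation of submitJobInternal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified version of the two-step contract.
interface MiniClientProtocol {
    String getNewJobID();
    String submitJob(String jobId, String jobSubmitDir);
}

// In-memory stand-in for a runner: hands out IDs and records submissions.
class InMemoryRunner implements MiniClientProtocol {
    private int nextId = 1;
    final List<String> submitted = new ArrayList<>();

    @Override
    public String getNewJobID() {
        return String.format("job_0_%04d", nextId++);
    }

    @Override
    public String submitJob(String jobId, String jobSubmitDir) {
        submitted.add(jobId + "@" + jobSubmitDir);
        return "RUNNING";
    }
}

public class TwoStepSubmit {
    // Same order as submitJobInternal: obtain an ID first, then submit with it.
    static String submit(MiniClientProtocol client, String stagingDir) {
        String jobId = client.getNewJobID();
        client.submitJob(jobId, stagingDir + "/" + jobId);
        return jobId;
    }

    public static void main(String[] args) {
        InMemoryRunner runner = new InMemoryRunner();
        System.out.println(submit(runner, "/tmp/staging")); // job_0_0001
        System.out.println(runner.submitted);
    }
}
```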

5.3 Obtaining the JobID

5.3.1 YARNRunner

org.apache.hadoop.mapred.YARNRunner

First, look at the submitClient.getNewJobID() call used earlier:

// This calls resMgrDelegate.getNewJobID to obtain the jobId
@Override
  public JobID getNewJobID() throws IOException, InterruptedException {
    return resMgrDelegate.getNewJobID();
  }

The method simply forwards to resMgrDelegate, so let's see what this ResourceMgrDelegate is:

5.3.2 ResourceMgrDelegate

Part of the ResourceMgrDelegate code:

public class ResourceMgrDelegate extends YarnClient{
  private YarnConfiguration conf;
  private ApplicationSubmissionContext application;
  private ApplicationId applicationId;
  @Private
  @VisibleForTesting
  protected YarnClient client;
  private Text rmDTService;

  /**
   * Delegate responsible for communicating with the Resource Manager's
   * {@link ApplicationClientProtocol}.
   * (i.e. the delegate that talks to the RM over ApplicationClientProtocol)
   * @param conf the configuration object.
   */
  public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
    this.client = YarnClient.createYarnClient();
    init(conf);
    start();
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    client.init(conf);
    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    client.start();
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    client.stop();
    super.serviceStop();
  }

  // Get a new JobId
  public JobID getNewJobID() throws IOException, InterruptedException {
    try {
      // client here is a YarnClientImpl instance
      // Obtain an ApplicationSubmissionContext
      this.application = client.createApplication().getApplicationSubmissionContext();
      // Get the applicationId
      this.applicationId = this.application.getApplicationId();
      return TypeConverter.fromYarn(applicationId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }
}

Do the serviceInit and serviceStart methods look familiar? That's right: its parent class YarnClient extends AbstractService. The code here also uses YarnClientImpl and ApplicationSubmissionContext, which we cover below.
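The lifecycle follows the template-method pattern:
- A service's public init()/start()/stop() drive its protected serviceInit()/serviceStart()/serviceStop() hooks.
- ResourceMgrDelegate forwards each hook to its wrapped YarnClient before calling super.

Below is a stripped-down model of that pattern. `MiniService`, `InnerClient`, `Delegate` and `LifecycleSketch` are hypothetical. The real AbstractService also tracks service states, listeners and a Configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: public lifecycle methods call protected hooks.
abstract class MiniService {
    protected final List<String> log;
    protected MiniService(List<String> log) { this.log = log; }

    public final void init()  { serviceInit(); }
    public final void start() { serviceStart(); }
    public final void stop()  { serviceStop(); }

    protected void serviceInit()  {}
    protected void serviceStart() {}
    protected void serviceStop()  {}
}

// Stand-in for the wrapped YarnClientImpl.
class InnerClient extends MiniService {
    InnerClient(List<String> log) { super(log); }
    @Override protected void serviceInit()  { log.add("inner.init"); }
    @Override protected void serviceStart() { log.add("inner.start"); }
    @Override protected void serviceStop()  { log.add("inner.stop"); }
}

// Stand-in for ResourceMgrDelegate: forwards each step to the inner client first,
// and calls init()/start() from its own constructor, like the constructor shown above.
class Delegate extends MiniService {
    private final InnerClient client;
    Delegate(List<String> log) {
        super(log);
        this.client = new InnerClient(log);
        init();
        start();
    }
    @Override protected void serviceInit()  { client.init();  log.add("delegate.init"); }
    @Override protected void serviceStart() { client.start(); log.add("delegate.start"); }
    @Override protected void serviceStop()  { client.stop();  log.add("delegate.stop"); }
}

public class LifecycleSketch {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Delegate d = new Delegate(log);
        d.stop();
        System.out.println(log);
        // [inner.init, delegate.init, inner.start, delegate.start, inner.stop, delegate.stop]
    }
}
```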

5.3.3 YarnClientImpl

org.apache.hadoop.yarn.client.api.impl.YarnClientImpl

First, look at the client.createApplication method used in ResourceMgrDelegate above:

@Override
  public YarnClientApplication createApplication()
      throws YarnException, IOException {
    // Create an ApplicationSubmissionContextPBImpl instance
    // ApplicationSubmissionContext holds all the information the RM needs to launch the AM for an application
    ApplicationSubmissionContext context = Records.newRecord
        (ApplicationSubmissionContext.class);
    // Get the response describing a new application
    GetNewApplicationResponse newApp = getNewApplication();
    ApplicationId appId = newApp.getApplicationId();
    // Store the appId in the ApplicationSubmissionContext
    context.setApplicationId(appId);
    // With the appId in hand, build an object that wraps the response and the context
    return new YarnClientApplication(newApp, context);
  }

Next, look at the getNewApplication method:

private GetNewApplicationResponse getNewApplication()
      throws YarnException, IOException {
    GetNewApplicationRequest request =
        Records.newRecord(GetNewApplicationRequest.class);
    // rmClient is the key here
    return rmClient.getNewApplication(request);
  }

The rmClient in the method above is an implementation of the ApplicationClientProtocol interface. Let's look at rmClient.getNewApplication next.

5.3.4 ApplicationClientProtocolPBClientImpl

org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl

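/**
 * This method is declared in ApplicationClientProtocol, whose Javadoc says:
 * clients use it to obtain a new ApplicationId, which they then use to submit a new application.
 * After the call, the RM returns a GetNewApplicationResponse
 * containing a new, monotonically increasing ApplicationId plus some cluster details such as the maximum resource capability.
 * 
 * In other words, this only obtains an appId; it does not actually run the app.
 */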
@Override
  public GetNewApplicationResponse getNewApplication(
      GetNewApplicationRequest request) throws YarnException,
      IOException {
    GetNewApplicationRequestProto requestProto =
        ((GetNewApplicationRequestPBImpl) request).getProto();
    try {
      return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null,
        requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }
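The server side of getNewApplication is outside the scope of this client-side post. As an assumption for illustration, an RM can guarantee unique, monotonically increasing IDs by pairing its fixed start timestamp with an atomic counter.

The sketch below shows that idea and the application_<timestamp>_<seq> string form. `AppIdAllocator` and its nested `AppId` are hypothetical. The timestamp in main is a made-up example value.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AppIdAllocator {

    // Simplified stand-in for YARN's ApplicationId: cluster start time plus a sequence number.
    static final class AppId {
        final long clusterTimestamp;
        final int id;
        AppId(long clusterTimestamp, int id) {
            this.clusterTimestamp = clusterTimestamp;
            this.id = id;
        }
        @Override
        public String toString() {
            // Sequence zero-padded to at least four digits.
            return String.format("application_%d_%04d", clusterTimestamp, id);
        }
    }

    private final long clusterTimestamp;
    private final AtomicInteger counter = new AtomicInteger();

    AppIdAllocator(long clusterTimestamp) { this.clusterTimestamp = clusterTimestamp; }

    // Only reserves an ID; nothing is launched here.
    AppId newApplicationId() {
        return new AppId(clusterTimestamp, counter.incrementAndGet());
    }

    public static void main(String[] args) {
        AppIdAllocator rm = new AppIdAllocator(1500000000000L);
        System.out.println(rm.newApplicationId()); // application_1500000000000_0001
        System.out.println(rm.newApplicationId()); // application_1500000000000_0002
    }
}
```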

This completes the client-side flow for obtaining the JobId. Next, we move on to the SubmitJob flow.
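The final step, TypeConverter.fromYarn(applicationId), is why an MR job ID and its YARN application ID share the same numbers:
- The cluster timestamp becomes the JobID identifier.
- The application sequence number becomes the job number.

For example, application_1500000000000_0007 corresponds to job_1500000000000_0007 (an example value). Below is a string-level sketch of that mapping. `IdConversion.toJobId` is a hypothetical helper, not the TypeConverter API.

```java
public class IdConversion {

    // Maps "application_<clusterTimestamp>_<seq>" to "job_<clusterTimestamp>_<seq>".
    static String toJobId(String applicationId) {
        String[] parts = applicationId.split("_");
        if (parts.length != 3 || !parts[0].equals("application")) {
            throw new IllegalArgumentException("Unexpected application id: " + applicationId);
        }
        long clusterTimestamp = Long.parseLong(parts[1]);
        int sequence = Integer.parseInt(parts[2]);
        // Keep the timestamp as the identifier and pad the sequence to at least 4 digits.
        return String.format("job_%d_%04d", clusterTimestamp, sequence);
    }

    public static void main(String[] args) {
        System.out.println(toJobId("application_1500000000000_0007")); // job_1500000000000_0007
    }
}
```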

5.4 Submitting the Job

5.4.1 YARNRunner

Next, look at YARNRunner's submitJob method, which JobSubmitter calls:

@Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    addHistoryToken(ts);
    // Build the required information into appContext, in preparation for launching the MR ApplicationMaster
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to the ResourceManager
    try {
      // This is the applicationId obtained earlier via ResourceMgrDelegate.getNewJobID
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      // Fetch the application report (the AM's status)
      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }
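After submitApplication returns, submitJob treats a missing report, or an application in the FAILED or KILLED state, as a failed submission. It surfaces the diagnostics in the IOException.

That decision can be isolated as below. `AppState` and `Report` are hypothetical stand-ins for YarnApplicationState and ApplicationReport, and the diagnostics string in main is made up.

```java
import java.io.IOException;
import java.util.EnumSet;

public class SubmitResultCheck {
    // Mirrors YARN's YarnApplicationState values; redeclared so the sketch is self-contained.
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Minimal stand-in for ApplicationReport.
    static final class Report {
        final AppState state;
        final String diagnostics;
        Report(AppState state, String diagnostics) {
            this.state = state;
            this.diagnostics = diagnostics;
        }
    }

    private static final EnumSet<AppState> FAILED_STATES =
        EnumSet.of(AppState.FAILED, AppState.KILLED);

    // Same decision as submitJob: a null report, FAILED or KILLED means the job failed to run.
    static void checkSubmitted(Report report) throws IOException {
        String diagnostics =
            (report == null ? "application report is null" : report.diagnostics);
        if (report == null || FAILED_STATES.contains(report.state)) {
            throw new IOException("Failed to run job : " + diagnostics);
        }
    }

    public static void main(String[] args) {
        Report[] reports = {
            new Report(AppState.ACCEPTED, ""),
            new Report(AppState.FAILED, "AM container exited"), // made-up diagnostics
            null
        };
        for (Report r : reports) {
            try {
                checkSubmitted(r);
                System.out.println("submitted");
            } catch (IOException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
```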

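That completes the two YARNRunner methods we needed (getNewJobID and submitJob), which are still code from the mapreduce package. From here on we move into the yarn package.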

5.4.2 YarnClientImpl

Let's go straight to the submitApplication method:

  @Override
  public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    // Automatically add the timeline DT into the CLC
    // Only when the security and the timeline service are both enabled
    if (isSecurityEnabled() && timelineServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }

    // Send the submit-application request
    rmClient.submitApplication(request);

    int pollCount = 0;
    long startTime = System.currentTimeMillis();
    EnumSet<YarnApplicationState> waitingStates = 
                                 EnumSet.of(YarnApplicationState.NEW,
                                 YarnApplicationState.NEW_SAVING,
                                 YarnApplicationState.SUBMITTED);
    EnumSet<YarnApplicationState> failToSubmitStates = 
                                  EnumSet.of(YarnApplicationState.FAILED,
                                  YarnApplicationState.KILLED);     
    while (true) {
      try {
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            throw new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;
        }

        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (enforceAsyncAPITimeout() &&
            elapsedMillis >= asyncApiPollTimeoutMillis) {
          throw new YarnException("Timed out while waiting for application " +
              applicationId + " to be submitted successfully");
        }

        // Notify the client through the log every 10 poll, in case the client
        // is blocked here too long.
        if (++pollCount % 10 == 0) {
          LOG.info("Application submission is not finished, " +
              "submitted application " + applicationId +
              " is still in " + state);
        }
        try {
          Thread.sleep(submitPollIntervalMillis);
        } catch (InterruptedException ie) {
          LOG.error("Interrupted while waiting for application "
              + applicationId
              + " to be successfully submitted.");
        }
      } catch (ApplicationNotFoundException ex) {
        // FailOver or RM restart happens before RMStateStore saves
        // ApplicationState
        LOG.info("Re-submit application " + applicationId + " with the " +
            "same ApplicationSubmissionContext");
        rmClient.submitApplication(request);
      }
    }

    return applicationId;
  }
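submitApplication blocks until the application leaves the NEW, NEW_SAVING and SUBMITTED states. It fails fast on FAILED or KILLED, and times out when that is enabled. If the RM reports the application as not found (for example after a failover), it re-submits the same request.

Here is a self-contained model of that loop, with three simplifications:
- A poll limit replaces the elapsed-time timeout.
- There is no sleep or periodic logging.
- `NotFound` is a hypothetical stand-in for ApplicationNotFoundException.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.function.Supplier;

public class SubmitPollSketch {
    // Mirrors YARN's YarnApplicationState values; redeclared so the sketch is self-contained.
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Hypothetical stand-in for ApplicationNotFoundException.
    static class NotFound extends RuntimeException {}

    static final EnumSet<AppState> WAITING =
        EnumSet.of(AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED);
    static final EnumSet<AppState> FAIL_TO_SUBMIT =
        EnumSet.of(AppState.FAILED, AppState.KILLED);

    // Polls until the app leaves the waiting states; maxPolls replaces the time-based timeout.
    static AppState waitForSubmission(Supplier<AppState> getReport, Runnable resubmit, int maxPolls) {
        for (int poll = 1; poll <= maxPolls; poll++) {
            try {
                AppState state = getReport.get();
                if (!WAITING.contains(state)) {
                    if (FAIL_TO_SUBMIT.contains(state)) {
                        throw new IllegalStateException("Failed to submit application: " + state);
                    }
                    return state; // ACCEPTED, RUNNING, ...: submission succeeded
                }
            } catch (NotFound e) {
                // RM lost the app (e.g. failover before it was persisted): send the same request again.
                resubmit.run();
            }
        }
        throw new IllegalStateException("Timed out after " + maxPolls + " polls");
    }

    public static void main(String[] args) {
        // Scripted reports; null simulates "application not found".
        Iterator<AppState> script =
            Arrays.asList(AppState.NEW, null, AppState.SUBMITTED, AppState.ACCEPTED).iterator();
        int[] resubmits = {0};
        AppState result = waitForSubmission(() -> {
            AppState s = script.next();
            if (s == null) {
                throw new NotFound();
            }
            return s;
        }, () -> resubmits[0]++, 10);
        System.out.println(result + " after " + resubmits[0] + " resubmit(s)"); // ACCEPTED after 1 resubmit(s)
    }
}
```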

5.4.3 ApplicationClientProtocolPBClientImpl

  @Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException,
      IOException {
    SubmitApplicationRequestProto requestProto =
        ((SubmitApplicationRequestPBImpl) request).getProto();
    try {
      return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
        requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
  }
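Both PB client methods follow the same shape:
1. Convert the record to its protobuf message.
2. Call the proxy.
3. On ServiceException, let RPCUtil.unwrapAndThrowException rethrow the underlying cause.

That helper always throws, so the `return null;` after it only satisfies the compiler. Below is a simplified model of the pattern. All names are hypothetical, and the real helper also rebuilds exceptions carried inside a RemoteException.

```java
import java.io.IOException;

public class UnwrapSketch {
    // Stand-in for protobuf's ServiceException, which wraps whatever failed on the RPC path.
    static class WrappedRpcException extends Exception {
        WrappedRpcException(Throwable cause) { super(cause); }
    }

    // Stand-in for YarnException.
    static class AppException extends Exception {
        AppException(String msg) { super(msg); }
    }

    interface RpcCall { String invoke() throws WrappedRpcException; }

    // Always throws: rethrows the wrapped cause with its own type when possible.
    static Void unwrapAndThrow(WrappedRpcException e) throws IOException, AppException {
        Throwable cause = e.getCause();
        if (cause instanceof IOException) {
            throw (IOException) cause;
        } else if (cause instanceof AppException) {
            throw (AppException) cause;
        } else if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
        }
        // No usable cause: wrap the transport exception itself.
        throw new IOException(e);
    }

    // Same shape as the PB client methods above.
    static String callAndUnwrap(RpcCall call) throws IOException, AppException {
        try {
            return call.invoke();
        } catch (WrappedRpcException e) {
            unwrapAndThrow(e);
            return null; // unreachable at runtime; only satisfies the compiler
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callAndUnwrap(() -> "ok")); // ok
        try {
            callAndUnwrap(() -> { throw new WrappedRpcException(new IOException("connection reset")); });
        } catch (IOException e) {
            System.out.println("unwrapped: " + e.getMessage()); // unwrapped: connection reset
        }
    }
}
```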