Job Submission Flow and Input Split Source Code in Detail

1. Entering the Job submission method

public boolean waitForCompletion(boolean verbose

                                   ) throws IOException, InterruptedException,

                                            ClassNotFoundException {

// Check the Job's state: if it is RUNNING, the Job is already executing and will not be submitted again.

    if (state == JobState.DEFINE) {

      submit();

}

// If verbose, monitor the job and print its progress information.

    if (verbose) {

      monitorAndPrintJob();

    } else {

      // get the completion poll interval from the client.

      int completionPollIntervalMillis =

        Job.getCompletionPollInterval(cluster.getConf());

      while (!isComplete()) {

        try {

          Thread.sleep(completionPollIntervalMillis);

        } catch (InterruptedException ie) {

        }

      }

    }

    return isSuccessful();

  }
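For context, the call above is usually the last line of a driver class. Below is a minimal driver sketch (not taken from the walkthrough): it uses the identity Mapper and Reducer, so the output key/value types match TextInputFormat's (LongWritable offset, Text line) pairs, and it ends in exactly the waitForCompletion() call examined here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IdentityJobDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "identity job");
    job.setJarByClass(IdentityJobDriver.class);
    job.setMapperClass(Mapper.class);     // identity Mapper: passes (offset, line) through
    job.setReducerClass(Reducer.class);   // identity Reducer
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Entry point into the source walked through above: submit, then poll until done.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}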

 

 

 

1.1 Submitting the Job to the Cluster

public void submit()

         throws IOException, InterruptedException, ClassNotFoundException {

    ensureState(JobState.DEFINE);

setUseNewAPI();

// Create the Cluster object. It holds two key components: ① the file system, which reads the input data into the program and writes out/stores the results; ② the client that runs the Job: LocalJobRunner when the Job runs locally, YarnRunner when the Job runs on YARN.

    connect();

    final JobSubmitter submitter =

        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());

    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {

      public JobStatus run() throws IOException, InterruptedException,

      ClassNotFoundException {

        return submitter.submitJobInternal(Job.this, cluster);

      }

    });

    state = JobState.RUNNING;

    LOG.info("The url to track the job: " + getTrackingURL());

   }

 

 

1.2 Creating the Cluster

private synchronized void connect()

          throws IOException, InterruptedException, ClassNotFoundException {

  // Based on the user's Configuration, create the corresponding Cluster, which is responsible for running the Job.

    if (cluster == null) {

      cluster =

        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {

                   public Cluster run()

                          throws IOException, InterruptedException,

                                 ClassNotFoundException {

                     return new Cluster(getConfiguration());

                   }

                 });

    }

  }
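Which client the new Cluster ends up with is driven by the mapreduce.framework.name property that the Configuration carries into connect(). A small sketch of that switch (the values shown are the standard ones; in a real cluster this is normally set in mapred-site.xml rather than in code):

import org.apache.hadoop.conf.Configuration;

public class FrameworkSelection {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Default is "local": connect() ends up with LocalJobRunner as cluster.getClient().
    System.out.println(conf.get("mapreduce.framework.name", "local"));

    // Switching to YARN makes connect() pick the YARN runner instead.
    conf.set("mapreduce.framework.name", "yarn");
  }
}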

 

 

1.3 Submitting the Job with the JobSubmitter

JobStatus submitJobInternal(Job job, Cluster cluster)

  throws ClassNotFoundException, InterruptedException, IOException {

 

// Validate the output specification: the output directory must be set and must not already exist.

    checkSpecs(job);

 

    Configuration conf = job.getConfiguration();

    addMRFrameworkToDistributedCache(conf);

    // Get the path of the staging area for this Job's submission files.

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    //configure the command line options correctly on the submitting dfs

    InetAddress ip = InetAddress.getLocalHost();

    if (ip != null) {

      submitHostAddress = ip.getHostAddress();

      submitHostName = ip.getHostName();

      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);

      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);

    }

    JobID jobId = submitClient.getNewJobID();

job.setJobID(jobId);

// For a local submission, the Job's staging directory is under /tmp on the drive that holds the Eclipse workspace.
// For a submission to YARN, the staging directory is under /tmp on HDFS.

    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

    JobStatus status = null;

    try {

      conf.set(MRJobConfig.USER_NAME,

          UserGroupInformation.getCurrentUser().getShortUserName());

      conf.set("hadoop.http.filter.initializers",

          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");

      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());

      LOG.debug("Configuring job " + jobId + " with " + submitJobDir

          + " as the submit dir");

      // get delegation token for the dir

      TokenCache.obtainTokensForNamenodes(job.getCredentials(),

          new Path[] { submitJobDir }, conf);

     

      populateTokenCache(conf, job.getCredentials());

 

      // generate a secret to authenticate shuffle transfers

      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {

        KeyGenerator keyGen;

        try {

          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);

          keyGen.init(SHUFFLE_KEY_LENGTH);

        } catch (NoSuchAlgorithmException e) {

          throw new IOException("Error generating shuffle secret key", e);

        }

        SecretKey shuffleKey = keyGen.generateKey();

        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),

            job.getCredentials());

      }

      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {

        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);

        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +

                "data spill is enabled");

      }

 

      copyAndConfigureFiles(job, submitJobDir);

 

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

     

      // Create the splits for the job

      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

      // Split computation: this generates the split file and the split meta-info file, which describe the splits and how to read them.
      // The split file records how many splits were cut across all files in the input directory; each split is a FileSplit object.
      // The split meta-info describes all of the splits and records, for each one, which node its data should be read from.

      int maps = writeSplits(job, submitJobDir);

      // Set mapreduce.job.maps to the number of splits.

      conf.setInt(MRJobConfig.NUM_MAPS, maps);

      LOG.info("number of splits:" + maps);

 

      // write "queue admins of the queue to which job is being submitted"

      // to job file.

      String queue = conf.get(MRJobConfig.QUEUE_NAME,

          JobConf.DEFAULT_QUEUE_NAME);

      AccessControlList acl = submitClient.getQueueAdmins(queue);

      conf.set(toFullPropertyName(queue,

          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

 

      // removing jobtoken referrals before copying the jobconf to HDFS

      // as the tasks don't need this setting, actually they may break

      // because of it if present as the referral will point to a

      // different job.

      TokenCache.cleanUpTokenReferral(conf);

 

      if (conf.getBoolean(

          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,

          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {

        // Add HDFS tracking ids

        ArrayList<String> trackingIds = new ArrayList<String>();

        for (Token<? extends TokenIdentifier> t :

            job.getCredentials().getAllTokens()) {

          trackingIds.add(t.decodeIdentifier().getTrackingId());

        }

        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,

            trackingIds.toArray(new String[trackingIds.size()]));

      }

 

      // Set reservation info if it exists

      ReservationId reservationId = job.getReservationId();

      if (reservationId != null) {

        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());

      }

 

      // Write all of the Job's configuration into job.xml.

      writeConf(conf, submitJobFile);

     

      //

      // Now, actually submit the job (using the submit name)

      //

      printTokens(jobId, job.getCredentials());

      // Formally submit the Job.

      status = submitClient.submitJob(

          jobId, submitJobDir.toString(), job.getCredentials());

      if (status != null) {

        return status;

      } else {

        throw new IOException("Could not launch job");

      }

    } finally {

      if (status == null) {

        LOG.info("Cleaning up the staging area " + submitJobDir);

        if (jtFs != null && submitJobDir != null)

          jtFs.delete(submitJobDir, true);

 

      }

    }

  }
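For FileInputFormat-based jobs, the map count returned by writeSplits() follows from a per-file split size. The sketch below reproduces the computeSplitSize() formula used by FileInputFormat and walks a hypothetical 300 MB file with a 128 MB block size through the 1.1x slop rule; the concrete sizes are example values, not anything from the job above.

public class SplitSizeExample {
  // Same formula FileInputFormat.computeSplitSize() uses.
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;  // 128 MB HDFS block (example value)
    long minSize = 1L;                    // mapreduce.input.fileinputformat.split.minsize
    long maxSize = Long.MAX_VALUE;        // mapreduce.input.fileinputformat.split.maxsize
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

    long remaining = 300L * 1024 * 1024;  // a hypothetical 300 MB input file
    int splits = 0;
    // Keep cutting splits while the remainder is more than 1.1x the split size,
    // then put whatever is left into one final split: 300 MB / 128 MB -> 3 splits -> 3 MapTasks.
    while (((double) remaining) / splitSize > 1.1) {
      splits++;
      remaining -= splitSize;
    }
    if (remaining > 0) {
      splits++;
    }
    System.out.println("splitSize=" + splitSize + ", number of splits=" + splits);
  }
}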

 

1.4 Submitting the Job

public org.apache.hadoop.mapreduce.JobStatus submitJob(

      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,

      Credentials credentials) throws IOException {

  // Using the submit files prepared earlier, rebuild the Job (as a LocalJobRunner$Job).

    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);

    job.job.setCredentials(credentials);

    return job.status;

 

  }

 

1.5 Creating a Job object that LocalJobRunner can run

public Job(JobID jobid, String jobSubmitDir) throws IOException {

      ……

    // Re-assign all of the settings generated earlier for running the Job to this LocalJobRunner$Job.
     // Start a separate thread to run the Job.

      this.start();

    }

 

1.6 The Job's run()

@Override

    public void run() {

      JobID jobId = profile.getJobID();

   // JobContext represents the context the Job runs in; all of the Job's configuration can be obtained from it.

      JobContext jContext = new JobContextImpl(job, jobId);

     

      org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;

      try {

        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);

      } catch (Exception e) {

        LOG.info("Failed to createOutputCommitter", e);

        return;

      }

     

      try {

       // From the split meta-info, build a TaskSplitMetaInfo array; its length equals the number of splits.

        TaskSplitMetaInfo[] taskSplitMetaInfos =

          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

 

        int numReduceTasks = job.getNumReduceTasks();

        outputCommitter.setupJob(jContext);

        status.setSetupProgress(1.0f);

      // Holds the output file locations of all MapTasks.

        Map<TaskAttemptID, MapOutputFile> mapOutputFiles =

            Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());

        // Build the list of MapTask runnables to execute.

        List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(

            taskSplitMetaInfos, jobId, mapOutputFiles);

             

        initCounters(mapRunnables.size(), numReduceTasks);

      // Create a thread pool.

        ExecutorService mapService = createMapExecutor();

      // Run all of the MapTasks; see MapTaskRunnable.run().

        runTasks(mapRunnables, mapService, "map");

 

        try {

          if (numReduceTasks > 0) {

            List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(

                jobId, mapOutputFiles);

            ExecutorService reduceService = createReduceExecutor();

            runTasks(reduceRunnables, reduceService, "reduce");

          }

        } finally {

          for (MapOutputFile output : mapOutputFiles.values()) {

            output.removeAll();

          }

        }

        // delete the temporary directory in output directory

        outputCommitter.commitJob(jContext);

        status.setCleanupProgress(1.0f);

 

        if (killed) {

          this.status.setRunState(JobStatus.KILLED);

        } else {

          this.status.setRunState(JobStatus.SUCCEEDED);

        }

 

        JobEndNotifier.localRunnerNotification(job, status);

      } catch (Throwable t) {

        try {

          outputCommitter.abortJob(jContext,

            org.apache.hadoop.mapreduce.JobStatus.State.FAILED);

        } catch (IOException ioe) {

          LOG.info("Error cleaning up job:" + id);

        }

        status.setCleanupProgress(1.0f);

        if (killed) {

          this.status.setRunState(JobStatus.KILLED);

        } else {

          this.status.setRunState(JobStatus.FAILED);

        }

        LOG.warn(id, t);

 

        JobEndNotifier.localRunnerNotification(job, status);

 

      } finally {

        try {

          fs.delete(systemJobFile.getParent(), true);  // delete submit dir

          localFs.delete(localJobFile, true);              // delete local copy

          // Cleanup distributed cache

          localDistributedCacheManager.close();

        } catch (IOException e) {

          LOG.warn("Error cleaning up "+id+": "+e);

        }

      }

    }
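Note that the reduce runnables above are only built when numReduceTasks > 0. A map-only job can skip the whole reduce branch from the driver; a minimal sketch (assuming the usual driver setup shown earlier):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MapOnlyJobSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "map-only example");
    // With zero reducers the numReduceTasks > 0 branch above is skipped entirely:
    // no reduce runnables are created, and each MapTask's output goes straight to the
    // output directory instead of to intermediate map output files destined for a shuffle.
    job.setNumReduceTasks(0);
  }
}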

 

2. Entering the Map phase

2.1 Entering MapTaskRunnable's run()

public void run() {

        try {

        // Generate the TaskAttemptID for the current task.

          TaskAttemptID mapId = new TaskAttemptID(new TaskID(

              jobId, TaskType.MAP, taskId), 0);

          LOG.info("Starting task: " + mapId);

          mapIds.add(mapId);

          MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,

            info.getSplitIndex(), 1);

          map.setUser(UserGroupInformation.getCurrentUser().

              getShortUserName());

          setupChildMapredLocalDirs(map, localConf);

          // Create the object that manages this MapTask's output files.

          MapOutputFile mapOutput = new MROutputFiles();

          mapOutput.setConf(localConf);

          mapOutputFiles.put(mapId, mapOutput);

 

          map.setJobFile(localJobFile.toString());

          localConf.setUser(map.getUser());

          map.localizeConfiguration(localConf);

          map.setConf(localConf);

          try {

            map_tasks.getAndIncrement();

            myMetrics.launchMap(mapId);

           // Enter MapTask.run().

            map.run(localConf, Job.this);

            myMetrics.completeMap(mapId);

          } finally {

            map_tasks.getAndDecrement();

          }

 

          LOG.info("Finishing task: " + mapId);

        } catch (Throwable e) {

          this.storedException = e;

        }

      }

    }

 

2.2 MapTask's run()

@Override

  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)

    throws IOException, ClassNotFoundException, InterruptedException {

    this.umbilical = umbilical;

     // Decide whether a reduce phase is needed.
   // The Map side can itself be split into two phases:
   // map:  call the Mapper's map() method to process the input key-value pairs
   // sort: when map() finishes with a pair, context.write() stores the key-value pair to a file;
   //       before the pairs reach the file, all key-value pairs are sorted, so a sort phase runs

    if (isMapTask()) {

      // If there are no reducers then there won't be any sort. Hence the map

      // phase will govern the entire attempt's progress.

      if (conf.getNumReduceTasks() == 0) {

        mapPhase = getProgress().addPhase("map", 1.0f);

      } else {

        // If there are reducers then the entire attempt's progress will be

        // split between the map phase (67%) and the sort phase (33%).

        mapPhase = getProgress().addPhase("map", 0.667f);

        sortPhase  = getProgress().addPhase("sort", 0.333f);

      }

    }

    TaskReporter reporter = startReporter(umbilical);

 

    boolean useNewApi = job.getUseNewMapper();

    initialize(job, getJobID(), reporter, useNewApi);

 

    // check if it is a cleanupJobTask

    if (jobCleanup) {

      runJobCleanupTask(umbilical, reporter);

      return;

    }

    if (jobSetup) {

      runJobSetupTask(umbilical, reporter);

      return;

    }

    if (taskCleanup) {

      runTaskCleanupTask(umbilical, reporter);

      return;

    }

 

    if (useNewApi) {

      runNewMapper(job, splitMetaInfo, umbilical, reporter);

    } else {

      runOldMapper(job, splitMetaInfo, umbilical, reporter);

    }

    done(umbilical, reporter);

  }
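The useNewApi branch is, roughly speaking, decided by which Mapper API the job was configured with: setUseNewAPI() in Job.submit() flips the flag when the job is set up with the org.apache.hadoop.mapreduce classes. A small example of a Mapper written against that new API, which is the case that leads into runNewMapper() below (the class itself is illustrative, not part of the source above):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// A Mapper against the new (org.apache.hadoop.mapreduce) API. Configuring a job with a
// class like this is what makes getUseNewMapper() return true, so run() calls runNewMapper().
public class LineLengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final Text outKey = new Text();
  private final IntWritable outValue = new IntWritable();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // context.write() hands the pair to the collector created in runNewMapper();
    // that is where the sort phase described above happens when reducers exist.
    outKey.set(value.toString());
    outValue.set(value.getLength());
    context.write(outKey, outValue);
  }
}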

 

2.3 Running the Mapper

@SuppressWarnings("unchecked")

  private <INKEY,INVALUE,OUTKEY,OUTVALUE>

  void runNewMapper(final JobConf job,

                    final TaskSplitIndex splitIndex,

                    final TaskUmbilicalProtocol umbilical,

                    TaskReporter reporter

                    ) throws IOException, ClassNotFoundException,

                             InterruptedException {

// The MapTask's context object.

    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =

      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,

                                                                  getTaskID(),

                                                                  reporter);

    // Instantiate the Mapper object via reflection.

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =

      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)

        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

    // Instantiate the InputFormat via reflection.

    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =

      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)

        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

// Rebuild the split this MapTask is responsible for.

    org.apache.hadoop.mapreduce.InputSplit split = null;

    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),

        splitIndex.getStartOffset());

    LOG.info("Processing split: " + split);

// Responsible for initializing the RecordReader.
// Inside the NewTrackingRecordReader constructor, the real RecordReader that actually reads the records is created and assigned.

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =

      new NewTrackingRecordReader<INKEY,INVALUE>

        (split, inputFormat, reporter, taskContext);

   

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

// output is responsible for writing out the key-value pairs the MapTask produces.

    org.apache.hadoop.mapreduce.RecordWriter output = null;

   

    // get an output object

    if (job.getNumReduceTasks() == 0) {

      output =

       // If there is no reduce phase, use a record writer that outputs directly