
Hadoop 2 Job Execution: Job Submission

There are two common ways to submit a Hadoop job: remote submission from an IDE, which is mostly used for testing, and command-line submission from a client, which is what production environments use.

The general procedure for submitting a job program is:

1. Package the program into a jar;

2. Upload the jar to HDFS;

3. Submit the job program on HDFS from the command line.

Following that procedure, let's start with command-line submission.

The simplest submission command looks something like this:

hadoop jar /home/hadoop/hadoop-2.2.0/hadoop-examples.jar wordcount inputPath outputPath

In the shell script named hadoop, when the argument is jar, the script determines the class to run and the environment variables, and finally calls exec to execute it.

Let's look at the main method of the org.apache.hadoop.util.RunJar class:

  public static void main(String[] args) throws Throwable {
    String usage = "RunJar jarFile [mainClass] args...";
    // Validate the number of arguments
    if (args.length < 1) {
      System.err.println(usage);
      System.exit(-1);
    }
    // Verify that the jar file exists
    int firstArg = 0;
    String fileName = args[firstArg++];
    File file = new File(fileName);
    if (!file.exists() || !file.isFile()) {
      System.err.println("Not a valid JAR: " + file.getCanonicalPath());
      System.exit(-1);
    }
    String mainClassName = null;

    JarFile jarFile;
    try {
      jarFile = new JarFile(fileName);
    } catch(IOException io) {
      throw new IOException("Error opening job jar: " + fileName)
        .initCause(io);
    }
    // Check whether the manifest declares a main class
    Manifest manifest = jarFile.getManifest();
    if (manifest != null) {
      mainClassName = manifest.getMainAttributes().getValue("Main-Class");
    }
    jarFile.close();

    if (mainClassName == null) {
      if (args.length < 2) {
        System.err.println(usage);
        System.exit(-1);
      }
      mainClassName = args[firstArg++];
    }
    mainClassName = mainClassName.replaceAll("/", ".");
    // Set up and validate the temporary directory
    File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
    ensureDirectory(tmpDir);

    final File workDir;
    try {
      workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
    } catch (IOException ioe) {
      // If user has insufficient perms to write to tmpDir, default
      // "Permission denied" message doesn't specify a filename.
      System.err.println("Error creating temp dir in hadoop.tmp.dir "
                         + tmpDir + " due to " + ioe.getMessage());
      System.exit(-1);
      return;
    }

    if (!workDir.delete()) {
      System.err.println("Delete failed for " + workDir);
      System.exit(-1);
    }
    ensureDirectory(workDir);
    // Register a shutdown hook that deletes the working directory
    // once the job program has finished
    ShutdownHookManager.get().addShutdownHook(
      new Runnable() {
        @Override
        public void run() {
          FileUtil.fullyDelete(workDir);
        }
      }, SHUTDOWN_HOOK_PRIORITY);

    unJar(file, workDir);

    ArrayList<URL> classPath = new ArrayList<URL>();
    classPath.add(new File(workDir + "/").toURI().toURL());
    classPath.add(file.toURI().toURL());
    classPath.add(new File(workDir, "classes/").toURI().toURL());
    File[] libs = new File(workDir, "lib").listFiles();
    if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
        classPath.add(libs[i].toURI().toURL());
      }
    }
    // Invoke the job program's main method via reflection, passing the
    // remaining command-line arguments through to it
    ClassLoader loader =
      new URLClassLoader(classPath.toArray(new URL[0]));

    Thread.currentThread().setContextClassLoader(loader);
    Class<?> mainClass = Class.forName(mainClassName, true, loader);
    Method main = mainClass.getMethod("main", new Class[] {
      Array.newInstance(String.class, 0).getClass()
    });
    String[] newArgs = Arrays.asList(args)
      .subList(firstArg, args.length).toArray(new String[0]);
    try {
      main.invoke(null, new Object[] { newArgs });
    } catch (InvocationTargetException e) {
      throw e.getTargetException();
    }
  }

With the environment set up, the job program's main method is now executed.
Take WordCount as an example:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

In the program's entry point, the main method:

First, a Configuration object is created. This class is shared by all Hadoop modules; it loads the various configuration files on the classpath and reads and writes their configuration options.
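
As a quick illustration of how Configuration is typically used, here is a minimal, hypothetical sketch (the keys fs.defaultFS and mapreduce.job.reduces are standard Hadoop 2 properties; the class name is made up):

import org.apache.hadoop.conf.Configuration;

public class ConfDemo {
  public static void main(String[] args) {
    // new Configuration() loads core-default.xml and core-site.xml from the classpath
    Configuration conf = new Configuration();
    // Read a property, falling back to a default when it is not set
    String fs = conf.get("fs.defaultFS", "file:///");
    // Override a property programmatically; later reads see the new value
    conf.setInt("mapreduce.job.reduces", 5);
    System.out.println("fs.defaultFS = " + fs
        + ", reduces = " + conf.getInt("mapreduce.job.reduces", 1));
  }
}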

The second step uses the GenericOptionsParser class, whose purpose is to apply the generic options from the command line automatically to the conf variable and return the remaining, application-specific arguments.

If additional options are passed at submission time, for instance to set the number of reducers, they can be written according to GenericOptionsParser's command-line format:

bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5

The rule is -D followed by an MR configuration option (the default number of reduce tasks is 1; the number of map tasks is ultimately determined by the number of input splits, as we will see below).
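
To make the parsing behaviour concrete, here is a small hypothetical driver (the class name and the sample arguments are invented for illustration):

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ParserDemo {
  public static void main(String[] args) throws Exception {
    // e.g. args = { "-Dmapred.reduce.tasks=5", "inputPath", "outputPath" }
    Configuration conf = new Configuration();
    String[] rest = new GenericOptionsParser(conf, args).getRemainingArgs();
    // The -D option has been absorbed into conf ...
    System.out.println(conf.get("mapred.reduce.tasks"));   // prints 5
    // ... and only the positional arguments are left over
    System.out.println(Arrays.toString(rest));             // prints [inputPath, outputPath]
  }
}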

After that comes the definition of the Job.

The Job constructor used here is:

  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

It delegates to another constructor and sets the job's name (here "word count"):

  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }
which in turn wraps the Configuration in a JobConf:
  public JobConf(Configuration conf) {
    super(conf);
    
    if (conf instanceof JobConf) {
      JobConf that = (JobConf)conf;
      credentials = that.credentials;
    }
    
    checkAndWarnDeprecation();
  }

With the job's execution environment now instantiated from the configuration, the next step is to feed it the actual content.

In turn, the jar is added to the job, and the Mapper class, the Combiner class, the Reducer class, the output key type and the output value type are set.
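
(As a side note, Hadoop 2 marks the Job constructor used above as deprecated; the following is a hedged sketch of the equivalent setup with the recommended factory method, reusing the WordCount classes above — not the article's original code:)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class JobSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Job.getInstance replaces the deprecated new Job(conf, "word count")
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setCombinerClass(WordCount.IntSumReducer.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
  }
}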

Inside setJarByClass:

  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

It first checks the job's state (it must still be in the DEFINE state, i.e. not yet running), then locates the jar file through the given class and assigns the jar's path to the mapreduce.job.jar property (the lookup is done by the findContainingJar method of the ClassUtil class).
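
To see what value actually ends up in mapreduce.job.jar, one can call the same helper directly; a hedged sketch, assuming Hadoop 2's ClassUtil utility is on the classpath:

import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.util.ClassUtil;

public class FindJarDemo {
  public static void main(String[] args) {
    // Prints the path of the jar that contains WordCount.class, i.e. the
    // value setJarByClass would record in mapreduce.job.jar
    System.out.println(ClassUtil.findContainingJar(WordCount.class));
  }
}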

The job is then submitted with:

job.waitForCompletion(true)
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

Set the verbose parameter to true if you want the job's current progress printed to the console.

  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }

In the submit method, the job is handed off to the corresponding Cluster and the call returns immediately, without waiting for the job to finish.

At the same time the Job instance's state is set to JobState.RUNNING, indicating that the job is in progress.

While the job is running, getJobState() can then be called to query its state.
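
For example, here is a minimal sketch of driving the job asynchronously instead of blocking in waitForCompletion (the class and method names are hypothetical; it assumes a Job configured as in the WordCount driver above):

import org.apache.hadoop.mapreduce.Job;

public class PollDemo {
  static void submitAndPoll(Job job) throws Exception {
    job.submit();                         // returns as soon as the cluster accepts the job
    while (!job.isComplete()) {           // poll instead of blocking in waitForCompletion
      System.out.printf("state=%s map=%.0f%% reduce=%.0f%%%n",
          job.getJobState(), job.mapProgress() * 100, job.reduceProgress() * 100);
      Thread.sleep(5000);
    }
    System.out.println(job.isSuccessful() ? "job succeeded" : "job failed");
  }
}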

submit() mainly performs the following operations:

  • Check the job's input, output and other parameters, obtain the configuration and the remote host's address, generate the JobID, determine the required working directory (also the directory MRAppMaster works in), and set up the information needed during execution
  • Copy the required jar files and configuration files to the designated working directory on HDFS so that every node can use them
  • Compute and obtain the number of input splits, which determines the number of map tasks
  • Call the submitJob() method of the YARNRunner class to submit the job, passing along the required parameters (such as the JobID)
  • Wait for submit() to return the job's execution status, and finally delete the corresponding working directory

Before submitting, the client first connects to the cluster, via the connect method:

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

This is a synchronized, thread-safe method. It initializes a Cluster object, which represents the cluster, from the configuration information:

  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
            // Create the YARNRunner object
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }
          // Initialize the Cluster's internal member variables
          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }

As you can see, the client protocol provider is created with java.util.ServiceLoader. In version 2.3.0 this includes LocalClientProtocolProvider (local jobs) and YarnClientProtocolProvider (YARN jobs). Hadoop uses the property mapreduce.framework.name to control which application framework is chosen; in MRv2 it takes two values, local and yarn, and the matching client is created according to this setting.

(ServiceLoader is Java's service-loading facility: driven by provider-configuration files on the Java classpath, it loads the implementation classes of a given interface.)
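
A minimal, self-contained sketch of the ServiceLoader mechanism (the Greeter interface below is invented purely for illustration; Hadoop's actual call loads ClientProtocolProvider implementations):

import java.util.ServiceLoader;

public class ServiceLoaderDemo {
  public interface Greeter { String greet(); }

  // A provider jar would register implementations like this one by listing
  // the class name in a META-INF/services/ServiceLoaderDemo$Greeter file
  public static class EnglishGreeter implements Greeter {
    public String greet() { return "hello"; }
  }

  public static void main(String[] args) {
    ServiceLoader<Greeter> loader = ServiceLoader.load(Greeter.class);
    for (Greeter g : loader) {          // iterates over every registered implementation
      System.out.println(g.greet());    // Cluster.initialize keeps the first usable provider
    }
  }
}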

In real production environments this is almost always yarn, so a YARNRunner object (the client proxy class) is created to submit the job.
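
As a hedged sketch of what that selection boils down to on the client side (it assumes a reachable, properly configured YARN cluster; getStagingAreaDir is called only to show that a working client protocol was obtained):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;

public class ClusterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn");  // makes YarnClientProtocolProvider win
    Cluster cluster = new Cluster(conf);           // runs the provider loop shown above
    System.out.println("staging dir: " + cluster.getStagingAreaDir());
    cluster.close();
  }
}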

Once the Cluster has been instantiated, the actual job submission begins:

submitter.submitJobInternal(Job.this, cluster)
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {

  // Validate the output specification: it must be set and must not already exist
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);
  // Get the staging area that holds the files used while the job runs;
  // the default location is /tmp/hadoop-yarn/staging/root/.staging and it
  // can be changed via yarn.app.mapreduce.am.staging-dir
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // Record the submitting host's name and address
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // Obtain a new JobID; this requires an RPC call
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // Submission directory, e.g. /tmp/hadoop-yarn/staging/root/.staging/job_1395778831382_0002
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    // Copy the required files to the cluster; analyzed separately below (1)
    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Write the split files job.split and job.splitmetainfo; the actual
    // writing is the same as in MR1, see earlier articles
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    // Set the queue name
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Write job file to submit dir
    // Writes job.xml
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    // This is where the actual submission happens; see analysis (2) below
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}

All told, well over a hundred lines
(this is the core of job submission; everything before it was just the lead-up).

Step 1: Check whether the job's output path already exists; if it does, throw an exception.
Step 2: Initialize the path used to hold the job's related resources.
Step 3: Set the client's host properties: mapreduce.job.submithostname and mapreduce.job.submithostaddress.
Step 4: Request a JobID object from YARN's ResourceManager via RPC.
Step 5: Obtain the authentication tokens from the HDFS NameNode and put them in the cache.
Step 6: Upload the job files to HDFS. If the job was not given a name earlier, the default name is set here to the jar's name. Also, the job files' default replication factor is 10, if the property mapreduce.client.submit.file.replication has not been set.
Step 7: After the files are uploaded to HDFS, they are also cached by the DistributedCache. When a compute node receives its first task for this job, the DistributedCache automatically caches the job files into the node's local directory and unpacks compressed files such as .zip, .jar and .tar before the task starts. For the tasks of the same job that the node receives afterwards, the DistributedCache does not download the job files again; the tasks simply run. For a job with many tasks, this design avoids downloading the same job's files repeatedly on one node and greatly improves execution efficiency.
Step 8:
Step 9: Write the split information and the SplitMetaInfo to HDFS.
Step 10: Set the number of maps; the number of splits obtained above is the actual number of map tasks.
Step 11: Write the relevant configuration into job.xml.
Step 12: (Notice that splitting and uploading the target files, requesting the job ID, validity checks, computing the number of maps and so on are all done before anything is handed to YARN; YARN only allocates resources for the job and schedules its execution.)
The job is then formally submitted to YARN with the following code:
status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());

This is where the RPC communication between the YARN client and the ResourceManager comes in, including obtaining the applicationId, status checks, and the network exchanges themselves.

The submitClient here is in fact the YARNRunner instance;

Step 13: The RPC call above eventually returns a JobStatus object; its toString method can be used on the JobClient side to print the relevant execution log information. (At this point the job has been handed over to YARN entirely; all that remains on the client is monitoring, if verbose was set to true:)
monitorAndPrintJob();

This is only a rough outline of job submission. The full picture also includes the RPC communication on YARN, reading the input files and running the map tasks on the data nodes, the shuffle phase, running the reduce tasks and, finally, writing out the results.

Submitting an MR job is mostly a matter of initializing the job's environment; running it is mostly a matter of task scheduling.
