1. 程式人生 > >MapReduce框架學習(3)——Job的建立及配置

MapReduce框架學習(3)——Job的建立及配置

參考: JeffreyZhou的部落格園
《Hadoop權威指南》第四版

0

一個MR作業,包括三點:

  • 輸入資料
  • MR程式
  • Job配置資訊

前面兩篇學習了資料格式和MR過程(map函式和reduce函式),那麼今天再講一下配置資訊,是怎麼把 資料 和 程式結合起來的。

3.1 程式碼

Job物件指定作業執行規範,我們可以用它來控制整個作業的執行。打算對照程式碼來講解,這樣可能要有邏輯一點,不至於講到最後都不知道自己的線索在哪。先貼上WordCount中的Job任務程式碼:

public static void main(String[] args) throws Exception {
	Configuration conf = new Configuration();   // 配置檔案
	// System.out.println("url:" + conf.get("fs.default.name"));  // deprecated
	// System.out.println("url:" + conf.get("fs.defaultFS"));
	
// 獲取一個作業
	// Job job = new Job(conf, "word count");  // deprecated
Job job = Job.getInstance(conf,"wordcount"); // 用job的靜態方法 // 設定job所用的那些類(class)檔案在哪個jar包 job.setJarByClass(WordCount.class); // 設定所用的map reduce類 job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); // 對每個節點的map輸出進行combine job.setReducerClass(IntSumReducer.class); // 對所有節點的輸出進行reduce
// 設定資料輸出型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定要處理的輸入、輸出路徑, //此處輸入/出為固定檔案目錄 FileInputFormat.addInputPath(job, "input"); FileOutputFormat.setOutputPath(job, "output"); //此處為引數 // FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 將job提交給叢集執行 System.exit(job.waitForCompletion(true) ? 0 : 1);

那麼,現在對照上面的程式碼來分析作業提交到執行的整個過程吧。

3.2 建立Job

任務建立比較容易,其實就是new一個例項,先建立一個配置檔案的物件,然後將配置檔案,以及作業名稱作為引數,構造一個Job物件就行了

Configuration conf = new Configuration();   // 配置檔案
Job job = Job.getInstance(conf,"wordcount");  // 用job的靜態方法

3.3 打包作業

我們在Hadoop叢集上執行這個作業時,要把程式碼打包成一個JAR檔案(Hadoop在叢集上釋出這個檔案),關於打包JAR,大概意思就是把程式執行所需要的包啊類啊啥的,全部形成一個壓縮包,但這個工作不用我們自己去一個個找,只要在Job物件的setJarByClass()方法中傳遞一個類即可,Hadoop會利用這個類來查詢包含它的JAR檔案,進而找到相關的JAR檔案。

// 將job所用的那些類(class)檔案,打成jar包
job.setJarByClass(WordCount.class);

3.4 設定各個環節的函式

這個很好理解,上一篇博文分析了MR過程中的各個環節,這些環節都是可以自定義的,就是在Job裡面設定,將自定義的函式和具體作業聯絡起來。

job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);  // 對每個節點的map輸出進行combine
job.setPartitionerClass(MyPartitioner.class); // 對每個節點的map輸出進行partition
job.setReducerClass(IntSumReducer.class);  // 對所有節點的輸出進行reduce

3.5 設定輸入輸出資料型別

job.setInputFormatClass(MyInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

3.6 設定輸入輸出檔案目錄

在設定輸入輸出檔案目錄時,可以選擇使用絕對目錄,就是直接在語句中寫入目錄;也可以使用引數輸入,即在執行程式時,再在控制檯輸入目錄。

// 指定要處理的輸入、輸出路徑,
	//此處輸入/出為固定檔案目錄
	FileInputFormat.addInputPath(job, "input");
	FileOutputFormat.setOutputPath(job, "output");
	//此處為引數
	// FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	// FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

3.7 提交併執行作業

單個任務的執行

可以直接使用語句:

job.waitForCompletion(true)

waitForCompletion()方法提交作業並等待執行完成,該方法唯一的引數是一個標識,指示是否已生成詳細輸出,當標識為true(成功)時,作業會把其進度寫到控制檯。
–《Hadoop權威指南》第四版,28頁

多個任務執行

多個任務的話,就有多種組織形式,例如序列、並行、無關、組合。如下圖:

在這裡插入圖片描述
圖中,Job2和Job3將會等Job1執行完了再執行,且可以同時開始,而Job4必須等Job2和Job3同時結束後才結束。

這個組合,就可以採用這樣的程式碼來實現:

Configuration conf = new Configuration();
Job job1 = new Job(conf, "job1");   //.. config Job1
Job job2 = new Job(conf, "job2");   //.. config Job2
Job job3 = new Job(conf, "job3");	//.. config Job3
Job job4 = new Job(conf, "job4");	//.. config Job4
 
//新增依賴關係
job2.addDependingJob(job1);
job3.addDependingJob(job1);
job4.addDependingJob(job2);
job4.addDependingJob(job3);
 
JobControl jc = new JobControl("jbo name");
jc.addJob(job1);
jc.addJob(job2);
jc.addJob(job3);
jc.addJob(job4);

jc.run();

3.x 後記

這裡講的其實偏向於作業的程式設計方面,但是在程式中把這些都設定好了,提交給集群后,又是怎樣的執行機制呢?這個就是關於application mastermap task等方面的分析了。