1. 程式人生 > >基於Oozie實現MapReduce作業的自動提交功能

基於Oozie實現MapReduce作業的自動提交功能

Oozie是一個Hadoop工作流管理系統。OozieClient RestFul API官方參考如下。它提供了JAVA API 和 RESTFul API 兩種形式使用Oozie客戶端可以向Oozie服務端提交workflow。

workflow即工作流,在Oozie中使用workflow來配置各種型別的動作節點,如MapReduce型別的action,這些action就是完成具體功能的程式。而workflow負責組織各個動作節點,形成有向無環圖。

對於使用者而言,傳統的MapReduce程式的提交執行過程如下:

a) 將編寫好的模型打包成jar檔案並上傳到Hadoop叢集的HDFS上;

b) 根據具體的作業配置好程式的輸入目錄和輸出目錄;

c) 使用hadoop jar 命令提交計算作業。

採用Oozie之後,可以藉助Oozie來負責命令的提交。下面介紹使用Oozie RESTFul API 提交作業。

使用Oozie提交作業的流程如下:

a) 使用者先提前將作業jar包以及相應的workflow.xml配置檔案上傳到HDFS上的合適目錄中

b)執行ooziejob -oozie ..... -run 命令或者通過程式設計實現Oozie RestFul API提交作業。

Oozie的提交原理簡介

當Oozie Server接收到使用者的提交請求後,Oozie Server解析本次作業的workflow.xml檔案,執行工作流,並啟動一個特殊的mapreduce作業(沒有reducer),由這個特殊的mpareduce作業負責真正地提交Hadoop作業執行(如MapReduce的wordcount)。正是由於Oozie Server通過啟動一個特殊的MR作業來向Hadoop叢集提交作業,事先是不知道這個特殊的MapReduce作業執行在哪個機器上的,因此這也是為什麼需要提前將寫好的程式jar檔案以及配置檔案workflow.xml先上傳到HDFS上的原因。

作業提交的生產者--消費者模型

當有許多作業進行提交時,可以使用生產者--消費者模型來管理這些作業。

本生產者--消費者的設計思路如下:

當向Oozie Server提交一個作業之後,會返回一個jobId,生產者負責將jobId新增到連結串列中,交將作業的一些基本的靜態資訊儲存到資料庫中;而消費者則從連結串列中取出jobId,並根據此jobId來查詢本次作業的執行時間、執行狀態等資訊。以此為基礎,可以統計出所有向Oozie Server提交的作業。

提交系統提供了兩種方式URI給使用者進行作業提交:一種是Servelet實現,一種是WebService實現。

使用者只需要通過傳送HTTP請求,攜帶使用者名稱和AppPath(提前配置好的上傳到HDFS中的作業lib資料夾所在目錄)兩個引數,就可以完成作業的提交。使用者名稱可以進行一定的許可權驗證,而AppPath則是告訴Oozie Server待執行的作業在哪裡,Oozie Server根據AppPath找到作業,生成工作流,然後將作業提交給Hadoop計算,並監控作業直到完成。

作業提交的部分程式碼如下:

String jsonJobId = oozieClient.submitJob(userName, appPath);//submit job to oozie server
jobId = Parse.getJobId(jsonJobId);//get job id
System.out.println("job " + jobId + " submited at " + new Date());
		
//start producer thread
new Thread(new AddJob(this)).start();
//start consumer thread
new Thread(new JobStatistic(oozieClient)).start(); 

由於每一個作業都有一個唯一的作業ID,故生產者執行緒先將本次作業提交相關資訊儲存到資料庫中,然後再將作業ID新增到生產者-消費者佇列中(一個LinkedList)。生產者的部分程式碼如下:
String jobId = schedulerAction.getJobId();
String userName = schedulerAction.getUserName();
String appPath = schedulerAction.getAppPath();
long inputSize = CommonUtil.getFileSizeWithSplitPath(appPath, userName);
try{
	db.insertJobProperty(userName, appPath, inputSize, jobId);
	System.out.println("insert into mysql success");
}catch(SQLException e){
		e.printStackTrace();
		System.out.println("SQL Exeception");
	}
	//after producer insert job property into mysql, add jobId to LinkedList.so consumer always update successful.
	addId(jobId);//as producer thread add jobId in LinkedList.
其中inputSize是本次作業執行時所處理的輸入大小。這可以通過WebHDFS訪問輸入目錄獲得。

消費者負責從LinkedList中取出jobId,並根據此jobId向Oozie Server傳送HTTP請求查詢當前作業的執行狀態以及作業執行完成後所花的時間。消費者部分程式碼如下:

String jobId = null;
	synchronized (jobIds) {
		try {
			if (jobIds.size() == 0)
				jobIds.wait();//LinkedList is empty
			jobId = jobIds.removeLast();
			System.out.println("remove a job from LinkedList");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}// end sync

try {
		db.updateRunTime(runTime, jobId);
		System.out.println("update job run Time success");
	} catch (SQLException e) {
		e.printStackTrace();
	}

Client向Oozie Server傳送HTTP請求通過 Apache HTTP Client 包來實現。

需要改進的地方:

1,每來一個提交請求時,就會啟動二個執行緒,一個是生產者執行緒另一個是消費者執行緒。而對於消費者執行緒而言,可以預先啟動起來,作為後臺執行緒一直在執行,負責查詢作業的執行情況。而不是一次請求就啟動一個消費者執行緒。

2,一個提交請求會觸發兩次寫資料庫。一次發生在生產者將作業提交的相關資訊寫入資料庫,一次發生在消費者將作業執行相關資訊寫入資料庫。可以先將這些資訊快取在記憶體中,當提交的請求次數達到一定程度時,再統一寫入資料庫。從而減少訪問資料庫的次數。