Yarn原始碼分析之MRAppMaster:作業執行方式Local、Uber、Non-Uber
基於作業大小因素,MRAppMaster提供了三種作業執行方式:本地Local模式、Uber模式、Non-Uber模式。其中,
1、本地Local模式:通常用於除錯;
2、Uber模式:為降低小作業延遲而設計的一種模式,所有任務,不管是Map Task,還是Reduce Task,均在同一個Container中順序執行,這個Container其實也是MRAppMaster所在Container;
3、Non-Uber模式:對於執行時間較長的大作業,先為Map Task申請資源,當Map Task執行完成數目達到一定比例後再為Reduce Task申請資源。
在Yarn中,作業執行的資源,統一被抽象為容器Container,在MRAppMaster中關於作業執行時需要的資源的分配與載入程式碼中,容器分配申請服務、容器分配完成後載入服務中,都有關於Uber模式和Non-Uber模式的處理,如下:
1、容器分配申請路由服務
容器分配申請路由服務ContainerAllocatorRouter繼承自AbstractService,是Hadoop中一個典型的服務,其正常提供服務需要經歷初始化init、啟動start等過程,而在服務啟動的serviceStart()方法中,存在以下關於Uber模式和Non-Uber模式的處理:
// 如果Job在Uber模式下執行,啟動構造容器分配器LocalContainerAllocator例項 if (job.isUber()) { MRApps.setupDistributedCacheLocal(getConfig()); this.containerAllocator = new LocalContainerAllocator( this.clientService, this.context, nmHost, nmPort, nmHttpPort , containerID); } else { // 否則構造RM容器分配器RMContainerAllocator例項 this.containerAllocator = new RMContainerAllocator( this.clientService, this.context); }
可見,如果Job在Uber模式下執行,啟動構造容器分配器LocalContainerAllocator例項,否則構造RM容器分配器RMContainerAllocator例項。而LocalContainerAllocator代表的是本地容器分配器,其構造過程中傳入的containerID為MRAppMaster的成員變數containerID,什麼意思呢?不就正好說明LocalContainerAllocator所使用的容器,也就是Uber模式下所使用的容器,就是MRAppMaster所在Container,與上面所介紹的Uber模式正好一致,而Non-Uber模式下則需要使用Yarn的RMContainerAllocator,通過與ResourceManager進行通訊來申請容器的分配,總的原則就是:先為Map Task申請資源,當Map Task執行完成數目達到一定比例後再為Reduce Task申請資源。
2、容器載入路由服務
容器載入路由服務ContainerLauncherRouter同樣繼承自AbstractService,也是Hadoop中一個典型的服務,我們同樣看下服務啟動serviceStart()方法,如下:
// 如果Job在Uber模式下執行,啟動構造本地容器載入器LocalContainerLauncher例項
if (job.isUber()) {
this.containerLauncher = new LocalContainerLauncher(context,
(TaskUmbilicalProtocol) taskAttemptListener);
} else {
// 否則,構造容器載入器ContainerLauncherImpl例項
this.containerLauncher = new ContainerLauncherImpl(context);
}
也是針對Uber模式和Non-Uber模式分別處理,如果Job在Uber模式下執行,啟動構造本地容器載入器LocalContainerLauncher例項;否則,構造容器載入器ContainerLauncherImpl例項。另外,由於Uber模式下不管是Map Task,還是Reduce Task,均在同一個Container中順序執行,所以MapReduce的推測執行機制對於Uber模式是不適用的,故在MRAppMaster服務啟動的serviceStart()方法中,對於Uber模式,會禁用推測執行機制,相關程式碼如下:
if (job.isUber()) {
// Uber模式下禁用推測執行機制,即Disable Speculation
speculatorEventDispatcher.disableSpeculation();
LOG.info("MRAppMaster uberizing job " + job.getID()
+ " in local container (\"uber-AM\") on node "
+ nmHost + ":" + nmPort + ".");
} else {
// send init to speculator only for non-uber jobs.
// This won't yet start as dispatcher isn't started yet.
// Non-Uber模式下發送SpeculatorEvent事件,初始化speculator
dispatcher.getEventHandler().handle(
new SpeculatorEvent(job.getID(), clock.getTime()));
LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ "job " + job.getID() + ".");
}
可以看到,Uber模式下禁用推測執行機制,即Disable Speculation,Non-Uber模式下發送SpeculatorEvent事件,初始化speculator,因此,對於Uber模式,會禁用推測執行機制。在作業的抽象實現JobImpl中,會針對Uber模式進行一些特定引數設定,如下:
if (isUber) {
LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
+ numReduceTasks + "r tasks (" + dataInputLength
+ " input bytes) will run sequentially on single node.");
// make sure reduces are scheduled only after all map are completed
// mapreduce.job.reduce.slowstart.completedmaps引數設定為1,
// 即全部Map任務完成後才會為Reduce任務分配資源
conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
1.0f);
// uber-subtask attempts all get launched on same node; if one fails,
// probably should retry elsewhere, i.e., move entire uber-AM: ergo,
// limit attempts to 1 (or at most 2? probably not...)
// 引數mapreduce.map.maxattempts、mapreduce.reduce.maxattempts設定為1,即Map、Reduce任務的最大嘗試次數均為1
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
// disable speculation
// 引數mapreduce.map.speculative、mapreduce.reduce.speculative設定為false,即禁用Map、Reduce任務的推測執行機制
conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
}
主要包括:1、mapreduce.job.reduce.slowstart.completedmaps引數設定為1,即全部Map任務完成後才會為Reduce任務分配資源;
2、引數mapreduce.map.maxattempts、mapreduce.reduce.maxattempts設定為1,即Map、Reduce任務的最大嘗試次數均為1;
3、引數mapreduce.map.speculative、mapreduce.reduce.speculative設定為false,即禁用Map、Reduce任務的推測執行機制;