大資料 : Hadoop reduce階段
Mapreduce中由於sort的存在,MapTask和ReduceTask直接是工作流的架構。而不是資料流的架構。在MapTask尚未結束,其輸出結果尚未排序及合併前,ReduceTask是又有資料輸入的,因此即使ReduceTask已經建立也只能睡眠等待MapTask完成。從而可以從MapTask節點獲取資料。一個MapTask最終的資料輸出是一個合併的spill檔案,可以通過Web地址訪問。所以reduceTask一般在MapTask快要完成的時候才啟動。啟動早了浪費container資源。
ReduceTask是個執行緒,這個執行緒執行在YarnChild的Java虛擬機器上,我們從ReduceTask.run開始看Reduce階段。 獲取更多大資料視訊資料請加QQ群:947967114
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
/*新增reduce過程需要經過的幾個階段。以便通知TaskTracker目前運 行的情況*/
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);
// 設定並啟動reporter程序以便和TaskTracker進行交流
boolean useNewApi = job.getUseNewReducer();
//在job client中初始化job時,預設就是用新的API,詳見Job.setUseNewAPI()方法
initialize(job, getJobID(), reporter, useNewApi);
/*用來初始化任務,主要是進行一些和任務輸出相關的設定,比如建立commiter,設定工作目錄等*/
// check if it is a cleanupJobTask
/*以下4個if語句均是根據任務型別的不同進行相應的操作,這些方 法均是Task類的方法,所以與任務是MapTask還是ReduceTask無關*/
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;//只是為了JobCleanup,做完就停
}
if () {
runJobSetupTask(umbilical, reporter);
return;
//主要是建立工作目錄的FileSystem物件
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
//設定任務目前所處的階段為結束階段,並且刪除工作目錄
}
下面才是真正要成為reducer
// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
//如果需要就建立combineCollector
Classextends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
//配置檔案找mapreduce.job.reduce.shuffle.consumer.plugin.class預設是shuffle.class
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
//建立shuffle類物件
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
//建立context物件,ShuffleConsumerPlugin.Context
shuffleConsumerPlugin.init(shuffleContext);
//這裡呼叫的起始是shuffle的init函式,重點摘要如下。
this.localMapFiles = context.getLocalMapFiles();
scheduler = new ShuffleSchedulerImpl(jobConf, taskStatus, reduceId,
this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
//建立shuffle所需的排程器
merger = createMergeManager(context);
//建立shuffle內部的merge,createMergeManager裡面原始碼:
return new MergeManagerImpl(reduceId, jobConf, context.getLocalFS(),
context.getLocalDirAllocator(), reporter, context.getCodec(),
context.getCombinerClass(), context.getCombineCollector(),
context.getSpilledRecordsCounter(),
context.getReduceCombineInputCounter(),
context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
context.getMapOutputFile());
//建立MergeMnagerImpl物件和Merge執行緒
rIter = shuffleConsumerPlugin.run();
//從各個Mapper複製其輸出檔案,並加以合併排序,等待直到完成為止
// free up the data structures
mapOutputFilesOnDisk.clear();
sortPhase.complete();
//排序階段完成
setPhase(TaskStatus.Phase.REDUCE);
//進入reduce階段
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
//3.Reduce 1.Reduce任務的最後一個階段。它會準備好Map的 keyClass("mapred.output.key.class""mapred.mapoutput.key.class"),valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")和 Comparator (“mapred.output.value.groupfn.class”或“mapred.output.key.comparator.class”)
if (useNewApi) {
//2.根據引數useNewAPI判斷執行runNewReduce還是runOldReduce。分析潤runNewReduce
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
//0.像報告程序書寫一些資訊,1.獲得一個TaskAttemptContext物件。通過這個物件建立reduce、output及用於跟蹤的統計output的RecordWrit、最後建立用於收集reduce結果的Context,2.reducer.run(reducerContext)開始執行reduce
} else {//老API
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
(1)reduce分為三個階段(copy就是遠端拷貝Map的輸出資料、sort就是對所有的資料做排序、reduce做聚集就是我們自己寫的reducer),為這三個階段分別設定Progress,用來和TaskTracker通訊報道狀態。
(2)上面程式碼的15-40行和MapReduce的MapTask任務的執行原始碼級分析中對應部分基本相同,可參考之;
(3)codec = initCodec()這句是檢查map的輸出是否是壓縮的,壓縮的則返回壓縮codec例項,否則返回null,這裡討論不壓縮的;
(4)我們討論完全分散式的hadoop,即isLocal==false,然後構造一個ReduceCopier物件reduceCopier,並呼叫reduceCopier.fetchOutputs()方法拷貝各個Mapper的輸出,到本地;
(5)然後copy階段完成,設定接下來的階段是sort階段,更新狀態資訊;
(6)根據isLocal來選擇KV迭代器,完全分散式的會使用reduceCopier.createKVIterator(job, rfs, reporter)作為KV迭代器;
(7)sort階段完成,設定接下來的階段是reduce階段,更新狀態資訊;
(8)然後獲取一些配置資訊,並根據是否使用新API選擇不同的處理方式,這裡是新的API,呼叫runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass)會執行reducer;
(9)done(umbilical, reporter)這個方法用於做結束任務的一些清理工作:更新計數器updateCounters();如果任務需要提交,設定Taks狀態為COMMIT_PENDING,並利用TaskUmbilicalProtocol,彙報Task完成,等待提交,然後呼叫commit提交任務;設定任務結束標誌位;結束Reporter通訊執行緒;傳送最後一次統計報告(通過sendLastUpdate方法);利用TaskUmbilicalProtocol報告結束狀態(通過sendDone方法)。
有些人將Reduce Task分為了5個階段:一、shuffle階段:也稱為Copy階段,就是從各個MapTask上遠端拷貝一片資料,如果大小超過一定閾值就寫到磁碟,否則放入記憶體;二、Merge階段:在遠端拷貝資料的同時,Reduce Task啟動了兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,防止記憶體使用過多和磁碟檔案過多;三、sort階段:使用者編寫的reduce方法的輸入資料是按key進行聚集的,需要對copy過來的資料排序,這裡用的是歸併排序,因為Map Task的結果是有序的;四、Reduce階段:將每組資料依次交給使用者編寫的Reduce方法處理;五、write階段:就是將結果寫入HDFS。
上面的5個階段分的比較細了,程式碼裡分為3個階段copy、sort、reduce,我們在eclipse執行MR程式時,控制檯看到的reduce階段的百分比就分為3個階段各佔33.3%。
這裡的shuffleConsumerPlugin是實現了ShuffleConsumerPlugin的某個類物件。具體可以通過配置檔案mapreduce.job.reduce.shuffle.consumer.plugin.class選項設定,預設情況下是使用shuffle。我們在程式碼中分析過完成shuffleConsumerPlugin.run,通常是shuffle.run,因為有了這個過程Mapper的合成的spill檔案才能通過HTTP協議傳輸到Reducer端。有了資料才能進行runNewReducer或者runOldReducer。可以說shuffle物件就是MapTask的搬運工。而且shuffle的搬運方式不是一遍搬運一遍Reducer處理,而是要把MapTask所有的資料都搬運過來,並且進行合併排序之後才開始提供給對應的Reducer。
一般而言,MapTask和ReduceTask是多對多的關係,假如有M個Mapper有N個Reducer。我們知道N個Reducer對應著N個partition,所以每個Mapper都會被劃分成N個Partition,每個Reducer承擔著一個Partition部分的操作。這樣每一個Reducer從每個不同的Mapper內拿來屬於自己的那部分資料,這樣每個Reducer就有M份不同Mapper的資料,把M份資料合併在一起就是一個最終完整的Partition,有必要還會進行排序,這時候才成為了Reducer的具體輸入資料。這個資料搬運和重組的過程被叫做shuffle過程。shuffle這個過程開銷頗大,會佔用較大的網路流量,因為涉及到大量資料的傳輸,shuffle過程也會有延遲,因為M個Mapper的計算有快有慢,但是shuffle要所有的Mapper完成才能開始,Reduce又必須等shuffle完成才能開始,當然這種延遲不是shuffle造成的,如果Reducer不需要全部Partition資料到位並排序,就不用與最慢的Mapper同步,這是排序付出的代價。
所以shuffle在MapReduce框架中起著非常重要的作用。我們先看shuffle的摘要: 獲取更多大資料視訊資料請加QQ群:947967114
public class Shuffle implements ShuffleConsumerPlugin, ExceptionReporter
private ShuffleConsumerPlugin.Context context;
private TaskAttemptID reduceId;
private JobConf jobConf;
private TaskUmbilicalProtocol umbilical;
private ShuffleSchedulerImpl scheduler;
private MergeManager merger;
private Task reduceTask; //Used for status updates
private Map localMapFiles;
public void init(ShuffleConsumerPlugin.Context context)
public RawKeyValueIterator run() throws IOException, InterruptedException
在ReduceTask.run中看到呼叫了shuffle.init,在run理建立了ShuffleSchedulerImpl和MergeManagerImpl物件。後面會講解就是是做什麼用的。
之後就是對shuffle.run的呼叫,shuffle雖然有一個run但是並非是一個執行緒,只是用了這個名字而已。
我們看:ReduceTask.run->Shuffle.run
public RawKeyValueIterator run() throws IOException, InterruptedException {
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher eventFetcher =
new EventFetcher(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
//通過檢視EventFetcher我們看到他繼承了Thread,所以他是一個執行緒
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher[] fetchers = new Fetcher[numFetchers];
//建立了一個執行緒池
if (isLocal) {
//如果Mapper和Reducer在同一臺機器上,就在本地fetche
fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
//LocalFetcher是對Fetcher的擴充套件,也是執行緒。
fetchers[0].start();//本地Fecher只有一個
} else {
//Mapper集合Reducer不在同一個機器上,需要跨多個節點Fecher
for (int i=0; i < numFetchers; ++i) {
//啟動所有的Fecher
fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
//建立Fecher執行緒
fetchers[i].start();
//跨節點的Fecher需要好多個,都需要開啟
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
//等待所有的Fecher都完成,如果有超時情況就報告進度
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
//關閉eventFetcher,代表shuffle操作完成,所有的MapTask的資料都拷貝過來了
// Stop the map-output fetcher threads
for (Fetcher fetcher : fetchers) {
fetcher.shutDown();//關閉所有的fetcher。
}
// stop the scheduler
scheduler.close();
//也不需要shuffle的排程,所以關閉
copyPhase.complete(); // copy is already complete
//檔案複製階段結束
以下就是Reduce階段的MergeSort了
taskStatus.setPhase(TaskStatus.Phase.SORT);
//完成排序
reduceTask.statusUpdate(umbilical);
//通過umbilical向MRAppMaster彙報,更新狀態
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
//合併和排序,完成後返回一個佇列kvIter 。
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
資料從MapTask轉移到ReduceTask就兩種方式,一MapTask送,二ReduceTask取,hadoop採用的是第二種方式,就是檔案的複製。在Shuffle進入run之前,RduceTask.run呼叫過他的init函式shuffleConsumerPlugin.init(shuffleContext),在init裡建立了scheduler和用於合併排序的merge,進入run後又建立了EventFetcher執行緒和若干個Fetcher執行緒。Fetcher的作用就是拿取,向MapTask節點提取資料。但是我們要清楚EventFetcher雖然也是Fetcher,但是提取的是event,不是資料本身。我們可以認為它只是對Fetcher過程的一個事件的控制。
Fetcher執行緒的數量也不一定,Uber模式下,MapTask和ReduceTask在同一個節點上,並且只有一個MapTask,所以只有一個Fetcher就能夠完成,而且這個Fetcher是localFetcher。如果不是Uber模式可能會有很多MapTask並且一般和ReduceTask不在同一個節點。這時Fetcher的數量可以進行配置,預設有5個。陣列fetchers就相當於Fetcher的執行緒池。
建立了EventFetcher和Fetcher執行緒池後,進入了while迴圈,但是while迴圈什麼都不做,一直等待,所以實際的操作都是線上程完成的,也就是通過EventFetcher和若干的Fetcher完成。EventFetcher起到了非常關鍵的樞紐的作用。
我們檢視EventFetcher的原始碼摘要,我們提取關鍵的東西:
class EventFetcher extends Thread {
private final TaskAttemptID reduce;
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler scheduler;
private final int maxEventsToFetch;
public void run() {
int failures = 0;
LOG.info(reduce + " Thread started: " + getName());
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {//執行緒沒有被打斷
try {
int numNewMaps = getMapCompletionEvents();
//獲取Map的完成的事件,接著我們看getMapCompletionEvents原始碼:
protected int getMapCompletionEvents()
throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;
do {
MapTaskCompletionEventsUpdate update =
umbilical.getMapCompletionEvents(
(org.apache.hadoop.mapred.JobID)reduce.getJobID(),
fromEventIdx,
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID)reduce);
//彙報umbilical從MRAppMaster獲取Map完成的時間的報告
events = update.getMapTaskCompletionEvents();
//獲取有關具體的MapTask結束執行的情況
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
assert !update.shouldReset() : "Unexpected legacy state";
//做了一個斷言 獲取更多大資料視訊資料請加QQ群:947967114
// Update the last seen event ID
fromEventIdx += events.length;
// Process the TaskCompletionEvents:
// 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
// 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
// fetching from those maps.
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
for (TaskCompletionEvent event : events) {
//對於獲取的每個事件的報告
scheduler.resolve(event);
//這裡使用了ShuffleSchedullerImpl.resolve函式,原始碼如下:
public void resolve(TaskCompletionEvent event) {
switch (event.getTaskStatus()) {
case SUCCEEDED://如果成功
URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());//獲取其URI
addKnownMapOutput(u.getHost() + ":" + u.getPort(),
u.toString(),
event.getTaskAttemptId());
//記錄這個MapTask的節點主機記錄下來,供Fetcher使用,getBaseURI的原始碼:
static URI getBaseURI(TaskAttemptID reduceId, String url) {
StringBuffer baseUrl = new StringBuffer(url);
if (!url.endsWith("/")) {
baseUrl.append("/");
}
baseUrl.append("mapOutput?job=");
baseUrl.append(reduceId.getJobID());
baseUrl.append("&reduce=");
baseUrl.append(reduceId.getTaskID().getId());
baseUrl.append("&map=");
URI u = URI.create(baseUrl.toString());
return u;
獲取各種資訊,然後新增都URI物件中。
}
回到原始碼
maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
//最大的嘗試時間
break;
case FAILED:
case KILLED:
case OBSOLETE://如果MapTask執行失敗
obsoleteMapOutput(event.getTaskAttemptId());//獲取TaskId
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
" map-task: '" + event.getTaskAttemptId() + "'");//寫日誌
break;
case TIPFAILED://如果失敗
tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
event.getTaskAttemptId() + "'");//寫日誌
break;
}
}
回到原始碼
if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {//如果事件成功
++numNewMaps;//增加map數量
}
}
} while (events.length == maxEventsToFetch);
return numNewMaps;
}
回到原始碼
failures = 0;
if (numNewMaps > 0) {
LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
}
LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(SLEEP_TIME);
}
} catch (InterruptedException e) {
LOG.info("EventFetcher is interrupted.. Returning");
return;
} catch (IOException ie) {
LOG.info("Exception in getting events", ie);
// check to see whether to abort
if (++failures >= MAX_RETRIES) {
throw new IOException("too many failures downloading events", ie);//失敗數量大於重試的數量
}
// sleep for a bit
if (!Thread.currentThread().isInterrupted()) {
Thread.sleep(RETRY_PERIOD);
}
}
}
} catch (InterruptedException e) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
return;
}
}
MapTask和ReduceTask沒有直接的關係,MapTask不知道ReduceTask在哪些節點上,它只是把進度的時間報告給MRAppMaster。ReduceTask通過“臍帶”執行getMapCompletionEvents操作想MRAppMaster獲取MapTask結束執行的時間報告。有個別的MapTask可能會失敗,但是絕大多數都會成功,只要成功的就通過Fetcher去索取輸出資料,這個資訊就是通過shcheduler完成的也就是ShuffleSchedulerImpl物件,ShuffleSchedulerImpl物件並不多,只是個普通的物件。
fetchers就像執行緒池,裡面有若干執行緒(預設有5個),這些執行緒等待EventFetcher的通知,一旦有MapTask完成就前往提取資料。
獲取更多大資料視訊資料請加QQ群:947967114
我們看Fetcher執行緒類的run方法:
public void run() {
try {
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on, block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
//從scheduler獲取一個已經成功完成的MapTask的節點。
metrics.threadBusy();
//執行緒變成繁忙狀態
// Shuffle
copyFromHost(host);
//開始複製這個節點的資料
} finally {
if (host != null) {//maphost還有執行中的
scheduler.freeHost(host);
//狀態設定成空閒狀態,等待其完成。
metrics.threadFree();
}
}
}
} catch (InterruptedException ie) {
return;
} catch (Throwable t) {
exceptionReporter.reportException(t);
}
}
這裡的重點是copyFromHost獲取資料的函式。
protected void copyFromHost(MapHost host) throws IOException {
// reset retryStartTime for a new host
//這是在ReduceTask的節點上執行的
retryStartTime = 0;
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
//獲取目標節點上的MapTask集合。
// Sanity check to catch hosts with only 'OBSOLETE' maps,
// especially at the tail of large jobs
if (maps.size() == 0) {
return;//沒有完成的直接返回
}
if(LOG.isDebugEnabled()) {
LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+ maps);
}
// List of maps to be fetched yet
Set remaining = new HashSet(maps);
//已經完成、等待shuffle的MapTask集合。
// Construct the url and connect
DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
//生成MapTask所在節點的URL,下面要看getMapOutputURL原始碼:
private URL getMapOutputURL(MapHost host, Collection maps
) throws MalformedURLException {
// Get the base url
StringBuffer url = new StringBuffer(host.getBaseUrl());
boolean first = true;
for (TaskAttemptID mapId : maps) {
if (!first) {
url.append(",");
}
url.append(mapId);//在URL後面加上mapid
first = false;
}
LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
//寫日誌
return new URL(url.toString());
//返回URL
}
回到主程式碼:
try {
setupConnectionsWithRetry(host, remaining, url);
//和對方主機建立HTTP連線,setupConnectionsWithRetry使用了openConnectionWithRetry函式開啟連結。
openConnectionWithRetry(host, remaining, url);
這段原始碼有使用了openConnection(url);方式,繼續檢視。
如下是連結的主要過程:
protected synchronized void openConnection(URL url)
throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
//使用的是HTTPURL進行連線
if (sslShuffle) {//如果是有信任證書的
HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
//強轉conn型別
try {
httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());//新增一個證書socket的工廠
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
}
connection = conn;
}
在setupConnectionsWithRetry中繼續寫到:
setupShuffleConnection(encHash);
//建立了Shuffle連結
connect(connection, connectionTimeout);
// verify that the thread wasn't stopped during calls to connect
if (stopped) {
return;
}
verifyConnection(url, msgToEncode, encHash);
}
//至此連線通過。
if (stopped) {
abortConnect(host, remaining);
//這裡邊是關閉連線,可以點進去看一下,滿足列表和等待的兩個條件
return;
}
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
回到主程式碼
input = new DataInputStream(connection.getInputStream());
//例項一個輸入流物件。
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
// after putting back the remaining maps to the
// yet_to_be_fetched list and marking the failed tasks.
TaskAttemptID[] failedTasks = null;
while (!remaining.isEmpty() && failedTasks == null) {
//如果需要fetcher的列表不空,並且失敗的task數量沒有
try {
failedTasks = copyMapOutput(host, input, remaining, fetchRetryEnabled);
//複製資料出來copyMapOutput的原始碼如下:
try {
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
mapId = TaskAttemptID.forName(header.mapId);
//獲取mapID
compressedLength = header.compressedLength;
decompressedLength = header.uncompressedLength;
forReduce = header.forReduce;
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
//Don't know which one was bad, so consider all of them as bad
return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
InputStream is = input;
is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
compressedLength -= CryptoUtils.cryptoPadding(jobConf);
decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
//如果需要解壓或解密
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
return new TaskAttemptID[] {mapId};
}
if(LOG.isDebugEnabled()) {
LOG.debug("header: " + mapId + ", len: " + compressedLength +
", decomp len: " + decompressedLength);
}
try {
mapOutput = merger.reserve(mapId, decompressedLength, id);
//為merge預留一個MapOutput:是記憶體還是磁碟上。
} catch (IOException ioe) {
// kill this reduce attempt
ioErrs.increment(1);
scheduler.reportLocalError(ioe);
//報告錯誤
return EMPTY_ATTEMPT_ID_ARRAY;
}
// Check if we can shuffle *now* ...
if (mapOutput == null) {
LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
//Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
}
// The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
// on decompression failures. Catching and re-throwing as IOException
// to allow fetch failure logic to be processed
try {
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map "
+ mapOutput.getMapId() + " decomp: " + decompressedLength
+ " len: " + compressedLength + " to " + mapOutput.getDescription());
mapOutput.shuffle(host, is, compressedLength, decompressedLength,
metrics, reporter);
//跨節點把Mapper的檔案內容拷貝到reduce的記憶體或者磁碟上。
} catch (java.lang.InternalError e) {
LOG.warn("Failed to shuffle for fetcher#"+id, e);
throw new IOException(e);
}
// Inform the shuffle scheduler
long endTime = Time.monotonicNow();
// Reset retryStartTime as map task make progress if retried before.
retryStartTime = 0;
scheduler.copySucceeded(mapId, host, compressedLength,
startTime, endTime, mapOutput);
//告訴排程器完成了一個節點的Map輸出的檔案拷貝。
remaining.remove(mapId);
//這個MapTask的輸出已經shuffle完畢
metrics.successFetch();
return null;後面的異常失敗資訊我們不管。
這裡的mapOutput是用來容納MapTask輸出檔案的儲存空間,根據輸出檔案的內容大小和記憶體的情況,可以是記憶體的Output也可以是DiskOutput。 如果是記憶體需要預約,因為不止一個Fetcher。我們以InMemoryMapOutput為例。
程式碼結構;
Fetcher.run-->copyFromHost-->copyMapOutput-->merger.reserve(MergeManagerImpl.reserve)-->InmemoryMapOutput.shuffle
public void shuffle(MapHost host, InputStream input,
long compressedLength, long decompressedLength,
ShuffleClientMetrics metrics,
Reporter reporter) throws IOException {
//跨節點從Mapper拷貝spill檔案
IFileInputStream checksumIn =
new IFileInputStream(input, compressedLength, conf);
//校驗和的輸入流
input = checksumIn;
// Are map-outputs compressed?
if (codec != null) {
//如果涉及到了壓縮
decompressor.reset();
//重啟解壓器
input = codec.createInputStream(input, decompressor);
//加了解壓器的輸入流
}
try {
IOUtils.readFully(input, memory, 0, memory.length);
//從Mapper方把特定的Partition資料讀入Reducer的記憶體緩衝區。
metrics.inputBytes(memory.length);
reporter.progress();//彙報進度
LOG.info("Read " + memory.length + " bytes from map-output for " +
getMapId());
/**
* We've gotten the amount of data we were expecting. Verify the
* decompressor has nothing more to offer. This action also forces the
* decompressor to read any trailing bytes that weren't critical
* for decompression, which is necessary to keep the stream
* in sync.
*/
if (input.read() >= 0 ) {
throw new IOException("Unexpected extra bytes from input stream for " +
getMapId());
}
} catch (IOException ioe) {
// Close the streams
IOUtils.cleanup(LOG, input);
// Re-throw
throw ioe;
} finally {
CodecPool.returnDecompressor(decompressor);
//釋放解壓器
}
}
從對方把spill檔案中屬於本partition資料複製過來,回到copyFromHost中,通過scheduler.copySuccessed告知scheduler,並把這個MapTask的ID從remaining集合中刪除,進入下一個迴圈,複製下一個MapTask資料。直到把所有的屬於本Partition的資料都複製過來。
以上是Reducer端Fetcher的過程,它向Mapper端傳送HTTP GET請求,下載檔案。在MapTask就有一個與之對應的Server,這個網路協議的原始碼不做深究,課下有興趣自己研究。 獲取更多大資料視訊資料請加QQ群:947967114