1. 程式人生 > >Hadoop原始碼分析(2)————MapReduce之MapTask

Hadoop原始碼分析(2)————MapReduce之MapTask

MapTask(Hadoop2.7.3)

MapTask.java繼承於Task,是hadoop中Map節點主要所做的主要流程。
一般被jvmtask初始化或者在MapTaskAttemptImpl被初始化。其主要流程寫在run()方法中。
run()方法主要在YarnChild中被呼叫(偽分散式或者分散式,如果是在單機模式一般被LocalJobRunning被調)也就是說Map節點都是被Yarn啟動的。

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
          throws
IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; if (isMapTask()) { // If there are no reducers then there won't be any sort. Hence the map // phase will govern the entire attempt's progress. if (conf.getNumReduceTasks() == 0) { mapPhase = getProgress().addPhase("map"
, 1.0f); } else { // If there are reducers then the entire attempt's progress will be // split between the map phase (67%) and the sort phase (33%). mapPhase = getProgress().addPhase("map", 0.667f); sortPhase = getProgress().addPhase("sort", 0.333f); } } TaskReporter reporter = startReporter(umbilical); boolean
useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } //sendSend(umbilical); if (useNewApi) { System.out.println("新事前:"+umbilical.getClass().getName()); LOG.info("新事前:"+umbilical.getClass().getName()); runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { System.out.println("舊事前:"+umbilical.getClass().getName()); LOG.info("舊事前:"+umbilical.getClass().getName()); runOldMapper(job, splitMetaInfo, umbilical, reporter); } LOG.info("事後:"+umbilical.getClass().getName()); System.out.println("事後:"+umbilical.getClass().getName()); //sendSend(umbilical); done(umbilical, reporter); }

這部分是我加過log的run()方法,因為圖省事,找日誌找起來比較方便,就直接使用System.out.println()來做輸出了。這樣的話當執行完之後,所有的輸出都會在hadoop目錄下logs/userlogs/stdout裡面。

可以看到run()方法主要有兩個引數:

  • final JobConf job
    第一個引數主要是 Job的一些配置,可能會影響到maptask執行時候的一些方式。

  • final TaskUmbilicalProtocol umbilical
    這個引數就比較重要了,主要是的事件通訊都是通過這個物件的。具體的在說MapTask所涉及到的狀態機的時候再說吧。

這個類一部分都是一些初始化的工作,都很輕鬆就能看懂,主要是在

if (useNewApi) {
      System.out.println("新事前:"+umbilical.getClass().getName());
      LOG.info("新事前:"+umbilical.getClass().getName());
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      System.out.println("舊事前:"+umbilical.getClass().getName());
      LOG.info("舊事前:"+umbilical.getClass().getName());
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    LOG.info("事後:"+umbilical.getClass().getName());
    System.out.println("事後:"+umbilical.getClass().getName());
    //sendSend(umbilical);
    done(umbilical, reporter);

這一部分,判斷使用者是否指定使用新的map方式還是舊的map方式(新的方式和舊的方式主要功能並沒有變化,主要是新的runNewMapper在架構上要比runOldMapper好一點。。。官方說的),這裡我們以runOldMapper為例

InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
            splitIndex.getStartOffset());

這部分程式碼是指,map任務得到job分配給他的需要他來計算的那一部分分片。

RecordReader<INKEY, INVALUE> in = isSkipping() ?
            new SkippingRecordReader<INKEY, INVALUE>(umbilical, reporter, job) :
            new TrackedRecordReader<INKEY, INVALUE>(reporter, job);

這一部分是例項化一個RecordReader,主要用於按照行或者某種方式來讀取自己需要計算的那一個分片。具體取決於這裡被例項化的是哪一個實現類。
然後是hadoop會呼叫使用者編寫的map方法用來處理資料。

再下面是MapOutputCollector這個類,主要用於處理map的輸出。

這裡面有很多方法比較重要,比如sortAndSpill()這個方法。
這個方法主要是被SpillThread呼叫的,
SpillThread這個執行緒在MapOutputBuffer初始化時被啟動,它會一直監視著環形緩衝區當環形緩衝區超過一定值(我記得原始碼裡面好像是寫了是在大部分情況下是百分之八十被佔滿後)的時候將會呼叫sortAndSpill方法,講環形緩衝區中已經計算完畢的資料寫到本地檔案中。

當整個runOldMapper(或者是runNewMapper)完畢後將會執行done()函式傳送事件,通過狀態機變遷完成一些事情(這部分下次會提到)

大概主要的流程也就是這幾個,如果有需要的或者不明白的地方,歡迎發郵件給我[email protected]

相關推薦

Hadoop原始碼分析2————MapReduceMapTask

MapTask(Hadoop2.7.3) MapTask.java繼承於Task,是hadoop中Map節點主要所做的主要流程。 一般被jvmtask初始化或者在MapTaskAttemptImpl被初始化。其主要流程寫在run()方法中。 run()方法

spring4.2.9 java專案環境下ioc原始碼分析——refreshobtainFreshBeanFactory方法@2處理Resource、載入Document及解析前準備

接上篇文章,上篇文章講到載入完返回Rescouce。先找到要解析的程式碼位置,在AbstractBeanDefinitionReader類的loadBeanDefinitions(String location, Set<Resource> actualResou

spring4.2.9 java專案環境下ioc原始碼分析——refreshobtainFreshBeanFactory方法@4預設標籤bean,beans解析、最終註冊

接上篇文章,解析了import和alias標籤,只是開胃菜比較簡單,下面介紹bean標籤的載入,也是預設名稱空間下解析的重點。protected void processBeanDefinition(Element ele, BeanDefinitionParserDeleg

spring4.2.9 java專案環境下ioc原始碼分析——refreshobtainFreshBeanFactory方法@3預設標籤import,alias解析

接上篇文章,到了具體解析的時候了,具體的分為兩種情況,一種是預設名稱空間的標籤<bean>;另一種是自定義名稱空間的標籤比如<context:xxx>,<tx:xxx>等。先看下預設的名稱空間的標籤解析。protected void par

spring4.2.9 java專案環境下ioc原始碼分析——refreshobtainFreshBeanFactory方法@1準備工作與載入Resource

obtainFreshBeanFactory方法從字面的意思看獲取新的Bean工廠,實際上這是一個過程,一個載入Xml資源並解析,根據解析結果組裝BeanDefinitions,然後初始化BeanFactory的過程。在載入Xml檔案之前,spring還做了一些其他的工作,比

Mybatis 原始碼分析2—— 引數處理

Mybatis對引數的處理是值得推敲的,不然在使用的過程中對發生的一系列錯誤直接懵逼了。 以前遇到引數繫結相關的錯誤我就是直接給加@param註解,也稀裡糊塗地解決了,但是後來遇到了一些問題推翻了我的假設:單個引數不需要使用 @param 。由此產生了一個疑問,Mybatis到底是怎

PackageManagerService 原始碼分析2

  一.scanPackageLI PKMS 中呼叫scanDirLI來分析APK 檔案,如果目錄下的是apk檔案或者是目錄,會繼續呼叫scanPackageLI函式: private PackageParser.Package scanPackageLI(File s

Android進階3:Activity原始碼分析2 —— Activity啟動和銷燬流程8.0

上篇文章講述了app從啟動建立Activity呼叫onCreate,onStart, onResume方法,這篇文章講述一下Activity啟動的另一個切入點:startActivity方法,啟動Activity。 通過上一篇文章,我們總結一下: 1:A

spring原始碼閱讀2-aopjdk動態代理深入解析

續spring原始碼閱讀(2)-aop之j動態代理 我們從需求作為動態代理髮展的切入吧 現在有5個已經投產了的run100m的實現,我們新的需求需要監控不同實現的執行效能,如果我們針對這五個實現分別去新增效能監控的程式碼,如此就造成兩個問題: 一個是已經穩定的程式碼需要

spring原始碼閱讀2-aop原始碼解析篇

經過一個aop術語介紹和動態代理的深入講解,我們終於可以來看aop的原始碼了,下面跟著博主一點點剖析spring aop原始碼的實現吧 我們知道spring使用中我們只要做好相關的配置,spring自動幫我們做好了代理的相關工作。 我們從三個方面入手吧 1、配置 2、

tensorflowV1.11-原始碼分析2

通過前面的run_shell函式,執行python指令碼,並返回python庫的路徑。 def get_python_path(environ_cp, python_bin_path): """Get the python site package paths.""" python_paths = [

hadoop隨手筆記2--mapreduce的執行機理

(1)InputFormat輸入格式: 裡面定義了getSplits方法,主要將輸入的檔案分割成邏輯上的多個分片InputSplit,這裡面的分片不是真正意義上的分片,只是邏輯上的分片,每個分片同夥輸入檔案的路徑、開始時為止和偏移量三個資訊來唯一標識。 使用createRecordReader方

Django rest framework原始碼分析2----許可權

目錄 新增許可權 (1)API/utils資料夾下新建premission.py檔案,程式碼如下: message是當沒有許可權時,提示的資訊 # utils/permission.py class SVIPPremission(object): message =

【原創】docker原始碼分析2---docker server

上一節,分析了Engine和job。那這一節就開始講下docker server。 1、docker server 1.1 主體流程 我們從main函式開始,看看docker server

Android init原始碼分析2init.rc解析

action_for_each_trigger("early-init", action_add_queue_tail); queue_builtin_action(wait_for_coldboot_done_action, "wait_for_coldboot_done"); q

HDFS原始碼分析2----HDFS原始碼結構

BlockPlacementPolicy.java----抽象類:這個介面用於選擇放置塊副本的目標磁碟的所需的數目; BlockPlacementPolicyDefault.java----繼承實現類:這個類實現了選擇放置塊副本的目標磁碟的所需的數目; BlockPlacementPolicyWithNode

U-Boot啟動過程原始碼分析2-第二階段

先總述:第一階段cpu/arm920t/start.S和board/smdk2410/lowlevel_init.S進行初始化,再跳到第二階段的入口點lib_arm/board.c中的start_armboot函式。 第二階段start_armboot函式需

libevent原始碼分析2--2.1.8--結構體 struct event和struct event_callback

一、event_callback結構體 struct event_callback { //下一個回撥事件 TAILQ_ENTRY(event_callback) evcb_active_next; //回撥事件的狀態標識,具體為:

opendaylightLi l2switch 原始碼分析2--parent

本文主要介紹l2switch中的parent工程,該工程定義了執行L2switch所使用的依賴模組以及版本等。 該工程下只有一個pom.xml檔案,下面對該檔案中的主要內容進行說明: 1. <parent>     <groupId>org.open

MySQL原始碼分析2:Mysql中的記憶體分配相關

Mysql中的記憶體分配相關 涉及到記憶體的配置引數這些引數可以分成兩部分,分別對應MySQL中的兩個層次:伺服器層和儲存引擎層。 MySQL伺服器相關: 每個連線到MySQL伺服器的執行緒都需要有自己的緩衝,預設為其分配256K。事務開始之後,則需要增加更多的空間。執行較