1. 程式人生 > >Mapper中map方法下context.write的流程與程式碼詳解

Mapper中map方法下context.write的流程與程式碼詳解

本文的分析基於Hadoop 2.4.0版本

任何Map任務在Hadoop中都會被一個MapTask物件所詳細描述,MapTask會最終呼叫其run方法來執行它對應的Map任務,需要執行任務就必須要有相關的輸入輸出資訊,這些資訊都包含在Map任務對應的Context物件中,Context通過RecordReader來獲取輸入資料,Map任務的輸入檔案儲存在InputSplit中,這個InputSplit儲存了檔案的路徑、範圍和位置;通過RecordWriter來儲存處理後的資料,在Context中還有個任務報告器TaskReporter,它不斷向ApplicationMaster報告任務的執行進度

Mapper abstract class Context implements MapContext(介面)

Reducer的abstract class Context implements ReduceContext(介面)

MapContext和ReduceContext都是extends TaskInputOutputContext(介面)

interface TaskInputOutputContext的實現abstractclass TaskInputOutputContextImpl

TaskInputOutputContextImpl的write方法由abstract

class RecordWriter<KEYOUT,VALUEOUT>實現

Map和Reduce中的Context物件的write方法都是呼叫RecordWriter的write方法

其中RecordWriter有很多的實現類如classMapTask下的privateclass NewOutputCollector

private class NewDirectOutputCollector

NewOutputCollector舉例:

public void write(K key, V value)throws IOException, InterruptedException {

      collector.collect(key, value,partitioner.getPartition(key,value, partitions));

}

其中collector的型別是MapOutputCollector(介面),最後實現類是static class MapOutputBuffer,是MapTask下的類,方法簽名如下:

public synchronized void collect(K key, V value, final int partition ) throws IOException
在2.x中對1.x進行了改進,1.x中的int [] kvoffsets, int[] kvindices2.x中的IntBufferkvmeta給替換了

首選會呼叫publicvoid init(MapOutputCollector.Context context 方法進行初始化)

什麼時候呼叫init的呢?

如果是使用runNewMapper的話,當getNumReduceTasks() != 0時,呼叫private <KEY, VALUE> MapOutputCollector<KEY,VALUE>createSortingCollector(JobConfjob, TaskReporter reporter)方法內部呼叫的collector.init(context),其中在NewOutputCollector類的構造方法中呼叫了collector =createSortingCollector(job, reporter),getNumReduceTasks() == 0時,呼叫另一種直接的方式

使用runOldMapper的話,是直接當numReduceTasks > 0時呼叫createSortingCollector(job, reporter),當numReduceTasks =0時,呼叫另一種直接的方式

final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);

final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);

key和value需要進行序列化的操作

keyClass = (Class<K>)job.getMapOutputKeyClass();

valClass = (Class<V>)job.getMapOutputValueClass();

serializationFactory = new SerializationFactory(job);

keySerializer = serializationFactory.getSerializer(keyClass);

keySerializer.open(bb);

valSerializer = serializationFactory.getSerializer(valClass);

valSerializer.open(bb);

初始化操作之後就執行public synchronized void collect(K key,V value, final int partition)方法

然後執行flush方法:將快取中的資料刷到磁碟上

進行spill的時候,先是對快取中的資料進行排序

sorter.sort(MapOutputBuffer.this, mstart, mend,reporter)

MapTask的執行主要程式碼:

MapTask的入口是run方法publicvoid run(final JobConf job,final TaskUmbilicalProtocol umbilical)

    if (isMapTask()) {

      // 如果沒有reducer的任務,map階段會支配所有的程序

      if (conf.getNumReduceTasks() == 0) {

        mapPhase = getProgress().addPhase("map", 1.0f);

      } else {

        // 如果有reducer的任務,全部被分為map階段和sort階段,各自佔據一定的處理過程.

        mapPhase = getProgress().addPhase("map", 0.667f);

        sortPhase  =getProgress().addPhase("sort", 0.333f);

      }

}

//判斷是否使用新的api

boolean useNewApi = job.getUseNewMapper()

Configuration類中的方法,預設是false

  public boolean getUseNewMapper() {

    return getBoolean("mapred.mapper.new-api",false);

  }

//useNewAPi是關於committer的設定不同,

initialize(job, getJobID(), reporter,useNewApi);

    if (useNewApi) {

      if (LOG.isDebugEnabled()) {

        LOG.debug("using new api for outputcommitter");

      }

      outputFormat =

        ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);

      committer = outputFormat.getOutputCommitter(taskContext);

    } else {

      committer = conf.getOutputCommitter();

    }

做一些處理操作

    if (jobCleanup) {

      runJobCleanupTask(umbilical, reporter);

      return;

    }

    if (jobSetup) {

      runJobSetupTask(umbilical, reporter);

      return;

    }

    if (taskCleanup) {

      runTaskCleanupTask(umbilical, reporter);

      return;

    }

然後

    if (useNewApi) {

      runNewMapper(job, splitMetaInfo, umbilical, reporter);

    } else {

      runOldMapper(job, splitMetaInfo, umbilical, reporter);

    }

runNewMapper與runOldMapper的不同,對程式碼的不同包裝,實現效果基本相同