
Hadoop Source Code Explained: The Mapper Class

1. Class description

Maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.

The Hadoop Map-Reduce framework spawns one map task for each InputSplit generated by the InputFormat for the job. Mapper implementations can access the Configuration for the job via JobContext.getConfiguration().

The framework first calls setup(org.apache.hadoop.mapreduce.Mapper.Context), followed by map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context) for each key/value pair in the InputSplit. Finally cleanup(org.apache.hadoop.mapreduce.Mapper.Context) is called.
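
A hedged sketch of that setup(), map(), cleanup() sequence, with setup() reading a job parameter through the Configuration (the class name, the example.case.sensitive property, and the pass-through logic are illustrative assumptions, not part of the Hadoop source):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: illustrates where setup(), map() and cleanup() fit into the call sequence.
public class LifecycleMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

  private boolean caseSensitive;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // called once before any map() call; the job Configuration is available here
    caseSensitive = context.getConfiguration().getBoolean("example.case.sensitive", false);
  }

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // called once for every key/value pair in this task's InputSplit
    String text = caseSensitive ? line.toString() : line.toString().toLowerCase();
    context.write(new Text(text), NullWritable.get());
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // called exactly once, after the last record of the split has been processed
  }
}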

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a Reducer to determine the final output. Users can control the sorting and grouping by specifying two key RawComparator classes. [Which two RawComparator classes? In the new API these are the sort comparator and the grouping comparator, registered via Job.setSortComparatorClass() and Job.setGroupingComparatorClass(); see the sketch below.]
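
A hedged sketch of how those two comparators are registered on the job (MySortComparator and MyGroupingComparator are hypothetical user classes, assumed to extend WritableComparator; they are not part of the Hadoop API):

// Driver fragment: the sort comparator orders keys inside each partition,
// the grouping comparator decides which keys are fed into a single reduce() call.
Job job = Job.getInstance(new Configuration(), "comparator-example");
job.setSortComparatorClass(MySortComparator.class);
job.setGroupingComparatorClass(MyGroupingComparator.class);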

The Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
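
A minimal sketch of such a custom Partitioner (the first-letter routing rule is purely illustrative):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch: every key starting with the same (lower-cased) character goes to the same Reducer.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String k = key.toString();
    int bucket = k.isEmpty() ? 0 : Character.toLowerCase(k.charAt(0));
    return bucket % numPartitions;
  }
}
// Registered in the driver with: job.setPartitionerClass(FirstLetterPartitioner.class);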

Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
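
In the word-count case the Reducer itself can usually serve as the combiner, because summing partial counts is associative. A hedged driver fragment (IntSumReducer is an assumed reducer that sums the IntWritable values of each key):

// Driver fragment: pre-aggregate map output on each node before it is shuffled to the reducers.
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);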

Applications can specify if and how the intermediate outputs are to be compressed and which CompressionCodecs are to be used via the Configuration.
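
A hedged sketch of turning on map-output compression through the Configuration (the property names are the Hadoop 2.x keys; everything else here is an illustrative skeleton):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class CompressedMapOutputDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // compress the intermediate (map-side) output to cut shuffle traffic
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);
    Job job = Job.getInstance(conf, "compressed-map-output");
    // ... set mapper, reducer, input and output paths here, then job.waitForCompletion(true);
  }
}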

2. Class source code

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.task.MapContextImpl;

 /* 
 * @see InputFormat
 * @see JobContext
 * @see Partitioner  
 * @see Reducer
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Mapper} implementations.
   */
  public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }
  
  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }
  
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}
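
The run() method above is the template that drives a map task: setup() once, map() for every record the context returns, and cleanup() in a finally block. A hedged sketch of the kind of override the javadoc has in mind (the progress-status detail is an assumption, not something the source prescribes):

import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: same contract as Mapper.run(), plus a status update every 100,000 records.
public class StatusReportingMapper<KI, VI, KO, VO> extends Mapper<KI, VI, KO, VO> {
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    long processed = 0;
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
        if (++processed % 100_000 == 0) {
          context.setStatus("processed " + processed + " records");
        }
      }
    } finally {
      cleanup(context);  // always runs, even if map() throws
    }
  }
}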

3. Class methods

3.1 The inner class Context
  • Class description

The Context passed on to the Mapper implementations.

  • Class code
public abstract class Context
    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

As the code shows, the abstract class Context implements the MapContext interface with the type parameters <KEYIN,VALUEIN,KEYOUT,VALUEOUT>.
MapContext in turn extends TaskInputOutputContext, which declares the write() method. In the WordCountMapper class it is exactly this write() method that emits the intermediate key/value pairs, as in the fragment below (a complete sketch of the class follows it):

// emit <word, 1> for each word in the line
for (String word : words) {
  // write(): generate an output key/value pair => (KEYOUT: Text, VALUEOUT: IntWritable)
  context.write(new Text(word), new IntWritable(1));
}
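
For reference, a complete WordCountMapper along the lines of that fragment might look like the sketch below (the whitespace tokenization is an assumption; only the context.write() call is taken from the snippet):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of the WordCountMapper referenced above: emits <word, 1> for every word in a line.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    for (String w : line.toString().split("\\s+")) {
      if (!w.isEmpty()) {
        word.set(w);
        context.write(word, ONE);  // the write() call discussed in this section
      }
    }
  }
}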

??? One remaining question: where is this write() method actually implemented? ??? In Hadoop 2.x, the Context handed to the Mapper is a WrappedMapper.Context wrapping a MapContextImpl; MapContextImpl extends TaskInputOutputContextImpl, whose write() delegates to the task's RecordWriter, i.e. the map task's sort/spill buffer when the job has reducers, or the OutputFormat's RecordWriter for a map-only job.