Source Code Analysis of the Mapper and Reducer Methods in MapReduce

Methods of the Mapper class in the source code

	/**
	   * The <code>Context</code> passed on to the {@link Mapper} implementations.
	   */
	  public abstract class Context
	    implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
	  }

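The Context is the object a Mapper uses to write its output key/value pairs to the reduce phase (or to whatever stage comes next). In run() it also supplies the current input key and value.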

/**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

setup() is called once when the task starts; override it for one-time initialization.
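To make the one-time nature of setup() concrete, here is a minimal Hadoop-free sketch. It assumes a hypothetical StopwordFilterMapper class whose setup() receives a comma-separated stopword list as a plain string (in a real job this would more likely be read from context.getConfiguration()), and a List that stands in for context.write(). setup() builds the lookup set once, and every map() call reuses it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hadoop-free model of a mapper that does one-time initialization in setup().
// StopwordFilterMapper and its methods are hypothetical illustrations.
class StopwordFilterMapper {
    private Set<String> stopwords;                  // built once per task in setup()
    final List<String> output = new ArrayList<>();  // stands in for context.write()
    int setupCalls = 0;

    // Called once, before any record is processed.
    void setup(String stopwordConfig) {
        setupCalls++;
        stopwords = new HashSet<>(Arrays.asList(stopwordConfig.split(",")));
    }

    // Called once per input record; reuses the set built in setup().
    void map(String word) {
        if (!stopwords.contains(word)) {
            output.add(word);
        }
    }

    // Drives one simulated task: setup() once, then map() per record.
    static StopwordFilterMapper runOnce(String config, List<String> records) {
        StopwordFilterMapper m = new StopwordFilterMapper();
        m.setup(config);
        for (String r : records) {
            m.map(r);
        }
        return m;
    }

    public static void main(String[] args) {
        StopwordFilterMapper m =
            runOnce("a,the,of", Arrays.asList("the", "cat", "of", "hadoop"));
        System.out.println(m.output + " setupCalls=" + m.setupCalls); // [cat, hadoop] setupCalls=1
    }
}
```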

/**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   * The key and value here are the input pair.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    // Emit the output key/value pair; the context manages the task's output
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

map() holds the core business logic of the map phase.
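A minimal sketch of what a typical map() override does, using word count as the example. It is Hadoop-free: the hypothetical WordCountMapSketch.map() takes a long offset and a String line in place of the LongWritable/Text pair that TextInputFormat supplies, and returns the emitted (word, 1) pairs in a List instead of calling context.write().

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hadoop-free sketch of a word-count map(): the input value is one line of text,
// and one (word, 1) pair is emitted per token. The returned List stands in for context.write().
class WordCountMapSketch {
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {            // a blank line splits into a single ""
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(map(0L, "hello hadoop hello")); // [hello=1, hadoop=1, hello=1]
    }
}
```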

 /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

cleanup() is called once when the task ends.
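One common use of cleanup() is "in-mapper combining": map() only updates a local aggregate, and cleanup() writes the aggregated results once at the end of the task, which reduces the number of pairs sent to the shuffle. The sketch below is a Hadoop-free illustration under that assumption; InMapperCombiningSketch and its methods are hypothetical, and the emitted List stands in for context.write().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hadoop-free sketch of in-mapper combining: map() only updates a local
// aggregate, and cleanup() emits the aggregated pairs once at the end of the task.
class InMapperCombiningSketch {
    private final Map<String, Integer> counts = new TreeMap<>(); // sorted for deterministic output
    final List<String> emitted = new ArrayList<>();              // stands in for context.write()

    void setup() {
        counts.clear();
    }

    void map(String line) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum); // aggregate locally, emit nothing yet
            }
        }
    }

    void cleanup() {
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            emitted.add(e.getKey() + "\t" + e.getValue());
        }
    }

    // Mirrors the setup / map loop / finally-cleanup structure of Mapper.run().
    static List<String> runTask(List<String> lines) {
        InMapperCombiningSketch m = new InMapperCombiningSketch();
        m.setup();
        try {
            for (String line : lines) {
                m.map(line);
            }
        } finally {
            m.cleanup();
        }
        return m.emitted;
    }

    public static void main(String[] args) {
        System.out.println(runTask(Arrays.asList("a b a", "b a")));
    }
}
```

The trade-off is memory: the local aggregate has to fit in the map task's heap.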

/**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    // Initialization (e.g., set up collections, load lookup tables)
    setup(context);
    try {
      while (context.nextKeyValue()) {
        // Core business logic, called once per input record
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      // Final step: close streams, release resources
      cleanup(context);
    }
  }
}

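This is the actual order in which the Mapper's methods run: setup() once, then map() for each input key/value pair, then cleanup() in a finally block, so it runs even if map() throws.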
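The control flow of run() can be modeled without Hadoop. In this sketch (the class MapperRunOrderSketch is hypothetical), an Iterator stands in for context.nextKeyValue(), getCurrentKey(), and getCurrentValue(), the key is simply the record's position, and each method records its call so the order is visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hadoop-free model of the control flow in Mapper.run(): setup() once,
// map() once per record while records remain, and cleanup() in a finally block.
class MapperRunOrderSketch {
    final List<String> calls = new ArrayList<>();

    void setup() { calls.add("setup"); }
    void map(int key, String value) { calls.add("map(" + key + "," + value + ")"); }
    void cleanup() { calls.add("cleanup"); }

    void run(Iterator<String> records) {
        setup();
        try {
            int key = 0;                 // hypothetical key: the record's position
            while (records.hasNext()) {  // plays the role of context.nextKeyValue()
                map(key++, records.next());
            }
        } finally {
            cleanup();                   // runs even if map() throws
        }
    }

    public static void main(String[] args) {
        MapperRunOrderSketch m = new MapperRunOrderSketch();
        m.run(Arrays.asList("x", "y").iterator());
        System.out.println(m.calls); // [setup, map(0,x), map(1,y), cleanup]
    }
}
```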

The Reducer class

 /**
   * The <code>Context</code> passed on to the {@link Reducer} implementations.
   */
  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

The Reducer's Context is responsible for writing out its data.

 /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

setup() is called once at the start of the task, for initialization.

 /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

reduce() holds the concrete business logic of the Reducer.
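A sketch of a typical reduce() override, again using word count: it is called once per key with all of that key's values and emits the sum. The class SumReducerSketch is hypothetical and Hadoop-free; Integer and String stand in for IntWritable and Text, and the returned tab-separated string stands in for context.write().

```java
import java.util.Arrays;

// Hadoop-free sketch of a word-count reduce(): called once per key with all
// values for that key; returns "key<TAB>sum" in place of context.write().
class SumReducerSketch {
    static String reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {   // auto-unboxing of each Integer
            sum += v;
        }
        return key + "\t" + sum;
    }

    public static void main(String[] args) {
        System.out.println(reduce("hadoop", Arrays.asList(1, 1, 1)));
    }
}
```

One Hadoop-specific caveat not modeled here: while iterating over the values, the framework reuses the same Writable instance, so a reduce() that stores values (rather than summing them on the fly) should copy each one.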

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

cleanup() handles the wrap-up work, such as closing streams.
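cleanup() is also a natural place to emit results that depend on every key the task has seen. The sketch below assumes a Top-N job: the hypothetical TopNReducerSketch keeps the N largest totals in a min-heap during reduce() and writes them out in cleanup(). It is Hadoop-free, with a List standing in for context.write().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hadoop-free sketch of a Top-N reducer: reduce() keeps only the N largest totals
// in a min-heap, and cleanup() emits them once, after every key has been seen.
class TopNReducerSketch {
    private static final class Ranked {
        final String key;
        final int total;
        Ranked(String key, int total) { this.key = key; this.total = total; }
    }

    private final int n;
    private final PriorityQueue<Ranked> heap =
        new PriorityQueue<>((a, b) -> Integer.compare(a.total, b.total)); // smallest total on top
    final List<String> emitted = new ArrayList<>(); // stands in for context.write()

    TopNReducerSketch(int n) { this.n = n; }

    void reduce(String key, Iterable<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        heap.add(new Ranked(key, total));
        if (heap.size() > n) {
            heap.poll(); // drop the current smallest so only N entries remain
        }
    }

    void cleanup() {
        List<Ranked> top = new ArrayList<>(heap);
        top.sort((a, b) -> Integer.compare(b.total, a.total)); // largest first
        for (Ranked r : top) {
            emitted.add(r.key + "\t" + r.total);
        }
    }

    public static void main(String[] args) {
        TopNReducerSketch r = new TopNReducerSketch(2);
        r.reduce("a", Arrays.asList(1, 1, 1));
        r.reduce("b", Arrays.asList(5));
        r.reduce("c", Arrays.asList(2));
        r.cleanup();
        System.out.println(r.emitted.size()); // 2
    }
}
```

For a global Top-N, all keys must reach the same reduce task (for example via job.setNumReduceTasks(1)); with several reducers, each one emits its own local Top-N.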

  /**
   * Advanced application writers can use the
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }

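run() ties all of these methods together: setup() once, reduce() once per key, then cleanup() in a finally block. After each reduce() call, if the values iterator is a ReduceContext.ValueIterator, its backup store is reset.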
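The reduce-side run() loop can be modeled the same way. This Hadoop-free sketch assumes the input is already sorted by key, as it is after the shuffle, and is given as two parallel arrays; consecutive pairs with equal keys form one group, and reduce() is called once per group between setup() and cleanup(). ReducerRunSketch is a hypothetical name, and the grouping loop only approximates what context.nextKey() and context.getValues() do in Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

// Hadoop-free model of the control flow in Reducer.run(): input sorted by key,
// consecutive equal keys grouped, reduce() called once per group.
class ReducerRunSketch {
    final List<String> calls = new ArrayList<>();

    void setup() { calls.add("setup"); }
    void reduce(String key, List<Integer> values) { calls.add("reduce(" + key + "," + values + ")"); }
    void cleanup() { calls.add("cleanup"); }

    // keys and values are parallel arrays, already sorted by key
    void run(String[] keys, int[] values) {
        setup();
        try {
            int i = 0;
            while (i < keys.length) {                 // plays the role of context.nextKey()
                String key = keys[i];
                List<Integer> group = new ArrayList<>();
                while (i < keys.length && keys[i].equals(key)) { // collect this key's values
                    group.add(values[i]);
                    i++;
                }
                reduce(key, group);
            }
        } finally {
            cleanup();                                // runs even if reduce() throws
        }
    }

    public static void main(String[] args) {
        ReducerRunSketch r = new ReducerRunSketch();
        r.run(new String[] {"a", "a", "b"}, new int[] {1, 1, 1});
        System.out.println(r.calls); // [setup, reduce(a,[1, 1]), reduce(b,[1]), cleanup]
    }
}
```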