1. 程式人生 > >shuffle的關鍵階段sort(Map端和Reduce端)原始碼分析

shuffle的關鍵階段sort(Map端和Reduce端)原始碼分析

原始碼中有這樣一段程式碼

1. Map端排序獲取的比較器


public RawComparator getOutputKeyComparator() {
   // 獲取mapreduce.job.output.key.comparator.class,必須是RawComparator型別,如果沒設定,是null
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    // 如果使用者自定義了這個引數,那麼例項化使用者自定義的比較器
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
   // 預設情況,使用者是沒用自定義這個引數
   //  判斷Map輸出的key,是否是WritableComparable的子類
  //   如果是,呼叫當前類的內部的Comparator!
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

總結: 如何對感興趣的資料進行排序?

              ① 資料必須作為key

             ② 排序是框架自動排序,我們提供基於key的比較器,也就是Comparator,必須是RawComparator型別

                            a) 自定義類,實現RawComparator,重寫compare()

                                          指定mapreduce.job.output.key.comparator.class為自定義的比較器型別

                            b)key實現WritableComparable(推薦)

              ③ 實質都是呼叫相關的comparaTo()方法,進行比較

2. Reduce端進行分組的比較器

RawComparator comparator = job.getOutputValueGroupingComparator();

// 獲取mapreduce.job.output.group.comparator.class,必須是RawComparator型別

// 如果沒用設定,直接獲取MapTask排序使用的比較器

// 也是比較key

 

public RawComparator getOutputValueGroupingComparator() {

    Class<? extends RawComparator> theClass = getClass(

      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);

    if (theClass == null) {

      return getOutputKeyComparator();

    }

    // 如果設定了,就使用設定的比較器

    return ReflectionUtils.newInstance(theClass, this);

  }