1. 程式人生 > >Hadoop 二次排序 Secondary Sort

Hadoop 二次排序 Secondary Sort

mr自帶的例子中的原始碼SecondarySort,我重新寫了一下,基本沒變。

這個例子中定義的map和reduce如下,關鍵是它對輸入輸出型別的定義:(java泛型程式設計) 

public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> 
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable> 

1 首先說一下工作原理:

在map階段,使用job.setInputFormatClass定義的InputFormat將輸入的資料集分割成小資料塊splites,同時InputFormat提供一個RecordReder的實現。本例子中使用的是TextInputFormat,他提供的RecordReder會將文字的一行的行號作為key,這一行的文字作為value。這就是自定義Map的輸入是<LongWritable, Text>的原因。然後呼叫自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。在map階段的最後,會先呼叫job.setPartitionerClass對這個List進行分割槽,每個分割槽對映到一個reducer。每個分割槽內又呼叫job.setSortComparatorClass設定的key比較函式類排序。可以看到,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass設定key比較函式類,則使用key的實現的compareTo方法。在第一個例子中,使用了IntPair實現的compareTo方法,而在下一個例子中,專門定義了key比較函式類。
在reduce階段,reducer接收到所有對映到這個reducer的map輸出後,也是會呼叫job.setSortComparatorClass設定的key比較函式類對所有資料對排序。然後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設定的分組函式類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的所有key的第一個key。最後就是進入Reducer的reduce方法,reduce方法的輸入是所有的(key和它的value迭代器)。同樣注意輸入與輸出的型別必須與自定義的Reducer中宣告的一致。 

跟蹤原始碼發現,Reduce中的Context繼承自ReduceContext,ReduceContext的public boolean nextKey() 是通過判斷hasMore && nextKeyIsSame,而對hasMore 和 nextKeyIsSame修改的方法是public boolean nextKeyValue(),該方法中通過

hasMore = input.next();
    if (hasMore) {
      next = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                         currentRawKey.getLength(),
                                         next.getData(),
                                         next.getPosition(),
                                         next.getLength() - next.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }

hasMore = input.next() 中會使用ReduceContext構造方法中傳入的comparator物件,而生成ReduceContext物件的是ReduceTask中的如下程式碼:

RawComparator comparator = job.getOutputValueGroupingComparator();

    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);

在runNewReducer()中有如下程式碼:

org.apache.hadoop.mapreduce.Reducer.Context 
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter,
                                               reduceInputValueCounter, 
                                               trackedRW, committer,
                                               reporter, comparator, keyClass,
                                               valueClass);

createReduceContext程式碼如下:

protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
  createReduceContext(org.apache.hadoop.mapreduce.Reducer
                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
                      Configuration job,
                      org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
                      RawKeyValueIterator rIter,
                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
                      org.apache.hadoop.mapreduce.OutputCommitter committer,
                      org.apache.hadoop.mapreduce.StatusReporter reporter,
                      RawComparator<INKEY> comparator,
                      Class<INKEY> keyClass, Class<INVALUE> valueClass
  ) throws IOException, ClassNotFoundException {
    try {

      return contextConstructor.newInstance(reducer, job, taskId,
                                            rIter, inputKeyCounter, 
                                            inputValueCounter, output, 
                                            committer, reporter, comparator, 
                                            keyClass, valueClass);
    } catch (InstantiationException e) {
      throw new IOException("Can't create Context", e);
    } catch (InvocationTargetException e) {
      throw new IOException("Can't invoke Context constructor", e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't invoke Context constructor", e);
    }
  }
contextConstructor為反射出的Constructor object
private static final Constructor<org.apache.hadoop.mapreduce.Reducer.Context> contextConstructor;
  static {
    try {
      contextConstructor = 
        org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
        (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
            Configuration.class,
            org.apache.hadoop.mapreduce.TaskAttemptID.class,
            RawKeyValueIterator.class,
            org.apache.hadoop.mapreduce.Counter.class,
            org.apache.hadoop.mapreduce.Counter.class,
            org.apache.hadoop.mapreduce.RecordWriter.class,
            org.apache.hadoop.mapreduce.OutputCommitter.class,
            org.apache.hadoop.mapreduce.StatusReporter.class,
            RawComparator.class,
            Class.class,
            Class.class});
    } catch (NoSuchMethodException nme) {
      throw new IllegalArgumentException("Can't find constructor");
    }
  }
可以發現ReduceContext中的comparator是設定的GroupingComparator

2  二次排序就是首先按照第一欄位排序,然後再對第一欄位相同的行按照第二欄位排序,注意不能破壞第一次排序的結果 。例如

輸入檔案
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
輸出:(注意需要分割線)
------------------------------------------------
1       2
------------------------------------------------
3       4
------------------------------------------------
5       6
------------------------------------------------
7       8
7       82
------------------------------------------------
12      211
------------------------------------------------
20      21
20      53
20      522
------------------------------------------------
31      42
------------------------------------------------
40      511
------------------------------------------------
50      51
50      52
50      53
50      53
50      54
50      62
50      512
50      522
------------------------------------------------
60      51
60      52
60      53
60      56
60      56
60      57
60      57
60      61
------------------------------------------------
63      61
------------------------------------------------
70      54
70      55
70      56
70      57
70      58
70      58
------------------------------------------------
71      55
71      56
------------------------------------------------
73      57
------------------------------------------------
74      58
------------------------------------------------
203     21
------------------------------------------------
530     54
------------------------------------------------
730     54
------------------------------------------------
740     58 

3  具體步驟:
(1)自定義key

在mr中,所有的key是需要被比較和排序的,並且是二次,先根據partitione,再根據大小。而本例中也是要比較兩次。先按照第一欄位排序,然後再對第一欄位相同的按照第二欄位排序。根據這一點,我們可以構造一個複合類IntPair,他有兩個欄位,先利用分割槽對第一欄位排序,再利用分割槽內的比較對第二欄位排序。
所有自定義的key應該實現介面WritableComparable,因為是可序列的並且可比較的。並重載方法:

  1. //反序列化,從流中的二進位制轉換成IntPair
  2. publicvoid readFields(DataInput in) throws IOException          
  3. //序列化,將IntPair轉化成使用流傳送的二進位制
  4. publicvoid write(DataOutput out)  
  5. //key的比較
  6. publicint compareTo(IntPair o)          
  7. //另外新定義的類應該重寫的兩個方法
  8. //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
  9. publicint hashCode()   
  10. public boolean equals(Object right)  

(2)由於key是自定義的,所以還需要自定義一下類:
(2.1)分割槽函式類。這是key的第一次比較。

  1. publicstaticclass FirstPartitioner extends Partitioner<IntPair,IntWritable>  

在job中使用setPartitionerClasss設定Partitioner。
(2.2)key比較函式類。這是key的第二次比較。這是一個比較器,需要繼承WritableComparator。

  1. publicstaticclass KeyComparator extends WritableComparator  

必須有一個建構函式,並且過載 public int compare(WritableComparable w1, WritableComparable w2)

另一種方法是 實現介面RawComparator。
在job中使用setSortComparatorClass設定key比較函式類。
(2.3)分組函式類。在reduce階段,構造一個key對應的value迭代器的時候,只要first相同就屬於同一個組,放在一個value迭代器。這是一個比較器,需要繼承WritableComparator。

  1. publicstaticclass GroupingComparator extends WritableComparator  

分組函式類也必須有一個建構函式,並且過載 public int compare(WritableComparable w1, WritableComparable w2)
分組函式類的另一種方法是實現介面RawComparator。
在job中使用setGroupingComparatorClass設定分組函式類。

另外注意的是,如果reduce的輸入與輸出不是同一種類型,則不要定義Combiner也使用reduce,因為Combiner的輸出是reduce的輸入。除非重新定義一個Combiner。 

3 程式碼。

這個例子中沒有使用key比較函式類,而是使用key的實現的compareTo方法。 

  1. package secondarySort;  
  2. import java.io.DataInput;  
  3. import java.io.DataOutput;  
  4. import java.io.IOException;  
  5. import java.util.StringTokenizer;  
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.IntWritable;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.io.WritableComparable;  
  12. import org.apache.hadoop.io.WritableComparator;  
  13. import org.apache.hadoop.mapreduce.Job;  
  14. import org.apache.hadoop.mapreduce.Mapper;  
  15. import org.apache.hadoop.mapreduce.Partitioner;  
  16. import org.apache.hadoop.mapreduce.Reducer;  
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  21. publicclass SecondarySort {  
  22.     //自己定義的key類應該實現WritableComparable介面
  23.     publicstaticclass IntPair implements WritableComparable<IntPair> {  
  24.         int first;  
  25.         int second;  
  26.         /** 
  27.          * Set the left and right values. 
  28.          */
  29.         publicvoid set(int left, int right) {  
  30.             first = left;  
  31.             second = right;  
  32.         }  
  33.         publicint getFirst() {  
  34.             return first;  
  35.         }  
  36.         publicint getSecond() {  
  37.             return second;  
  38.         }  
  39.         @Override
  40.         //反序列化,從流中的二進位制轉換成IntPair
  41.         publicvoid readFields(DataInput in) throws IOException {  
  42.             // TODO Auto-generated method stub
  43.             first = in.readInt();  
  44.             second = in.readInt();  
  45.         }  
  46.         @Override
  47.         //序列化,將IntPair轉化成使用流傳送的二進位制
  48.         publicvoid write(DataOutput out) throws IOException {  
  49.             // TODO Auto-generated method stub
  50.             out.writeInt(first);  
  51.             out.writeInt(second);  
  52.         }  
  53.         @Override
  54.         //key的比較
  55.         publicint compareTo(IntPair o) {  
  56.             // TODO Auto-generated method stub
  57.             if (first != o.first) {  
  58.                 return first < o.first ? -1 : 1;  
  59.             } elseif (second != o.second) {  
  60.                 return second < o.second ? -1 : 1;  
  61.             } else {  
  62.                 return0;  
  63.             }  
  64.         }  
  65.         //新定義類應該重寫的兩個方法
  66. 相關推薦

    Hadoop 排序 Secondary Sort

    mr自帶的例子中的原始碼SecondarySort,我重新寫了一下,基本沒變。 這個例子中定義的map和reduce如下,關鍵是它對輸入輸出型別的定義:(java泛型程式設計)  public static class Map extends Mapp

    hadoop 排序的一些思考

    先說一下mr的二次排序需求: 假如檔案有兩列分別為name、score,需求是先按照name排序,name相同按照score排序 資料如下: jx 20 gj 30 jx 10 gj 15 輸出結果要求: gj 15 gj 30 jx 10 jx 20 我們常見的實現思路是: 1. 自

    一起學Hadoop——排序演算法的實現

    二次排序,從字面上可以理解為在對key排序的基礎上對key所對應的值value排序,也叫輔助排序。一般情況下,MapReduce框架只對key排序,而不對key所對應的值排序,因此value的排序經常是不固定的。但是我們經常會遇到同時對key和value排序的需求,例如Hadoop權威指南中的求一年的高高氣溫

    hadoop 排序和一個java實現

    需要二次排序的原因:mapreduce架構自動對對映器生成的鍵進行排序,即歸約器啟動之前,所有鍵是有序的,但是值是隨機的,二次排序指的是對值進行排序。歸約器輸入形如:,即一個key對應多個值,這些值是無序的,排序後得到有序的值,如下: 其中,S按照升序或者降序排列

    hadoop排序 (Map/Reduce中分割槽和分組的問題)

    1.二次排序概念:首先按照第一欄位排序,然後再對第一欄位相同的行按照第二欄位排序,注意不能破壞第一次排序的結果 。如: 輸入檔案:20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 

    hadoop排序

    趁這個時候,順便把hadoop的用於比較的Writable, WritableComparable, Comprator等搞清楚。。 1.二次排序概念: 首先按照第一欄位排序,然後再對第一欄位相同的行按照第二欄位排序,注意不能破壞第一次排序的結果 。 如: 輸入檔案: 20 21 50 51

    Hadoop排序及MapReduce處理流程例項詳解

    一、概述 MapReduce框架對處理結果的輸出會根據key值進行預設的排序,這個預設排序可以滿足一部分需求,但是也是十分有限的,在我們實際的需求當中,往往有要對reduce輸出結果進行二次排序的需求。對於二次排序的實現,網路上已經有很多人分享過了,但是對二次排序的實現原理

    Hadoop 排序實現

    業務場景:通常情況下,在MR操作中到達Reduce中的key值都是按照指定的規則進行排序,在單一key的情況下一切都進行的很自然,直到我們要求資料不再單純的按key進行排序,以如下資料舉例: Key   ->      value: 100  ->      2

    hadoop排序實現join

    package join; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import org.apac

    hadoop排序的原理和實現

    預設情況下,Map輸出的結果會對Key進行預設的排序,但是有時候需要對Key排序的同時還需要對Value進行排序,這時候就要用到二次排序了。下面我們來說說二次排序 1、二次排序原理   我們把二次排序分為以下幾個階段   Map起始階段     在Map階段,使用jo

    hadoop中MapReduce的sort(部分排序,完全排序,排序)

    1.部分排序 MapReduce預設就是在每個分割槽裡進行排序 2.完全排序 在所有的分割槽中,整體有序                 1)使用一個reduce             2)自定義分割槽函式 不同的key進入的到不同的分割槽之中,在每個分割槽中自動

    大資料技術學習筆記之Hadoop框架基礎5-Hadoop高階特性HA及排序思想

    一、回顧     -》shuffle流程         -》input:讀取mapreduce輸入的          &nbs

    Hadoop MapReduce排序演算法與實現之演算法解析

    MapReduce二次排序的原理     1.在Mapper階段,會通過inputFormat的getSplits來把資料集分割成split public abstract class Input

    Python Hadoop Mapreduce 實現Hadoop Streaming分組和排序

    需求:公司給到一份全國各門店銷售資料,要求:1.按門店市場分類,將同一市場的門店放到一起;2.將各家門店按銷售額從大到小,再按利潤從大到小排列 一 需求一:按市場對門店進行分組 分組(partition) Hadoop streaming框架預設情況下會以’/t

    hadoop MR 排序

    二次排序 例如這樣一組氣溫資料 年份 溫度 2006 -20 2006 21 2007 55 2007 16 2007 33 經過reduce處理年份會自動排序 但是如果要對年份和氣溫分別排序那就需要二次排序了 例如年份升序對氣溫降序 2006 21 200

    《資料演算法-Hadoop/Spark大資料處理技巧》讀書筆記(一)——排序

    寫在前面: 在做直播的時候有同學問Spark不是用Scala語言作為開發語言麼,的確是的,從網上查資料的話也會看到大把大把的用Scala編寫的Spark程式,但是仔細看就會發現這些用Scala寫的文章

    Hadoop Mapreduce分割槽、分組、排序過程詳解[轉]

    徐海蛟 教學用途 1、MapReduce中資料流動    (1)最簡單的過程:  map - reduce    (2)定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce    (3)增加了在本地先進性一次reduce(優化)過程: 

    Hadoop鏈式MapReduce、多維排序、倒排索引、自連線演算法、排序、Join效能優化、處理員工資訊Join實戰、URL流量分析、TopN及其排序、求平均值和最大最小值、資料清洗ETL、分析氣

    Hadoop Mapreduce 演算法彙總  第52課:Hadoop鏈式MapReduce程式設計實戰...1 第51課:Hadoop MapReduce多維排序解析與實戰...2 第50課:HadoopMapReduce倒排索引解析與實戰...3 第49課:Hado

    hadoop平臺使用python編寫mapreduce排序小程式

    接上一個博文的環境 使用的是官網的專利使用資料,這裡只截取了一部分 3858241,956203 3858241,1324234 3858241,3398406 3858241,3557384 38

    Hadoop和Spark分別實現排序

    將下列資料中每個分割槽中的第一列順序排列,第二列倒序排列。 Text  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 2021 5051