1. 程式人生 > >Hadoop之MapReduce自定義二次排序流程例項詳解

Hadoop之MapReduce自定義二次排序流程例項詳解

一、概述
MapReduce框架對處理結果的輸出會根據key值進行預設的排序,這個預設排序可以滿足一部分需求,但是也是十分有限的。在我們實際的需求當中,往往有要對reduce輸出結果進行二次排序的需求。對於二次排序的實現,網路上已經有很多人分享過了,但是對二次排序的實現的原理以及整個MapReduce框架的處理流程的分析還是有非常大的出入,而且部分分析是沒有經過驗證的。本文將通過一個實際的MapReduce二次排序例子,講述二次排序的實現和其MapReduce的整個處理流程,並且通過結果和map、reduce端的日誌來驗證所描述的處理流程的正確性。

二、需求描述
1、輸入資料:

sort1    1


sort2    3
sort2    77
sort2    54
sort1    2
sort6    22
sort6    221
sort6    20
2、目標輸出
sort1 1,2
sort2 3,54,77
sort6 20,22,221

三、解決思路

 1、首先,在思考解決問題思路時,我們先應該深刻的理解MapReduce處理資料的整個流程,這是最基礎的,不然的話是不可能找到解決問題的思路的。我描述一下MapReduce處理資料的大概簡單流程:首先,MapReduce框架通過getSplit方法實現對原始檔案的切片之後,每一個切片對應著一個map task,inputSplit輸入到Map函式進行處理,中間結果經過環形緩衝區的排序,然後分割槽、自定義二次排序(如果有的話)和合並,再通過shuffle操作將資料傳輸到reduce task端,reduce端也存在著緩衝區,資料也會在緩衝區和磁碟中進行合併排序等操作,然後對資料按照Key值進行分組,然後沒處理完一個分組之後就會去呼叫一次reduce函式,最終輸出結果。大概流程我畫了一下,如下圖:



2、具體解決思路

(1)Map端處理:

  根據上面的需求,我們有一個非常明確的目標就是要對第一列相同的記錄合併,並且對合並後的數字進行排序。我們都知道MapReduce框架不管是預設排序或者是自定義排序都只是對Key值進行排序,現在的情況是這些資料不是key值,怎麼辦?其實我們可以將原始資料的Key值和其對應的資料組合成一個新的Key值,然後新的Key值對應的還是之前的數字。那麼我們就可以將原始資料的map輸出變成類似下面的資料結構:

{[sort1,1],1}
{[sort2,3],3}
{[sort2,77],77}
{[sort2,54],54}
{[sort1,2],2}
{[sort6,22],22}
{[sort6,221],221}
{[sort6,20],20}
那麼我們只需要對[]裡面的新key值進行排序就ok了。然後我們需要自定義一個分割槽處理器,因為我的目標不是想將新key相同的傳到同一個reduce中,而是想將新key中的第一個欄位相同的才放到同一個reduce中進行分組合並,所以我們需要根據新key值中的第一個欄位來自定義一個分割槽處理器。通過分割槽操作後,得到的資料流如下:

Partition1:{[sort2,3],3}、{[sort2,54],54}、{[sort2,77],77}
Partition2:{[sort1,1],1}、{[sort1,2],2}
Partition3:{[sort6,20],20}、{[sort6,22],22}、{[sort6,221],221}

第一次排序:

這裡的過程就是第一次排序的過程,第一次排序先按FirstKey排序,然後按照secondKey排序。

job.setSortComparatorClass(DefinedComparator.class);

分割槽:

job.setPartitionerClass(DefinedPartition.class);

(2)Reduce端處理:

分割槽操作完成之後,我呼叫自己的自定義排序器對新的Key值進行排序。
{[sort1,1],1}
{[sort1,2],2}
{[sort2,3],3}
{[sort2,54],54}
{[sort2,77],77}
{[sort6,20],20}
{[sort6,22],22}
{[sort6,221],221}

第二次排序:

是對組的排序,這裡的這個GroupingComparator就是對key進行了第二次的處理,使得每個key後邊可以掛一個val的列表。

job.setGroupingComparatorClass(SecondComparator.class);


  經過Shuffle處理之後,資料傳輸到Reducer端了。在Reducer端對按照組合鍵的第一個欄位來進行分組,並且每處理完一次分組之後就會呼叫一次reduce函式來對這個分組進行處理輸出。最終的各個分組的資料結構變成類似下面的資料結構:
{sort1,[1,2]}
{sort2,[3,54,77]}

{sort6,[20,22,221]}

四、具體實現

1、自定義組合鍵
package com.mr; 
import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import org.apache.Hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
/** 
 * 自定義組合鍵 
 * @author zenghzhaozheng 
 */
public class CombinationKey implements WritableComparable<CombinationKey>{ 
    private static final Logger logger = LoggerFactory.getLogger(CombinationKey.class); 
    private Text firstKey; 
    private IntWritable secondKey; 
    public CombinationKey() { 
        this.firstKey = new Text(); 
        this.secondKey = new IntWritable(); 
    } 
    public Text getFirstKey() { 
        return this.firstKey; 
    } 
    public void setFirstKey(Text firstKey) { 
        this.firstKey = firstKey; 
    } 
    public IntWritable getSecondKey() { 
        return this.secondKey; 
    } 
    public void setSecondKey(IntWritable secondKey) { 
        this.secondKey = secondKey; 
    } 
    @Override
    public void readFields(DataInput dateInput) throws IOException { 
        // TODO Auto-generated method stub 
        this.firstKey.readFields(dateInput); 
        this.secondKey.readFields(dateInput); 
    } 
    @Override
    public void write(DataOutput outPut) throws IOException { 
        this.firstKey.write(outPut); 
        this.secondKey.write(outPut); 
    } 
    /** 
    * 自定義比較策略 
    * 注意:該比較策略用於mapreduce的第一次預設排序,也就是發生在map階段的sort小階段, 
    * 發生地點為環形緩衝區(可以通過io.sort.mb進行大小調整) 
    */
    @Override
    public int compareTo(CombinationKey combinationKey) { 
        logger.info("-------CombinationKey flag-------"); 
        return this.firstKey.compareTo(combinationKey.getFirstKey()); 
    } 
}
說明:在自定義組合鍵的時候,我們需要特別注意,一定要實現WritableComparable介面,並且實現compareTo方法的比較策略。這個用於mapreduce的第一次預設排序,也就是發生在map階段的sort小階段,發生地點為環形緩衝區(可以通過io.sort.mb進行大小調整),但是其對我們最終的二次排序結果是沒有影響的。我們二次排序的最終結果是由我們的自定義比較器決定的

由於後面的進行了CombinationKey 物件的相等比較操作,最好重寫hashCode()和equal()方法。

參考程式碼如下:主要是讓類中個每個成員變數都參與計算和比較

  public static class IntPair 
                      implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;
    
    /**
     * Set the left and right values.
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }
    public int getFirst() {
      return first;
    }
    public int getSecond() {
      return second;
    }
    /**
     * Read the two integers. 
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    @Override
    public void write(DataOutput out) throws IOException {
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }
    @Override
    public int hashCode() {//重寫hashCode()方法
      return first * 157 + second;
    }
    @Override
    public boolean equals(Object right) {//重寫equals()方法
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }
    
    @Override
    public int compareTo(IntPair o) {
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }


2、自定義分割槽器

package com.mr; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
/** 
 * 自定義分割槽 
 * @author zengzhaozheng 
 */
public class DefinedPartition extends Partitioner<CombinationKey,IntWritable>{ 
    private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class); 
    /** 
    *  資料輸入來源:map輸出 
    * @author zengzhaozheng 
    * @param key map輸出鍵值 
    * @param value map輸出value值 
    * @param numPartitions 分割槽總數,即reduce task個數 
    */
    @Override
    public int getPartition(CombinationKey key, IntWritable value,int numPartitions) { 
        logger.info("--------enter DefinedPartition flag--------"); 
        /** 
        * 注意:這裡採用預設的hash分割槽實現方法 
        * 根據組合鍵的第一個值作為分割槽 
        * 這裡需要說明一下,如果不自定義分割槽的話,mapreduce框架會根據預設的hash分割槽方法, 
        * 將整個組合將相等的分到一個分割槽中,這樣的話顯然不是我們要的效果 
        */
        logger.info("--------out DefinedPartition flag--------"); 
        return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions; //字串的分割槽寫法
    } 
}

數字的分割槽寫法,可參考如下程式碼:

  /**
   * Partition based on the first part of the pair.
   * 
   * 根據第一部分,分割槽
   * 
   */
  public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
    @Override
    public int getPartition(IntPair key, IntWritable value, 
                            int numPartitions) {
      return Math.abs(key.getFirst() * 127) % numPartitions;//數值型key分割槽寫法
    }
  }

說明:具體說明看程式碼註釋。

3、自定義比較器

package com.mr; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.io.WritableComparator; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
/** 
 * 自定義二次排序策略 
 * @author zengzhaoheng 
 */
public class DefinedComparator extends WritableComparator { 
    private static final Logger logger = LoggerFactory.getLogger(DefinedComparator.class); 
    public DefinedComparator() { 
        super(CombinationKey.class,true); 
    } 
    @Override
    public int compare(WritableComparable combinationKeyOne, 
            WritableComparable CombinationKeyOther) { 
        logger.info("---------enter DefinedComparator flag---------"); 
                                                      
        CombinationKey c1 = (CombinationKey) combinationKeyOne; 
        CombinationKey c2 = (CombinationKey) CombinationKeyOther; 
                                                      
        /** 
        * 確保進行排序的資料在同一個區內,如果不在同一個區則按照組合鍵中第一個鍵排序 
        * 另外,這個判斷是可以調整最終輸出的組合鍵第一個值的排序 
        * 下面這種比較對第一個欄位的排序是升序的,如果想降序這將c1和c2顛倒過來(假設1) 
        */
        if(!c1.getFirstKey().equals(c2.getFirstKey())){ 
            logger.info("---------out DefinedComparator flag---------"); 
            return c1.getFirstKey().compareTo(c2.getFirstKey()); 
            } 
        else{//按照組合鍵的第二個鍵的升序排序,將c1和c2倒過來則是按照數字的降序排序(假設2) 
            logger.info("---------out DefinedComparator flag---------"); 
            return c1.getSecondKey().get()-c2.getSecondKey().get();//0,負數,正數 
        } 
        /** 
        * (1)按照上面的這種實現最終的二次排序結果為: 
        * sort1    1,2 
        * sort2    3,54,77 
        * sort6    20,22,221 
        * (2)如果實現假設1,則最終的二次排序結果為: 
        * sort6    20,22,221 
        * sort2    3,54,77 
        * sort1    1,2 
        * (3)如果實現假設2,則最終的二次排序結果為: 
        * sort1    2,1 
        * sort2    77,54,3 
        * sort6    221,22,20 
        */
        } 
}
說明:自定義比較器決定了我們二次排序的結果。自定義比較器需要繼承WritableComparator類,並且重寫compare方法實現自己的比較策略。具體的排序問題請看註釋。

4.具體的mapreduce程式碼

/**
 * 二次排序demo
 * @author hadoop
 *
 */
public class SecondSort extends Configured implements Tool{

	 public static class Map extends Mapper<LongWritable,Text,CombinationKey,IntWritable>{
		  private Text firstKey  = new Text(); 
		  private IntWritable secondKey = new IntWritable(0); 
		    
		  public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			  String line = value.toString();
			  StringTokenizer tokenizer = new StringTokenizer(line);
			  while(tokenizer.hasMoreTokens()){
				  
				  firstKey.set(tokenizer.nextToken());
				  
				  if(tokenizer.hasMoreTokens()){//如果還存在下一個記錄
					   String num=tokenizer.nextToken();//獲得
					   int scoreInt=Integer.parseInt(num);//將字串轉為數字
					   secondKey.set(scoreInt);
					   CombinationKey ckey = new CombinationKey();
						  ckey.setFirstKey(firstKey);
						  ckey.setSecondKey(secondKey);
						  
					   context.write(ckey, new IntWritable(scoreInt));
				   }
			  }
		  }
	 }
	 
	 
	 
	 public static class Reduce extends Reducer<CombinationKey ,IntWritable,Text,IntWritable>{
		 private final Text first = new Text();
		 
		 public void reduce(CombinationKey key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
			   for (IntWritable val : values) {
				   context.write( key.getFirstKey(), val);
			   }
			  
		   }
	 }
	
	
	
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    if (otherArgs.length != 2) {
	      System.err.println("Usage: secondarysort <in> <out>");
	      System.exit(2);
	    }
		Job job = Job.getInstance(conf, "SecondSort");
		job.setJarByClass(SecondSort.class);
		job.setJobName("SecondSort");
		
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		//指定map輸出時key值的排序,如果不指定,預設使用key物件CombinationKey的比較方法compareTo()
		job.setSortComparatorClass(DefinedComparator.class);
		job.setPartitionerClass(DefinedPartition.class);//分割槽
	        //指定分組排序使用的比較器,預設使用key物件自身的compareTo()方法
		job.setGroupingComparatorClass(SecondComparator.class);
		
		//map輸出
		job.setMapOutputKeyClass(CombinationKey.class);  
		job.setMapOutputValueClass(IntWritable.class);  

		//reduce輸出
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.setNumReduceTasks(1);//設定reduce  Task的數量,預設是1
		
		boolean success = job.waitForCompletion(true);
		return success ?0 : 1;
	}
	public static void main(String[] args) throws Exception{
		 String[] ars=new String[]{"hdfs://192.168.137.100:9000/user/root/data/secondSort","hdfs://192.168.137.100:9000/user/root/output/secondSort"};
		int ret = ToolRunner.run(new SecondSort(), ars);
		System.exit(ret);
	}
   
}