1. 程式人生 > >Hadoop Mapreduce分割槽、分組、連線以及輔助排序(也叫二次排序)過程詳解

Hadoop Mapreduce分割槽、分組、連線以及輔助排序(也叫二次排序)過程詳解

package com.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WritableSample extends Configured implements Tool {
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf=getConf();
		Job job=new Job(conf);
		job.setJarByClass(WritableSample.class);
		FileSystem fs=FileSystem.get(conf);
		fs.delete(new Path("out"),true);
		FileInputFormat.addInputPath(job, new Path("sample.txt"));
		FileOutputFormat.setOutputPath(job, new Path("out"));
        job.setMapperClass(MyWritableMap.class);
        job.setOutputKeyClass(MyPariWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setReducerClass(MyWriableReduce.class);
        job.setSortComparatorClass(PairKeyComparator.class);
        job.setGroupingComparatorClass(GroupComparatored.class);
        job.waitForCompletion(true);
		return 0;
	}
	public static void main(String[] args) throws Exception {
		Tool tool=new WritableSample();
		ToolRunner.run(tool, args);
	}
}
class MyPariWritable implements WritableComparable<MyPariWritable>{
	Text first;
	IntWritable second;
	public void set(Text first,IntWritable second){
		this.first=first;
		this.second=second;
	}
	public Text getFirst(){
		return this.first;
	}
	public IntWritable getSecond(){
		return this.second;
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		first=new Text(in.readUTF());
		second=new IntWritable(in.readInt());
	}
	public void write(DataOutput out){
		try {
			out.writeUTF(first.toString());
			out.writeInt(second.get());
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	@Override
	public int compareTo(MyPariWritable o) {
		// TODO Auto-generated method stub
		if(this.first!=o.getFirst()){
			return this.first.toString().compareTo(o.getFirst().toString());
		}else if(this.second!=o.getSecond()){
			return this.second.get()-o.getSecond().get();
		}else return 0;
	}
	@Override
	public String toString() {
		// TODO Auto-generated method stub
		return first.toString()+" "+second.get();
	}
	@Override
	public boolean equals(Object obj) {
		MyPariWritable temp=(MyPariWritable) obj;
		return first.equals(temp.first)&&second.equals(temp.second);
	}
	@Override
	public int hashCode() {
		return first.hashCode()*163+second.hashCode();
	}
}
class MyWritableMap extends Mapper<LongWritable, Text, MyPariWritable, NullWritable>{
	MyPariWritable pair=new MyPariWritable();
	protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException {
		String strs[]=value.toString().split(" ");
		Text keyy=new Text(strs[0]);
		IntWritable valuee=new IntWritable(Integer.parseInt(strs[1]));
		pair.set(keyy, valuee);
		context.write(pair, NullWritable.get());
	};
}
class PairKeyComparator extends WritableComparator{
	public PairKeyComparator() {
		super(MyPariWritable.class,true);
			}
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a,  WritableComparable b) {
		MyPariWritable p1=(MyPariWritable) a;
		MyPariWritable p2=(MyPariWritable) b;
		if(!p1.getFirst().toString().equals(p2.getFirst().toString())){
			return p1.first.toString().compareTo(p2.first.toString());
		}else{
			return p1.getSecond().get()-p2.getSecond().get();
		}
	}
}
class MyWriableReduce extends Reducer<MyPariWritable, NullWritable, MyPariWritable, NullWritable>{
	protected void reduce(MyPariWritable key, java.lang.Iterable<NullWritable> values, Context context) throws IOException ,InterruptedException {
		context.write(key, NullWritable.get());
	};
}
class GroupComparatored extends WritableComparator{
	public GroupComparatored(){
		super(MyPariWritable.class,true);
	}
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		MyPariWritable p1=(MyPariWritable) a;
		MyPariWritable p2=(MyPariWritable) b;
		return p1.first.toString().compareTo(p2.first.toString());//這裡只比較第一個元素,只要自然鍵型別為MyPariWritable的Key就認為是相同,目的是找出自然鍵對應的最小自然值。
	}
}

mr自帶的例子中的原始碼SecondarySort,我重新寫了一下,基本沒變。

這個例子中定義的map和reduce如下,關鍵是它對輸入輸出型別的定義:(java泛型程式設計) 

public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>  public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable> 

1 首先說一下工作原理:

在map階段,使用job.setInputFormatClass定義的InputFormat將輸入的資料集分割成小資料塊splites,同時InputFormat提供一個RecordReder的實現。本例子中使用的是TextInputFormat,他提供的RecordReder會將文字的一行的行號作為key,這一行的文字作為value。這就是自定義Map的輸入是<LongWritable, Text>的原因。然後呼叫自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。在map階段的最後,會先呼叫job.setPartitionerClass對這個List進行分割槽,每個分割槽對映到一個reducer。每個分割槽內又呼叫job.setSortComparatorClass設定的key比較函式類排序。可以看到,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass設定key比較函式類,則使用key的實現的compareTo方法。在第一個例子中,使用了IntPair實現的compareTo方法,而在下一個例子中,專門定義了key比較函式類。 在reduce階段,reducer接收到所有對映到這個reducer的map輸出後,也是會呼叫job.setSortComparatorClass設定的key比較函式類對所有資料對排序。然後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設定的分組函式類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的所有key的第一個key。最後就是進入Reducer的reduce方法,reduce方法的輸入是所有的(key和它的value迭代器)。同樣注意輸入與輸出的型別必須與自定義的Reducer中宣告的一致。 

2  二次排序就是首先按照第一欄位排序,然後再對第一欄位相同的行按照第二欄位排序,注意不能破壞第一次排序的結果 。例如

輸入檔案 20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 70 57 70 58 1 2 3 4 5 6 7 82 203 21 50 512 50 522 50 53 530 54 40 511 20 53 20 522 60 56 60 57 740 58 63 61 730 54 71 55 71 56 73 57 74 58 12 211 31 42 50 62 7 8 輸出:(注意需要分割線) ------------------------------------------------ 1       2 ------------------------------------------------ 3       4 ------------------------------------------------ 5       6 ------------------------------------------------ 7       8 7       82 ------------------------------------------------ 12      211 ------------------------------------------------ 20      21 20      53 20      522 ------------------------------------------------ 31      42 ------------------------------------------------ 40      511 ------------------------------------------------ 50      51 50      52 50      53 50      53 50      54 50      62 50      512 50      522 ------------------------------------------------ 60      51 60      52 60      53 60      56 60      56 60      57 60      57 60      61 ------------------------------------------------ 63      61 ------------------------------------------------ 70      54 70      55 70      56 70      57 70      58 70      58 ------------------------------------------------ 71      55 71      56 ------------------------------------------------ 73      57 ------------------------------------------------ 74      58 ------------------------------------------------ 203     21 ------------------------------------------------ 530     54 ------------------------------------------------ 730     54 ------------------------------------------------ 740     58 

3  具體步驟: (1)自定義key

在mr中,所有的key是需要被比較和排序的,並且是二次,先根據partitione,再根據大小。而本例中也是要比較兩次。先按照第一欄位排序,然後再對第一欄位相同的按照第二欄位排序。根據這一點,我們可以構造一個複合類IntPair,他有兩個欄位,先利用分割槽對第一欄位排序,再利用分割槽內的比較對第二欄位排序。 所有自定義的key應該實現介面WritableComparable,因為是可序列的並且可比較的。並重載方法:

  1. //反序列化,從流中的二進位制轉換成IntPair
  2. publicvoid readFields(DataInput in) throws IOException          
  3. //序列化,將IntPair轉化成使用流傳送的二進位制
  4. publicvoid write(DataOutput out)  
  5. //key的比較
  6. publicint compareTo(IntPair o)          
  7. //另外新定義的類應該重寫的兩個方法
  8. //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
  9. publicint hashCode()   
  10. public boolean equals(Object right)  

(2)由於key是自定義的,所以還需要自定義一下類: (2.1)分割槽函式類。這是key的第一次比較。

  1. publicstaticclass FirstPartitioner extends Partitioner<IntPair,IntWritable>  

在job中使用setPartitionerClasss設定Partitioner。 (2.2)key比較函式類。這是key的第二次比較。這是一個比較器,需要繼承WritableComparator。

  1. publicstaticclass KeyComparator extends WritableComparator  

必須有一個建構函式,並且過載 public int compare(WritableComparable w1, WritableComparable w2)

另一種方法是 實現介面RawComparator。 在job中使用setSortComparatorClass設定key比較函式類。 (2.3)分組函式類。在reduce階段,構造一個key對應的value迭代器的時候,只要first相同就屬於同一個組,放在一個value迭代器。這是一個比較器,需要繼承WritableComparator。

  1. publicstaticclass GroupingComparator extends WritableComparator  

分組函式類也必須有一個建構函式,並且過載 public int compare(WritableComparable w1, WritableComparable w2) 分組函式類的另一種方法是實現介面RawComparator。 在job中使用setGroupingComparatorClass設定分組函式類。 另外注意的是,如果reduce的輸入與輸出不是同一種類型,則不要定義Combiner也使用reduce,因為Combiner的輸出是reduce的輸入。除非重新定義一個Combiner。 

3 程式碼。

這個例子中沒有使用key比較函式類,而是使用key的實現的compareTo方法。 

  1. package secondarySort;  
  2. import java.io.DataInput;  
  3. import java.io.DataOutput;  
  4. import java.io.IOException;  
  5. import java.util.StringTokenizer;  
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.IntWritable;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.io.WritableComparable;  
  12. import org.apache.hadoop.io.WritableComparator;  
  13. import org.apache.hadoop.mapreduce.Job;  
  14. import org.apache.hadoop.mapreduce.Mapper;  
  15. import org.apache.hadoop.mapreduce.Partitioner;  
  16. import org.apache.hadoop.mapreduce.Reducer;  
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  21. publicclass SecondarySort {  
  22.     //自己定義的key類應該實現WritableComparable介面
  23.     publicstaticclass IntPair implements WritableComparable<IntPair> {  
  24.         int first;  
  25.         int second;  
  26.         /** 
  27.          * Set the left and right values. 
  28.          */
  29.         publicvoid set(int left, int right) {  
  30.             first = left;  
  31.             second = right;  
  32.         }  
  33.         publicint getFirst() {  
  34.             return first;  
  35.         }  
  36.         publicint getSecond() {  
  37.             return second;  
  38.         }  
  39.         @Override
  40.         //反序列化,從流中的二進位制轉換成IntPair
  41.         publicvoid readFields(DataInput in) throws IOException {  
  42.             // TODO Auto-generated method stub
  43.             first = in.readInt();  
  44.             second = in.readInt();  
  45.         }  
  46.         @Override
  47.         //序列化,將IntPair轉化成使用流傳送的二進位制
  48.         publicvoid write(DataOutput out) throws IOException {  
  49.             // TODO Auto-generated method stub
  50.             out.writeInt(first);  
  51.             out.writeInt(second);  
  52.         }  
  53.         @Override
  54.         //key的比較
  55.         publicint compareTo(IntPair o) {  
  56.             // TODO Auto-generated method stub
  57.             if (first != o.first) {  
  58.                 return first < o.first ? -1 : 1;  
  59.             } elseif (second != o.second) {  
  60.                 return second < o.second ? -1 : 1;  
  61.             } else {  
  62. 相關推薦

    Hadoop Mapreduce分割槽分組連線以及輔助排序排序過程

    package com.hadoop; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import or

    Hadoop Mapreduce分割槽分組排序過程[轉]

    徐海蛟 教學用途 1、MapReduce中資料流動    (1)最簡單的過程:  map - reduce    (2)定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce    (3)增加了在本地先進性一次reduce(優化)過程: 

    Hadoop Mapreduce分割槽分組排序過程

    這篇文章分析的特別好,耐心看下去。。1、MapReduce中資料流動   (1)最簡單的過程:  map - reduce   (2)定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce   (3)增加了

    Hadoop鏈式MapReduce多維排序倒排索引連線演算法排序Join效能優化處理員工資訊Join實戰URL流量分析TopN及其排序求平均值和最大最小值資料清洗ETL分析氣

    Hadoop Mapreduce 演算法彙總  第52課:Hadoop鏈式MapReduce程式設計實戰...1 第51課:Hadoop MapReduce多維排序解析與實戰...2 第50課:HadoopMapReduce倒排索引解析與實戰...3 第49課:Hado

    mapreduce自定義分組自定義分割槽排序

    mapreduce中二次排序的思想中,我們常常需要對資料的分割槽分組進行自定義, 以下就介紹一下自定義分割槽分組的簡單實現 1、自定義分割槽: public class demoPartitioner<K, V> extends Partitioner<

    MySQL UNION排序分組連線NULL值處理和正則表示式

    UNION SQL UNION 下面的SQL語句從product和orderdetail表中選取所有不同的pCode(只有不同值) SELECT pCode FROM product UNION SELECT pCode FROM orderdetail ORDER BY p

    MySQL UNION排序分組連線NULL值處理和正則表示式

    UNION SQL UNION 下面的SQL語句從product和orderdetail表中選取所有不同的pCode(只有不同值) SELECT pCode FROM product UNION SEL

    10Oracle:左連線連線全外連線以及+號用法

    回到目錄 1、準備工作 Oracle  外連線(OUTER JOIN)包括以下: 左外連線(左邊的表不加限制)右外連線(右邊的表不加限制)全外連線(左右兩表都不加限制) 對應SQL:LEFT/RIGHT/FULL OUTER JOIN。 通常省略OUTER關鍵字,

    oracle左外連線右外連線完全外連線以及(+)號用法

    準備工作 oracle連線分為: 左外連線:左表不加限制,保留左表的資料,匹配右表,右表沒有匹配到的行中的列顯示為null。右外連線:右表不加限制,保留右表的資料。匹配左表,左表沒有匹配到的行中列顯

    Oracle左連線連線全外連線以及+號用法

    回到目錄 1、準備工作 Oracle  外連線(OUTER JOIN)包括以下: 左外連線(左邊的表不加限制)右外連線(右邊的表不加限制)全外連線(左右兩表都不加限制) 對應SQL:LEFT/RIGHT/FULL OUTER JOIN。 通常省略OUTER關鍵字, 寫成:LEFT/RIGHT/FULL

    Mapreduce中的 自定義型別分組排序

    0、需求說明 資料格式 期望輸出的結果 做簡單分析: a. 由於只有兩列,所以可以將map的InputFormat設定為KeyValueTextInputFormat b. 事實上這裡實現了兩個排序,即對輸出的k

    java8 快速實現List轉map 分組過濾等操作

    利用java8新特性,可以用簡潔高效的程式碼來實現一些資料處理。 定義1個Apple物件: public class Apple { private Integer id; private String name; private BigDecimal mone

    Java 8 辣麼大lambda表示式不慌之—–示例-Collectors中的統計分組排序

    Java 8 辣麼大(lambda)表示式不慌之—–(五)示例-Collectors中的統計、分組、排序等 summarizingInt 按int型別統計 maxBy取最大/minBy取最小 averagingInt /averagingLong/avera

    SQL的聚合函式分組子查詢及組合查詢用法

    聚合函式: SQL中提供的聚合函式可以用來統計、求和、求最值等等。 分類: –COUNT:統計行數量 –SUM:獲取單個列的合計值 –AVG:計算某個列的平均值 –MAX:計算列的最大值 –MIN:計算列的最小值 首先,建立資料表如下:

    Python Hadoop Mapreduce 實現Hadoop Streaming分組排序

    需求:公司給到一份全國各門店銷售資料,要求:1.按門店市場分類,將同一市場的門店放到一起;2.將各家門店按銷售額從大到小,再按利潤從大到小排列 一 需求一:按市場對門店進行分組 分組(partition) Hadoop streaming框架預設情況下會以’/t

    hadoop排序 (Map/Reduce中分割槽分組的問題)

    1.二次排序概念:首先按照第一欄位排序,然後再對第一欄位相同的行按照第二欄位排序,注意不能破壞第一次排序的結果 。如: 輸入檔案:20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 

    報文報文段分組資料報資料流的概念區別

    1.報文(message) 我們將位於應用層的資訊分組稱為報文。報文是網路中交換與傳輸的資料單元,也是網路傳輸的單元。報文包含了將要傳送的完整的資料資訊,其長短不需一致。報文在傳輸過程中會不斷地封裝成分組、包、幀來傳輸,封裝的方式就是新增一些控制資訊組成的首部,那些就是報文頭。 2.報文段(se

    儲存過程用到的表分組排序聯結

    查詢儲存過程用到的表,並進行分組、排序、聯結: 1 SELECT 2 REFERENCED_OWNER, 3 REFERENCED_NAME, 4 LISTAGG(XH||'>'||NAME,',') WITHIN GROUP(ORDER BY

    報文報文段分組資料報的概念區別

    分組、包,packet,資訊在網際網路當中傳輸的單元,網路層實現分組交付。用抓包工具抓到的一條條記錄就是包。 幀,frame,資料鏈路層的協議資料單元。我們將鏈路層分組稱為幀。 資料報,Datagram,通過網路傳輸的資料的基本單元,包含一個報頭(header)和資料本身,

    RecycleView分割線分組粘性佈局之ItemDecoration

        我們在使用 ListView 的時候要設定分割線只要在xml檔案中,使用android:divider就可以了,但是在RecyclerView 卻沒有直接的設定方法。ListView中要實現分組效果、粘性佈局,絕大多數都會使用PinnedSectionListView