MapReduce二次排序
一、背景
按照年份升序排序,同時每一年中溫度降序排序
data文件為1949年-1955年每天的溫度數據。
要求:1、計算1949-1955年,每年溫度最高的時間
2、計算1949-1955年,每年溫度最高的十天
1949-10-01 14:21:02 34℃ 1949-10-02 14:01:02 36℃ 1950-01-01 14:21:02 32℃ 1950-10-01 11:01:02 37℃ 1951-10-01 14:21:02 23℃ 1950-10-02 17:11:02 41℃ 1950-10-01 18:20:02 27℃ 1951-07-01 14:01:02 45℃1951-07-02 13:21:02 46℃
二、二次排序原理
默認情況下,Map 輸出的結果會對 Key 進行默認的排序,但是有時候需要對 Key 排序的同時再對 Value 進行排序,這時候就要用到二次排序了。下面讓我們來介紹一下什麽是二次排序。
2.1 Map起始階段
在Map階段,使用job.setInputFormatClass()定義的InputFormat,將輸入的數據集分割成小數據塊split,同時InputFormat提供一個RecordReader的實現。在這裏我們使用的是TextInputFormat,它提供的RecordReader會將文本的行號作為Key,這一行的文本作為Value。這就是自定 Mapper的輸入是<LongWritable,Text> 的原因。然後調用自定義Mapper的map方法,將一個個<LongWritable,Text>鍵值對輸入給Mapper的map方法
2.2 Map最後階段
在Map階段的最後,會先調用job.setPartitionerClass()對這個Mapper的輸出結果進行分區,每個分區映射到一個Reducer。每個分區內又調用job.setSortComparatorClass()設置的Key比較函數類排序。可以看到,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass()設置 Key比較函數類,則使用Key實現的compareTo()方法
2.3 Reduce階段
在Reduce階段,reduce()方法接受所有映射到這個Reduce的map輸出後,也會調用job.setSortComparatorClass()方法設置的Key比較函數類,對所有數據進行排序。然後開始構造一個Key對應的Value叠代器。這時就要用到分組,使用 job.setGroupingComparatorClass()方法設置分組函數類。只要這個比較器比較的兩個Key相同,它們就屬於同一組,它們的 Value放在一個Value叠代器,而這個叠代器的Key使用屬於同一個組的所有Key的第一個Key。最後就是進入Reducer的 reduce()方法,reduce()方法的輸入是所有的Key和它的Value叠代器,同樣註意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。
三、二次排序流程
在本例中要比較兩次。先按照第年份排序,然後再對年份相同的按照溫度排序。根據這一點,我們可以構造一個復合類KeyPair ,它有兩個字段,先利用分區對第一字段排序,再利用分區內的比較對第二字段排序。二次排序的流程分為以下幾步。
3.1 自定義key
所有自定義的key應該實現接口WritableComparable,因為它是可序列化的並且可比較的。WritableComparable 的內部方法如下所示
// 反序列化,從流中的二進制轉換成IntPair public void readFields(DataInput in) throws IOException // 序列化,將IntPair轉化成使用流傳送的二進制 public void write(DataOutput out) // key的比較 public int compareTo(IntPair o) // 默認的分區類 HashPartitioner,使用此方法 public int hashCode() // 默認實現 public boolean equals(Object right)
3.2 自定義分區
自定義分區函數類FirstPartitioner,是key的第一次比較,完成對所有key的排序。
public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>
在job中使用setPartitionerClasss()方法設置Partitioner
job.setPartitionerClasss(FirstPartitioner.Class);
3.3 自定義排序類
這是Key的第二次比較,對所有的Key進行排序,即同時完成IntPair中的first和second排序。該類是一個比較器,可以通過兩種方式實現。
1) 繼承WritableComparator。
public static class KeyComparator extends WritableComparator
必須有一個構造函數,並且重載以下方法。
public int compare(WritableComparable w1, WritableComparable w2)
2) 實現接口 RawComparator。
上面兩種實現方式,在Job中,可以通過setSortComparatorClass()方法來設置Key的比較類。
job.setSortComparatorClass(KeyComparator.Class);
3.4 自定義分組類
在Reduce階段,構造一個與 Key 相對應的 Value 叠代器的時候,只要first相同就屬於同一個組,放在一個Value叠代器。定義這個比較器,可以有兩種方式。
分組的實質也是排序,此例子中排序是按照年份和溫度,而分組只是按照年份。
1) 繼承WritableComparator。
public static class KeyComparator extends WritableComparator
必須有一個構造函數,並且重載以下方法。
public int compare(WritableComparable w1, WritableComparable w2)
2) 實現接口 RawComparator。
上面兩種實現方式,在Job中,可以通過setSortComparatorClass()方法來設置Key的比較類。
job.setGroupingComparatorClass(GroupingComparator.Class);
另外註意的是,如果reduce的輸入與輸出不是同一種類型,則 Combiner和Reducer 不能共用 Reducer 類,因為 Combiner 的輸出是 reduce 的輸入。除非重新定義一個Combiner。
四、代碼實現
思路:
1、按照年份升序排序,同時每一年中溫度降序排序
2、按照年份分組,每一年對應一個reduce任務
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class KeyPair implements WritableComparable<KeyPair> { private int year; //年份 private int hot; //溫度 public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getHot() { return hot; } public void setHot(int hot) { this.hot = hot; } @Override public void readFields(DataInput in) throws IOException { this.year = in.readInt(); this.hot = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(year); out.writeInt(hot); } //重寫compareTo方法,用作key的比較,先比較年份,年份相同再比較溫度 @Override public int compareTo(KeyPair o) { int y = Integer.compare(year, o.getYear()); if(y == 0){ return Integer.compare(hot, o.getHot()); } return y; } @Override public String toString() { return year+"\t"+hot; } }自定義key
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class FirstPartitioner extends Partitioner<KeyPair, Text> { @Override public int getPartition(KeyPair key, Text value, int nums) { //按照年份分區,乘127是為了分散開,nums是reduce數量 return (key.getYear()*127 & Integer.MAX_VALUE) % nums; } }自定義分區類
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class SortKey extends WritableComparator { public SortKey() { super(KeyPair.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { KeyPair k1 = (KeyPair)a; KeyPair k2 = (KeyPair)b; //先比較年份 int pre = Integer.compare(k1.getYear(), k2.getYear()); if(pre != 0){ return pre; } //年份相同比較溫度 //溫度倒序 return -Integer.compare(k1.getHot(), k2.getHot()); } }自定義排序類
分組的實質也是排序,此例子中排序是按照年份和溫度,而分組只是按照年份。
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class GroupComparator extends WritableComparator { protected GroupComparator() { super(KeyPair.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { KeyPair k1 = (KeyPair)a; KeyPair k2 = (KeyPair)b; //按照年份分組,每一年一個reduce,不考慮溫度 return Integer.compare(k1.getYear(), k2.getYear()); } }自定義分組類
import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, KeyPair, Text> { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private KeyPair k = new KeyPair(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //keypair作為key,每一行文本作為value String line = new String(value.getBytes(), 0, value.getLength(), "GBK"); String[] tmp = line.split("\t"); System.out.println(tmp[0]+"\t"+tmp[1]); if(tmp.length>=2){ try { Date date = sdf.parse(tmp[0]); Calendar cal = Calendar.getInstance(); cal.setTime(date); int year = cal.get(1); k.setYear(year); } catch (ParseException e) { e.printStackTrace(); } int hot = Integer.parseInt(tmp[1].substring(0, tmp[1].indexOf("℃"))); k.setHot(hot); context.write(k, value); } } }自定義Mapper類
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer<KeyPair, Text, KeyPair,Text> { @Override protected void reduce(KeyPair key, Iterable<Text> value,Context context) throws IOException, InterruptedException { for(Text t : value){ context.write(key, t); } } }自定義Reducer類
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class YearHot { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "year hot sort"); job.setJarByClass(YearHot.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setNumReduceTasks(3); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(SortKey.class); job.setGroupingComparatorClass(GroupComparator.class); job.setOutputKeyClass(KeyPair.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(GBKOutputFormat.class); FileInputFormat.addInputPath(job, new Path("hdfs://192.168.228.134:/usr/input/data.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.228.134:/usr/output")); System.exit(job.waitForCompletion(true)?0:1); } }驅動類
MapReduce二次排序