1. 程式人生 > >MapReduce二次排序

MapReduce二次排序

必須 .lib rec settime string == 技術分享 字段排序 protect

一、背景

  按照年份升序排序,同時每一年中溫度降序排序

  data文件為1949年-1955年每天的溫度數據。

  要求:1、計算1949-1955年,每年溫度最高的時間

     2、計算1949-1955年,每年溫度最高的十天

1949-10-01 14:21:02    341949-10-02 14:01:02    361950-01-01 14:21:02    321950-10-01 11:01:02    371951-10-01 14:21:02    231950-10-02 17:11:02    411950-10-01 18:20:02    271951-07-01 14:01:02    45
1951-07-02 13:21:02 46℃

二、二次排序原理

  默認情況下,Map 輸出的結果會對 Key 進行默認的排序,但是有時候需要對 Key 排序的同時再對 Value 進行排序,這時候就要用到二次排序了。下面讓我們來介紹一下什麽是二次排序。

2.1 Map起始階段

  在Map階段,使用job.setInputFormatClass()定義的InputFormat,將輸入的數據集分割成小數據塊split,同時InputFormat提供一個RecordReader的實現。在這裏我們使用的是TextInputFormat,它提供的RecordReader會將文本的行號作為Key,這一行的文本作為Value。這就是自定 Mapper的輸入是<LongWritable,Text> 的原因。然後調用自定義Mapper的map方法,將一個個<LongWritable,Text>鍵值對輸入給Mapper的map方法

2.2 Map最後階段

  在Map階段的最後,會先調用job.setPartitionerClass()對這個Mapper的輸出結果進行分區,每個分區映射到一個Reducer。每個分區內又調用job.setSortComparatorClass()設置的Key比較函數類排序。可以看到,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass()設置 Key比較函數類,則使用Key實現的compareTo()方法

2.3 Reduce階段

在Reduce階段,reduce()方法接受所有映射到這個Reduce的map輸出後,也會調用job.setSortComparatorClass()方法設置的Key比較函數類,對所有數據進行排序。然後開始構造一個Key對應的Value叠代器。這時就要用到分組,使用 job.setGroupingComparatorClass()方法設置分組函數類。只要這個比較器比較的兩個Key相同,它們就屬於同一組,它們的 Value放在一個Value叠代器,而這個叠代器的Key使用屬於同一個組的所有Key的第一個Key。最後就是進入Reducer的 reduce()方法,reduce()方法的輸入是所有的Key和它的Value叠代器,同樣註意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。

三、二次排序流程

  在本例中要比較兩次。先按照第年份排序,然後再對年份相同的按照溫度排序。根據這一點,我們可以構造一個復合類KeyPair ,它有兩個字段,先利用分區對第一字段排序,再利用分區內的比較對第二字段排序。二次排序的流程分為以下幾步。

3.1 自定義key

  所有自定義的key應該實現接口WritableComparable,因為它是可序列化的並且可比較的。WritableComparable 的內部方法如下所示

// 反序列化,從流中的二進制轉換成IntPair
public void readFields(DataInput in) throws IOException

// 序列化,將IntPair轉化成使用流傳送的二進制
public void write(DataOutput out)

//  key的比較
public int compareTo(IntPair o)

//  默認的分區類 HashPartitioner,使用此方法
public int hashCode()

//  默認實現
public boolean equals(Object right)

3.2 自定義分區

  自定義分區函數類FirstPartitioner,是key的第一次比較,完成對所有key的排序。

public static class FirstPartitioner extends Partitioner< IntPair,IntWritable>

  在job中使用setPartitionerClasss()方法設置Partitioner

job.setPartitionerClasss(FirstPartitioner.Class);

3.3 自定義排序類

  這是Key的第二次比較,對所有的Key進行排序,即同時完成IntPair中的first和second排序。該類是一個比較器,可以通過兩種方式實現。

   1) 繼承WritableComparator。

public static class KeyComparator extends WritableComparator

  必須有一個構造函數,並且重載以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

  2) 實現接口 RawComparator。

  上面兩種實現方式,在Job中,可以通過setSortComparatorClass()方法來設置Key的比較類。

job.setSortComparatorClass(KeyComparator.Class);

3.4 自定義分組類

  在Reduce階段,構造一個與 Key 相對應的 Value 叠代器的時候,只要first相同就屬於同一個組,放在一個Value叠代器。定義這個比較器,可以有兩種方式。

  分組的實質也是排序,此例子中排序是按照年份和溫度,而分組只是按照年份。

   1) 繼承WritableComparator。

public static class KeyComparator extends WritableComparator

  必須有一個構造函數,並且重載以下方法。

public int compare(WritableComparable w1, WritableComparable w2)

  2) 實現接口 RawComparator。

  上面兩種實現方式,在Job中,可以通過setSortComparatorClass()方法來設置Key的比較類。

job.setGroupingComparatorClass(GroupingComparator.Class);

  另外註意的是,如果reduce的輸入與輸出不是同一種類型,則 Combiner和Reducer 不能共用 Reducer 類,因為 Combiner 的輸出是 reduce 的輸入。除非重新定義一個Combiner。

四、代碼實現

思路:

  1、按照年份升序排序,同時每一年中溫度降序排序

  2、按照年份分組,每一年對應一個reduce任務

技術分享圖片
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class KeyPair implements WritableComparable<KeyPair> {
    
    private int year; //年份
    private int hot; //溫度
    

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.hot = in.readInt();
        
    }

    @Override
    public void write(DataOutput out) throws IOException {
    
        out.writeInt(year);
        out.writeInt(hot);
    }

    //重寫compareTo方法,用作key的比較,先比較年份,年份相同再比較溫度
    @Override
    public int compareTo(KeyPair o) {
        int y = Integer.compare(year, o.getYear());
        if(y == 0){
            return Integer.compare(hot, o.getHot());
        }
        return y;
    }

    @Override
    public String toString() {
        return year+"\t"+hot;
    }
}
自定義key 技術分享圖片
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<KeyPair, Text> {
    @Override
    public int getPartition(KeyPair key, Text value, int nums) {
        //按照年份分區,乘127是為了分散開,nums是reduce數量
        return (key.getYear()*127 & Integer.MAX_VALUE) % nums;
    }
}
自定義分區類 技術分享圖片
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortKey extends WritableComparator {

    public SortKey() {
        super(KeyPair.class,true);
    }
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair k1 = (KeyPair)a;
        KeyPair k2 = (KeyPair)b;
        //先比較年份
        int pre = Integer.compare(k1.getYear(), k2.getYear());
        if(pre != 0){
            return pre;
        }
        //年份相同比較溫度
        //溫度倒序
        return -Integer.compare(k1.getHot(), k2.getHot());
    }
}
自定義排序類

  分組的實質也是排序,此例子中排序是按照年份和溫度,而分組只是按照年份。

技術分享圖片
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;



public class GroupComparator extends WritableComparator  {
    
    protected GroupComparator() {       
        super(KeyPair.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair k1 = (KeyPair)a;
        KeyPair k2 = (KeyPair)b;
        //按照年份分組,每一年一個reduce,不考慮溫度
        return Integer.compare(k1.getYear(), k2.getYear());
    }

}
自定義分組類 技術分享圖片
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, KeyPair, Text> {
    
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private KeyPair k = new KeyPair();
    
    @Override
    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
        //keypair作為key,每一行文本作為value
        String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
        String[] tmp = line.split("\t");
        System.out.println(tmp[0]+"\t"+tmp[1]);
        if(tmp.length>=2){
            try {
                Date date = sdf.parse(tmp[0]);
                Calendar cal = Calendar.getInstance();
                cal.setTime(date);
                int year = cal.get(1);
                k.setYear(year);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            int hot = Integer.parseInt(tmp[1].substring(0, tmp[1].indexOf("℃")));
            k.setHot(hot);
            context.write(k, value);
        }
    }

}
自定義Mapper類 技術分享圖片
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<KeyPair, Text, KeyPair,Text> {

    @Override
    protected void reduce(KeyPair key, Iterable<Text> value,Context context) throws IOException, InterruptedException {
        
        for(Text t : value){
            context.write(key, t);
        }
    }
}
自定義Reducer類 技術分享圖片
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class YearHot {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "year hot sort");
        
        job.setJarByClass(YearHot.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        
        job.setNumReduceTasks(3);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(SortKey.class);
        job.setGroupingComparatorClass(GroupComparator.class);
        
        job.setOutputKeyClass(KeyPair.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(GBKOutputFormat.class);
        
        FileInputFormat.addInputPath(job, new Path("hdfs://192.168.228.134:/usr/input/data.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.228.134:/usr/output"));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
驅動類

MapReduce二次排序