程式人生 > Hadoop之手寫原生態MapReduce的排序

Hadoop之手寫原生態MapReduce的排序

測試資料:
2030 59
1976 68
2030 19
1997 5
年與溫度的文字,資料可以用java程式碼生成。

生成10000條資料程式碼:
/**
 * Generates 10,000 lines of "year temperature" test data at e:/mr/tmp/temp.txt.
 * Years fall in [1970, 2069]; temperatures in [-30, 69]. Lines end with \r\n.
 *
 * @throws IOException if the output file cannot be created or written
 */
public void makeData() throws IOException {
    // Reuse one Random instance — the original constructed a new Random on
    // every iteration, which is wasteful and needlessly reseeds the generator.
    Random random = new Random();
    // try-with-resources guarantees the writer is closed even if a write
    // fails mid-loop (the original leaked the FileWriter on exception).
    try (FileWriter fw = new FileWriter("e:/mr/tmp/temp.txt")) {
        for (int i = 0; i < 10000; i++) {
            int year = 1970 + random.nextInt(100);
            int temp = -30 + random.nextInt(100);
            fw.write("" + year + " " + temp + "\r\n");
        }
    }
}

MapReduce全排序

1、應用場景

當需要從大量資料中獲取某一最大值最小值時,就得進行排序,這樣減少掉檢索的時間,優化了程式的執行效率。

2、實現方式

1、定義一個Reduce
2、自定義分割槽函式
3、使用hadoop取樣機制

3、程式碼

public static void main(String args[]) throws Exception {
        Configuration conf = new
Configuration(); conf.set("fs.defaultFS","file:///"); Job job = Job.getInstance(conf); //設定job的各種屬性 job.setJobName("MaxTempApp"); //作業名稱 job.setJarByClass(MaxTempApp.class); //搜尋類 job.setInputFormatClass(SequenceFileInputFormat.class
); //設定輸入格式 //設定輸出格式類 //job.setOutputFormatClass(SequenceFileOutputFormat.class); //新增輸入路徑 FileInputFormat.addInputPath(job, new Path(args[0])); //設定輸出路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); //設定最大切片數 //FileInputFormat.setMaxInputSplitSize(job,1024); //設定最小切片數 //FileInputFormat.setMinInputSplitSize(job,1); //設定合成類 --不能取平均值 //job.setCombinerClass(MaxTempReducer.class); job.setMapperClass(MaxTempMapper.class); //mapper類 job.setReducerClass(MaxTempReducer.class); //reducer類 //可以設定reduce個數為1 job.setNumReduceTasks(3); //reducer個數 job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //建立隨機取樣器物件 //freq:每個key被選中的概率 //numSamples:抽取的樣本總數 //maxSplitsSampled:最大采樣切片數(分割槽數) InputSampler.Sampler<IntWritable,IntWritable> sampler = new InputSampler.RandomSampler<IntWritable, IntWritable>(0.1,6000,3); //生成的檔案value為空,key為取樣的區間 例如:本次測試的顯示內容2002年、2036年區間節點 //setPartitionFile(conf,path) 不要使用conf,設定job物件的conf(該物件的conf在底層重新建立) TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("d:/mr/par.lst")); //設定全排序分割槽類 job.setPartitionerClass(TotalOrderPartitioner.class); //設定自定義分割槽 //將sampler寫入分割槽檔案 InputSampler.writePartitionFile(job,sampler); job.waitForCompletion(true); }

MapReduce二次排序

1、應用場景

由於MapReduce只能對key排序,當需求是獲取value的最大值或最小值時,就需要對value進行排序,這種做法稱之為二次排序。

2、實現方式

1、自定義key
實現org.apache.hadoop.io.WritableComparable介面
2、自定義分割槽類
繼承org.apache.hadoop.mapreduce.Partitioner類
3、定義分組對比器
繼承org.apache.hadoop.io.WritableComparator類
4、定義自定義key的排序對比器
繼承org.apache.hadoop.io.WritableComparator類

3、程式碼

自定義key

public class ComboKey implements WritableComparable<ComboKey> {
    private int year ;
    private int temp ;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }

    /**
     * 對key進行比較實現
     */
    public int compareTo(ComboKey o) {
        System.out.println("ComboKey.CompareTo "+ o.toString());
        int y0 = o.getYear();
        int t0 = o.getTemp() ;
        //年份相同(升序)
        if(year == y0){
            //氣溫降序
            return -(temp - t0) ;
        }
        else{
            return year - y0 ;
        }
    }

    /**
     * 序列化過程
     */
    public void write(DataOutput out) throws IOException {
        //年份
        out.writeInt(year);
        //氣溫
        out.writeInt(temp);
    }

    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        temp = in.readInt();
    }


    public String toString() {
        return year+":"+temp;
    }
}

自定義分割槽類

public class YearPartitioner extends Partitioner<ComboKey,NullWritable> {

    public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions) {
        int year = key.getYear();
        return year % numPartitions;
    }
}

自定義分組對比器

public class YearGroupComparator extends WritableComparator {

    protected YearGroupComparator() {
        super(ComboKey.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("YearGroupComparator"+a+","+b);
        ComboKey k1 = (ComboKey)a ;
        ComboKey k2 = (ComboKey)b ;
        return k1.getYear() - k2.getYear() ;
    }
}

自定義key排序對比器

public class ComboKeyComparator extends WritableComparator {

    protected ComboKeyComparator() {
        super(ComboKey.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("ComboKeyComparator"+a+","+b);
        ComboKey k1 = (ComboKey) a;
        ComboKey k2 = (ComboKey) b;
        //對比方法在自定義key類中
        return k1.compareTo(k2);
    }
}

編寫Mapper

public class MaxTempMapper extends Mapper<LongWritable,Text,ComboKey,NullWritable> {

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("MaxTempMapper.map");
        String line = value.toString();
        String arr[] = line.split(" ");

        ComboKey keyOut = new ComboKey();
        keyOut.setYear(Integer.parseInt(arr[0]));
        keyOut.setTemp(Integer.parseInt(arr[1]));
        context.write(keyOut,NullWritable.get());
    }
}

編寫Reduce

public class MaxTempReducer extends Reducer <ComboKey ,NullWritable, IntWritable ,IntWritable>{

    protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        int year = key.getYear();
        int temp = key.getTemp();
        context.write(new IntWritable(year),new IntWritable(temp));
    }
}

編寫App

/**
 * Driver for the secondary-sort job: wires the composite key, custom
 * partitioner, grouping comparator and sort comparator together.
 */
public class MaxTempApp {
    public static void main(String args[]) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","file:///");

        Job job = Job.getInstance(configuration);

        // Job identity and input format.
        job.setJobName("SecondarySortApp");             // job name
        job.setJarByClass(MaxTempApp.class);            // class used to locate the jar
        job.setInputFormatClass(TextInputFormat.class); // plain-text input

        // Paths from the command line: args[0] = input, args[1] = output.
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        // Mapper / reducer implementations.
        job.setMapperClass(MaxTempMapper.class);
        job.setReducerClass(MaxTempReducer.class);

        // Map-side output types: composite key, empty value.
        job.setMapOutputKeyClass(ComboKey.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reduce-side output types: plain year / temperature ints.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // The three pieces of the secondary sort:
        job.setPartitionerClass(YearPartitioner.class);            // partition by year
        job.setGroupingComparatorClass(YearGroupComparator.class); // group by year
        job.setSortComparatorClass(ComboKeyComparator.class);      // sort by (year, temp)

        job.setNumReduceTasks(3); // number of reducers

        job.waitForCompletion(true);
    }
}