1. 程式人生 > >hadoop中MapReduce的sort(部分排序,完全排序,二次排序)

hadoop中MapReduce的sort(部分排序,完全排序,二次排序)

1.部分排序

MapReduce預設就是在每個分割槽裡進行排序

2.完全排序

在所有的分割槽中,整體有序    

            1)使用一個reduce
            2)自定義分割槽函式

不同的key進入的到不同的分割槽之中,在每個分割槽中自動排序,實現完全分割槽..



import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PassPartition extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {

        String key = text.toString();
        if (key.compareTo("xxx") < 0) {
            return 0;
        }
        if (key.compareTo("aaaa") < 0) {
            return 1;
        } else return 2;
    }
}

         3)取樣        //對於純文字資料支援不友好,純文字的輸入輸出格式建議使用KeyValueTextInputFormat
                                     //1、設定分割槽類TotalOrderPartition(MR中存在此類 )
                    //2、初始化取樣器 => InputSampler.RandomSampler<Text,Text> sampler = new InputSampler.RandomSampler<Text,Text>(0.01,10);

SplitSampler
IntervalSampler

                          //3、設定取樣資料地址 => TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("D:/"));
                         //4、寫入取樣資料 => InputSampler.writePartitionFile(job,sampler);
                         //5、注意1-4步必須寫在配置檔案之後,job執行之前

                1.隨機取樣
                   比較耗費資源,浪費效能

               2.切片取樣


               3. 間隔取樣 :效能最好 



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class PassApp {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");

        FileSystem fs = FileSystem.get(conf);

        //通過配置檔案初始化job
        Job job = Job.getInstance(conf);

        //設定job名稱
        job.setJobName("word count");

        //job入口函式類
        job.setJarByClass(PassApp.class);

        //設定mapper類
        job.setMapperClass(PassMapper.class);

        //設定reducer類
        job.setReducerClass(PassReducer.class);


        //設定全排序取樣類TotalOrderPartitioner
        job.setPartitionerClass(TotalOrderPartitioner.class);


        //設定map的輸出k-v型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //設定reduce的輸出k-v型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //FileInputFormat.setMaxInputSplitSize(job,10);
        //FileInputFormat.setMinInputSplitSize(job,10);

        job.setInputFormatClass(KeyValueTextInputFormat.class);

        //設定輸入路徑
        FileInputFormat.addInputPath(job, new Path("D:/wc/out"));

        //設定輸出路徑
        FileOutputFormat.setOutputPath(job, new Path("D:/wc/out4"));

        if(fs.exists(new Path("D:/wc/out4"))){
            fs.delete(new Path("D:/wc/out4"),true);
        }

        //設定三個reduce
        job.setNumReduceTasks(3);


        /**
         * 隨機取樣,比較浪費效能,耗費資源
         * @param freq 每個key被選擇的概率 ,大於取樣數(2) / 所有key數量(100)
         * @param numSamples 所有切片中需要選擇的key數量
         */
        //設定取樣器型別
        InputSampler.RandomSampler<Text,Text> sampler = new InputSampler.RandomSampler<Text,Text>(0.001,8800);

        //InputSampler.SplitSampler<Text,Text> sampler = new InputSampler.SplitSampler<Text,Text>(10,3);

        //InputSampler.IntervalSampler<Text,Text> sampler = new InputSampler.IntervalSampler<Text,Text>(0.001);

        //設定取樣資料地址
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("D:/wc/par/"));

        //寫入取樣資料
        InputSampler.writePartitionFile(job,sampler);

        //執行job
        boolean b = job.waitForCompletion(true);
    }
}

3.二次排序

在MapReduce完成後,在對key排序的基礎上,再對value進行排序

以年度氣溫最高統計

1901 :10  20 30 50 40

1901:30 20 10 11 -8    

對年份進行排序完成後,對氣溫再進行一個排序

實現方法:

1.自定義key,使年份_氣溫 變成一個key,自定義comkey 實現WritableComparable介面,實現自定義序列化和比較器(自定義排序演算法)



import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CompKey implements WritableComparable<CompKey> {

    private String year;
    private int temp;

    //定義排序規則
    public int compareTo(CompKey o) {

        String oyear = o.getYear();//第一個
        String tyear = this.getYear();
        int otemp = o.getTemp();
        int ttemp = this.getTemp();

        //如果引數year 和現在的year相同,則比較temp的大小
        if (tyear.equals(oyear)) {
            return otemp - ttemp;
        }
        //不同,返回兩個year的比較值
        return tyear.compareTo(oyear);
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(year);
        out.writeInt(temp);
    }

    public void readFields(DataInput in) throws IOException {
        this.setYear(in.readUTF());
        this.setTemp(in.readInt());

    }


    @Override
    public String toString() {
        return "CompKey{" +
                "year='" + year + '\'' +
                ", temp=" + temp +
                '}';
    }

    public CompKey(String year, int temp) {
        this.year = year;
        this.temp = temp;
    }

    public CompKey() {
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }
}

2.自定義分組對比器,將所有指定的key變成一個key,也就是說1920 30 ,1920 40這兩個不同key識別成不同的key,這個分組對比器是在reduce端,重寫WritableComparator中的MyGroupComparator和compar



import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * reduce端 分組對比器,自定義key業務邏輯,將1902 20 和1902 30 識別為一個key
 */
public class MyGroupComparator extends WritableComparator {

    //必須寫,建立例項必須寫true
    protected MyGroupComparator() {

        super(CompKey.class, true);
    }

    //比較演算法
    //只要year相等則證明兩個key相等
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CompKey ck1 = (CompKey) a;
        CompKey ck2 = (CompKey) b;
        return ck1.getYear().compareTo(ck2.getYear());
    }
}

在Mainapp中註冊分組對比器

job.setGroupingComparatorClass(MyGroupComparator.class);

.