
Hadoop Secondary Sort and a Java Implementation

Why secondary sort is needed: the MapReduce framework automatically sorts the keys produced by the mappers, so by the time a reducer starts, all keys are in order, but the values attached to each key arrive in arbitrary order. Secondary sort means also sorting those values. A reducer's input has the form (key, list(value)) with list(value) = (V_1, V_2, ..., V_n): one key maps to many values, and these values are unordered. Secondary sorting turns them into an ordered list:

SORT(V_1, V_2, ..., V_n) = (S_1, S_2, ..., S_n)

list(value) = (S_1, S_2, ..., S_n)

where the S_i are in ascending or descending order. For example, if the values for one key arrive as (10, -20, 70), a descending secondary sort produces (70, 10, -20).

Two ways to achieve secondary sort on the reducer side:

1. Have the reducer read and buffer all values for a given key, then sort them inside the reducer. This does not scale, because it is limited by the reducer's memory (see the sketch after this list).

2. Let the MapReduce framework itself sort the values by building a composite key. For example, if A is the key and B and C are the values, pick B as the secondary key: (A, B) becomes the composite key and C stays as the value. Sorting is then handed off to the MapReduce framework, nothing has to be sorted in memory, and the approach scales.
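
For contrast with the scalable approach used in the rest of this post, here is a minimal sketch of solution 1, assuming the natural key is emitted as a Text and temperatures as IntWritable values: the reducer buffers every value for a key and sorts the buffer itself, so a key with too many values will exhaust the reducer's heap.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/* Solution 1 sketch: buffer all values for one key in memory and sort them there.
   Simple, but bounded by the reducer's memory. */
public class InMemorySortReducer extends Reducer<Text, IntWritable, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Integer> buffer = new ArrayList<Integer>();
        for (IntWritable value : values) {
            buffer.add(value.get());   // copy the int; Hadoop reuses the IntWritable object
        }
        Collections.sort(buffer, Collections.reverseOrder());   // descending order
        StringBuilder sorted = new StringBuilder();
        for (int i = 0; i < buffer.size(); i++) {
            if (i > 0) {
                sorted.append(",");
            }
            sorted.append(buffer.get(i));
        }
        context.write(key, new Text(sorted.toString()));
    }
}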

Custom plug-ins:

1. Partitioner: decides, based on the mapper's output key, which reducer each mapper output record is sent to; in essence it hashes the key and takes the result modulo the number of reducers (see the sketch after this list).

2. Grouping comparator: groups the records arriving at a reducer by the natural key; the code is shown below (DateTemperatureGroupingComparator).
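
For point 1, a minimal sketch of what plain hash partitioning amounts to (this mirrors the behaviour of Hadoop's built-in HashPartitioner; the custom partitioner later in this post does the same thing, except it hashes only the natural-key part of the composite key):

import org.apache.hadoop.mapreduce.Partitioner;

/* Sketch of default hash partitioning: partition = hash(key) mod number of reducers. */
public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
        // mask the sign bit so the partition index is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}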

Taking (year, month, temperature) data as the example, the overall flow the MapReduce framework follows for secondary sort is:

1. The mapper creates (K, V) pairs where K is the composite key (year-month, temperature) and V is the temperature value; the (year, month) part of the composite key is the natural key. For instance, the input record 2000,11,02,30 becomes the pair ((2000-11, 30), 30).

2. The partitioner plug-in sends all records with the same natural key to the same reducer, because it partitions on the natural key only.

3. The shuffle sorts the composite keys by temperature, and the grouping comparator ensures that all composite keys sharing a natural key are handed to a single reduce() call, so the temperatures reach that call already in order.

The MapReduce framework therefore does the sorting; nothing has to be sorted in the reducer's memory.

The code follows. Besides the mapper, the reducer and the driver (main job flow), there are three more files: a partitioner, a grouping comparator and a composite-key class:

DateTemperatureGroupingComparator.java — the grouping comparator:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DateTemperatureGroupingComparator extends WritableComparator {
    public DateTemperatureGroupingComparator() {
        /* call the parent constructor, registering the key class and letting it create instances */
        super(DateTemperaturePair.class, true);
    }
    @Override
    /* group records by the natural key (yearMonth), so all composite keys that share it
       are delivered to the same reduce() call */
    public int compare(WritableComparable a, WritableComparable b) {
        DateTemperaturePair pair1 = (DateTemperaturePair) a;
        DateTemperaturePair pair2 = (DateTemperaturePair) b;
        return pair1.getYearMonth().compareTo(pair2.getYearMonth());
    }
}

DateTemperaturePartitioner.java — the partitioner:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class DateTemperaturePartitioner extends
    Partitioner<DateTemperaturePair, IntWritable> {
    @Override
    /* partition on the natural key (yearMonth) only, so every record for the same
       year-month lands on the same reducer; i is the number of reducers */
    public int getPartition(DateTemperaturePair dateTemperaturePair, IntWritable temperature,
        int i) {
        return Math.abs(dateTemperaturePair.getYearMonth().hashCode() % i);
    }
}

DateTemperaturePair.java — the composite-key class:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DateTemperaturePair
    /* Java classes implement interfaces (comma-separated) rather than using multiple
       inheritance; note WritableComparable already extends Writable, so listing both is redundant */
    implements Writable, WritableComparable<DateTemperaturePair> {
    private String yearMonth;   // natural key
    private String day;         // carried along, but not used for comparison
    protected Integer temperature;  // secondary key

    /* defines how DateTemperaturePair objects are ordered during the shuffle sort */
    public int compareTo(DateTemperaturePair o) {
        /* compare the natural keys (year-month) first, using String.compareTo */
        int compareValue = this.yearMonth.compareTo(o.getYearMonth());
        if (compareValue == 0) {
            compareValue = temperature.compareTo(o.getTemperature());
        }
        /* negating the result gives descending order */
        return -1 * compareValue;
    }

    /* DataOutput serializes the fields to Hadoop's binary wire format;
       day is deliberately not written because nothing uses it after the map phase */
    public void write(DataOutput dataOutput) throws IOException {
        Text.writeString(dataOutput, yearMonth);
        dataOutput.writeInt(temperature);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.yearMonth = Text.readString(dataInput);
        this.temperature = dataInput.readInt();
    }

    @Override
    public String toString() {
        return yearMonth;
    }

    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String text) {
        this.yearMonth = text;
    }

    public void setDay(String day) {
        this.day = day;
    }

    public Integer getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
}
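
A quick standalone check (a hypothetical helper class, not part of the job) that the compareTo() above really yields descending temperatures within one year-month:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DateTemperaturePairDemo {
    public static void main(String[] args) {
        List<DateTemperaturePair> pairs = new ArrayList<DateTemperaturePair>();
        for (int t : new int[]{30, -20, 60}) {
            DateTemperaturePair pair = new DateTemperaturePair();
            pair.setYearMonth("2012-12");
            pair.setTemperature(t);
            pairs.add(pair);
        }
        Collections.sort(pairs);                 // uses DateTemperaturePair.compareTo()
        for (DateTemperaturePair pair : pairs) {
            System.out.println(pair + " " + pair.getTemperature());   // prints 60, 30, -20
        }
    }
}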

SecondarySortingMapper.java — the mapper:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/* note that the output key type is DateTemperaturePair, the custom composite key */
public class SecondarySortingMapper extends
        Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",");
        String yearMonth = tokens[0] + "-" + tokens[1];
        String day = tokens[2];
        int temperature = Integer.parseInt(tokens[3]);
        DateTemperaturePair reduceKey = new DateTemperaturePair();
        reduceKey.setYearMonth(yearMonth);
        reduceKey.setDay(day);
        reduceKey.setTemperature(temperature);
        context.write(reduceKey, new IntWritable(temperature));
    }
}

SecondarySortingReducer.java — the reducer:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/* the input key and the output key types are both the custom composite key */
public class SecondarySortingReducer extends
        Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {
    @Override
    protected void reduce(DateTemperaturePair key,
            Iterable<IntWritable> values, Context context) throws IOException,
            InterruptedException {
        /* StringBuilder is more efficient than String concatenation for repeated appends */
        StringBuilder sortedTemperatureList = new StringBuilder();
        /* the values already arrive sorted in descending order, thanks to the composite key */
        for (IntWritable temperature : values) {
            sortedTemperatureList.append(temperature);
            sortedTemperatureList.append(",");
        }
        sortedTemperatureList.deleteCharAt(sortedTemperatureList.length() - 1);
        context.write(key, new Text(sortedTemperatureList.toString()));
    }
}

SecondarySort.java — the driver (main job flow):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SecondarySort extends Configured implements Tool {
    public int run(String[] args) throws Exception {

        /* set up the job so Hadoop can locate the jar */
        Job job = Job.getInstance(getConf());
        job.setJarByClass(SecondarySort.class);
        job.setJobName("SecondarySort");

        /* input and output paths from the command line */
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        /* set the mapper's output key and value types */
        job.setMapOutputKeyClass(DateTemperaturePair.class);
        job.setMapOutputValueClass(IntWritable.class);
       
        /* set the reducer's output key and value types (the reducer emits Text values) */
        job.setOutputKeyClass(DateTemperaturePair.class);
        job.setOutputValueClass(Text.class);
       
        /* wire up the mapper, reducer, partitioner and grouping comparator */
        job.setMapperClass(SecondarySortingMapper.class);
        job.setReducerClass(SecondarySortingReducer.class);
        job.setPartitionerClass(DateTemperaturePartitioner.class);
        job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);

        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException(
                "Usage: SecondarySort <input-path> <output-path>");
        }
        int returnStatus = ToolRunner.run(new SecondarySort(), args);
        System.exit(returnStatus);
    }
}
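
One detail worth calling out: the driver never sets a sort comparator, so during the shuffle Hadoop falls back to the key class's own compareTo(), which already produces the descending order. If you prefer to make the sort order explicit, a sketch of an equivalent (and functionally redundant) sort comparator might look like this; the class name is hypothetical:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/* Optional sort comparator: simply delegates to DateTemperaturePair.compareTo(),
   so registering it does not change the job's behaviour. */
public class DateTemperatureSortComparator extends WritableComparator {
    public DateTemperatureSortComparator() {
        super(DateTemperaturePair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((DateTemperaturePair) a).compareTo((DateTemperaturePair) b);
    }
}

It would be registered in run() with job.setSortComparatorClass(DateTemperatureSortComparator.class).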

Input file (each line is year,month,day,temperature):

# hdfs dfs -cat /sample_input.txt
2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,02,30
2000,11,24,-40
2012,12,21,30
2012,12,22,-20
2012,12,23,60
2012,12,24,70
2012,12,25,10
2013,01,22,80
2013,01,23,90
2013,01,24,70
2013,01,20,-10

Command to run the job:

hadoop jar SecondarySort.jar SecondarySort /sample_input.txt output

The job output is shown below; within each year-month the temperatures are sorted in descending order:

# hdfs dfs -cat output/*
2013-01	90,80,70,-10
2012-12	70,60,30,10,-20
2000-12	10,-20
2000-11	30,20,-40