1. 程式人生 > >MapReduce程序之二次排序與多次排序

MapReduce程序之二次排序與多次排序

大數據 Hadoop MapReduce Java

[toc]


MapReduce程序之二次排序與多次排序

需求

有下面的數據:

cookieId    time    url
2   12:12:34    2_hao123
3   09:10:34    3_baidu
1   15:02:41    1_google
3   22:11:34    3_sougou
1   19:10:34    1_baidu
2   15:02:41    2_google
1   12:12:34    1_hao123
3   23:10:34    3_soso
2   05:02:41    2_google

假如我們現在的需求是先按 cookieId 排序,然後按 time 排序,以便按 session 切分日誌,排序後的結果如下:

---------------------------------
1      12:12:34        1_hao123
1      15:02:41        1_google
1      19:10:34        1_baidu
---------------------------------
2      05:02:41        2_google
2      12:12:34        2_hao123
2      15:02:41        2_google
---------------------------------
3      09:10:34        3_baidu
3      22:11:34        3_sougou
3      23:10:34        3_soso

要求使用MapReduce程序實現。

程序思路分析

Map函數:
/**
 * Map函數,解析每一行記錄為AccessLogWritable,這樣Map輸出的時候就可以根據
 * AccessLogWritable對象中的兩個字段進行排序,從而實現前面要求的二次排序需求
 * 也就是說,排序依舊是依賴Map輸出時的排序,但是規則是我們在AccessLogWritable中定義的
 */

 Reduce函數:
/**
 * 經過shuffle後到達Reducer的數據已經是有序的,所以直接寫出即可
 */

所以為了進行多個數據的比較,我們需要自定義key來作為Map輸出的key。

MapReduce程序

關於如何進行數據的排序,思路已經在代碼註釋中有說明,不過需要註意的是,這裏使用了前面開發的Job工具類來開發驅動程序。

SecondSortJob.java

package com.uplooking.bigdata.mr.secondsort;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import com.uplooking.bigdata.mr.sort.SortJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * MapReduce排序之二次排序
 */
public class SecondSortJob {

    /**
     * 驅動程序,使用工具類使用Job
     * @param args
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");
            System.exit(-1);
        }

        Job job = MapReduceJobUtil.buildJob(new Configuration(),
                SecondSortJob.class,
                args[0],
                TextInputFormat.class,
                SecondSortMapper.class,
                AccessLogWritable.class,
                NullWritable.class,
                new Path(args[1]),
                TextOutputFormat.class,
                SecondSortReducer.class,
                AccessLogWritable.class,
                NullWritable.class);

        // ReduceTask必須設置為1
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }

    /**
     * Map函數,解析每一行記錄為AccessLogWritable,這樣Map輸出的時候就可以根據
     * AccessLogWritable對象中的兩個字段進行排序,從而實現前面要求的二次排序需求
     * 也就是說,排序依舊是依賴Map輸出時的排序,但是規則是我們在AccessLogWritable中定義的
     */
    public static class SecondSortMapper extends Mapper<LongWritable, Text, AccessLogWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 解析每一行
            String[] fields = value.toString().split("\t");
            if(fields == null || fields.length < 3) {
                return;
            }
            String cookieId = fields[0];
            String time = fields[1];
            String url = fields[2];
            // 構建AccessLogWritable對象
            AccessLogWritable logLine = new AccessLogWritable(cookieId, time, url);
            // 寫出到context
            context.write(logLine, NullWritable.get());
        }
    }

    /**
     * 經過shuffle後到達Reducer的數據已經是有序的,所以直接寫出即可
     */
    public static class SecondSortReducer extends Reducer<AccessLogWritable, NullWritable, AccessLogWritable, NullWritable> {
        @Override
        protected void reduce(AccessLogWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

AccessLogWritable.java

package com.uplooking.bigdata.mr.secondsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 自定義Hadoop數據類型,作為key,需要實現WritableComparable接口
 * map中排序需要比較的對象為AccessLogWritable,所以泛型填寫為AccessLogWritable
 */
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {

    private String cookieId;
    private String time;
    private String url;

    /**
     * 空參構造方法,必須要有,否則會有下面的異常:
     Caused by: java.lang.NoSuchMethodException: com.uplooking.bigdata.mr.secondsort.AccessLogWritable.<init>()
     at java.lang.Class.getConstructor0(Class.java:3082)
     at java.lang.Class.getDeclaredConstructor(Class.java:2178)
     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
     ... 16 more
     */
    public AccessLogWritable() {

    }

    public AccessLogWritable(String cookieId, String time, String url) {
        this.cookieId = cookieId;
        this.time = time;
        this.url = url;
    }

    /**
     * 比較的方法,定義的規則為:
     * 先按 cookieId 排序,然後按 time 排序
     * @param o
     * @return
     */
    public int compareTo(AccessLogWritable o) {
        int ret = this.cookieId.compareTo(o.cookieId);
        // 如果cookieId比較結果相同,再比較time
        if(ret == 0) {
            ret = this.time.compareTo(o.time);
        }
        return ret;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(cookieId);
        out.writeUTF(time);
        out.writeUTF(url);
    }

    public void readFields(DataInput in) throws IOException {
        this.cookieId = in.readUTF();
        this.time = in.readUTF();
        this.url = in.readUTF();
    }

    @Override
    public String toString() {
        return cookieId + "\t" + time + "\t" + url;
    }
}

測試

這裏使用本地環境來運行MapReduce程序,輸入的參數如下:

/Users/yeyonghao/data/input/secondsort /Users/yeyonghao/data/output/mr/secondsort

也可以將其打包成jar包,然後上傳到Hadoop環境中運行。

運行程序後,查看輸出結果如下:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/secondsort$ cat part-r-00000
1   12:12:34    1_hao123
1   15:02:41    1_google
1   19:10:34    1_baidu
2   05:02:41    2_google
2   12:12:34    2_hao123
2   15:02:41    2_google
3   09:10:34    3_baidu
3   22:11:34    3_sougou
3   23:10:34    3_soso

可以看到,通過使用自定義的key,我們的MapReduce程序已經完成了二次排序的功能。

擴展:如何實現多次排序

其實如果上面的程序能夠理解清楚的話,多次排序的思路應該也是很自然就可以想到的,因為比較的規則其實是在key中定義的,而對於Map來說,是依據key來進行排序的,所以如果需要進行多次排序,我們就可以在自定義的key的compareTo方法中來實現多次排序的規則,有興趣的朋友可以自行寫出這樣的程序,這裏就不再說明。

MapReduce程序之二次排序與多次排序