1. 程式人生 > hadoop[10]-對彙總結果進行排序

hadoop[10]-對彙總結果進行排序

FlowBean:

package com.wange.flowcountsort;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean>{
    private long upflow;
    private long dflow;
    
private long sumflow; // 因為反射機制的需要,必須定義一個無參的建構函式 public FlowBean() {} public FlowBean(long upflow, long dflow) { this.upflow = upflow; this.dflow = dflow; this.sumflow = upflow + dflow; } public long getUpflow() { return upflow; } public void
setUpflow(long upflow) { this.upflow = upflow; } public long getDflow() { return dflow; } public void setDflow(long dflow) { this.dflow = dflow; } public void set(long upflow, long dflow){ this.upflow = upflow; this.dflow = dflow;
this.sumflow = upflow + dflow; } public long getSumflow() { return sumflow; } public void setSumflow(long sumflow) { this.sumflow = sumflow; } // 序列化方法 將我們要傳輸的資料序列化成位元組流 @Override public void write(DataOutput out) throws IOException { out.writeLong(upflow); out.writeLong(dflow); out.writeLong(sumflow); } // 反序列化的方法 從資料位元組流中恢復出各個欄位 @Override public void readFields(DataInput in) throws IOException { upflow = in.readLong(); dflow = in.readLong(); sumflow = in.readLong(); } @Override public String toString() { return upflow + "\t" + dflow + "\t" + sumflow; } @Override public int compareTo(FlowBean o) { // 倒序 return this.sumflow > o.getSumflow() ? -1 : 1; } }
View Code

由於邏輯比較簡單，所以把 Mapper 和 Reducer 都以靜態內部類的形式寫在同一個類中：

package com.wange.flowcountsort;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * Flow-summary sort job (step two): reads the per-phone totals produced by the aggregation job
 * and writes them sorted by total flow, largest first.
 *
 * <p>Expected input line format: {@code phone \t upflow \t dflow [\t ...]}.
 */
public class FlowCountSortSetpTwo {

    /**
     * Emits the flow numbers as the key (so the framework sorts on them) and the phone as the
     * value. Lines with fewer than three fields or non-numeric flows are counted and skipped.
     */
    public static class FlowCountSortSetpTwoMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        // Reused across map() calls; context.write serializes them immediately.
        private final FlowBean outKey = new FlowBean();
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = StringUtils.split(value.toString(), "\t");
            if (fields == null || fields.length < 3) {
                context.getCounter("FlowCountSort", "MALFORMED_LINES").increment(1);
                return;
            }
            long upFlow;
            long dFlow;
            try {
                upFlow = Long.parseLong(fields[1].trim());
                dFlow = Long.parseLong(fields[2].trim());
            } catch (NumberFormatException e) {
                context.getCounter("FlowCountSort", "MALFORMED_LINES").increment(1);
                return;
            }
            // Flow information is the key, so the shuffle sorts records by FlowBean.compareTo.
            outKey.set(upFlow, dFlow);
            outValue.set(fields[0]);
            context.write(outKey, outValue);
        }
    }

    /**
     * Writes each phone followed by its flow numbers. Every value in the group is emitted, so
     * no phone is lost if several records ever share one key group.
     */
    public static class FlowCountSortSetpTwoReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text phone : values) {
                context.write(phone, key);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} input path, {@code args[1]} output path (must not exist)
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FlowCountSortSetpTwo <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowCountSortSetpTwo.class);

        job.setMapperClass(FlowCountSortSetpTwoMapper.class);
        job.setReducerClass(FlowCountSortSetpTwoReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // A single reducer is what makes part-r-00000 globally sorted; pin it so a
        // cluster-wide mapreduce.job.reduces setting cannot split the ordering across files.
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the shell instead of always exiting 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
View Code

上傳jar包並執行:

# 執行
hadoop jar hadoop-mapreduce-partition-1.0.jar com.wange.flowcountsort.FlowCountSortSetpTwo /flow/partition /flow/sortoutput2
# 檢視結果
hadoop fs -ls /flow/sortoutput2
hadoop fs -cat /flow/sortoutput2/part-r-00000

如果結果按總流量從大到小正確排序，就說明執行成功了（每行依次為：手機號、上行流量、下行流量、總流量）：

13502468823     7335    110349  117684
13925057413     11058   48243   59301
13726230503     2481    24681   27162
13726238888     2481    24681   27162
18320173382     9531    2412    11943
13560439658     2034    5892    7926
13660577991     6960    690     7650
15013685858     3659    3538    7197
13922314466     3008    3720    6728