hadoop[10]-對彙總結果進行排序
阿新 • 發佈:2018-12-05
FlowBean:
package com.wange.flowcountsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Traffic totals for one phone number, used as the map output key so that the shuffle orders
 * records by total traffic ({@code sumflow}) in descending order.
 *
 * <p>Serialized form: three longs in the order upflow, dflow, sumflow.
 */
public class FlowBean implements WritableComparable<FlowBean> {
    private long upflow;
    private long dflow;
    private long sumflow;

    /** No-arg constructor; required because Hadoop instantiates Writables via reflection. */
    public FlowBean() {}

    /**
     * Creates a bean and derives {@code sumflow} as {@code upflow + dflow}.
     *
     * @param upflow upstream traffic
     * @param dflow downstream traffic
     */
    public FlowBean(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
        this.sumflow = upflow + dflow;
    }

    public long getUpflow() {
        return upflow;
    }

    /** Sets upflow only; {@code sumflow} is NOT recomputed (use {@link #set(long, long)}). */
    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDflow() {
        return dflow;
    }

    /** Sets dflow only; {@code sumflow} is NOT recomputed (use {@link #set(long, long)}). */
    public void setDflow(long dflow) {
        this.dflow = dflow;
    }

    /**
     * Sets both flows and recomputes {@code sumflow}, so the bean can be reused across records.
     *
     * @param upflow upstream traffic
     * @param dflow downstream traffic
     */
    public void set(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
        this.sumflow = upflow + dflow;
    }

    public long getSumflow() {
        return sumflow;
    }

    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    /** Serializes the fields to the byte stream (order must match {@link #readFields}). */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(dflow);
        out.writeLong(sumflow);
    }

    /** Restores the fields from the byte stream in the order written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        dflow = in.readLong();
        sumflow = in.readLong();
    }

    /** Returns {@code upflow \t dflow \t sumflow}. */
    @Override
    public String toString() {
        return upflow + "\t" + dflow + "\t" + sumflow;
    }

    /**
     * Orders beans by {@code sumflow}, largest first.
     *
     * @param o the bean to compare against
     * @return -1 if this bean has the larger total, otherwise 1 (0 only for the same instance)
     */
    @Override
    public int compareTo(FlowBean o) {
        // Reflexivity: x.compareTo(x) must be 0. Hadoop compares two separately deserialized key
        // instances, so this branch does not affect the job's sort or grouping.
        if (this == o) {
            return 0;
        }
        // Descending by total. Ties deliberately return 1 rather than 0: the driver sets no
        // grouping comparator, so reduce-side grouping presumably follows this ordering, and a 0
        // would merge different phones with equal totals into one reduce group.
        // NOTE(review): this is not antisymmetric for ties and violates the Comparable contract;
        // a cleaner design is a proper comparison plus a reducer that writes every value.
        return this.sumflow > o.getSumflow() ? -1 : 1;
    }
}
由於邏輯比較簡單,所以把 Mapper 和 Reducer 都作為靜態內部類寫在同一個類中:
package com.wange.flowcountsort;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Traffic aggregation, step two: sorts the per-phone totals by total traffic.
 *
 * <p>Each input line is tab-separated; fields 0, 1 and 2 are read as phone number, upstream
 * flow and downstream flow. Each output line is {@code phone \t upflow \t dflow \t sumflow},
 * in the order defined by {@link FlowBean#compareTo}.
 */
public class FlowCountSortSetpTwo {

    /** Emits (FlowBean, phone) so the shuffle sorts records by the FlowBean ordering. */
    public static class FlowCountSortSetpTwoMapper
            extends Mapper<LongWritable, Text, FlowBean, Text> {

        // Reused across map() calls: context.write serializes key and value immediately, so
        // mutating them for the next record is safe and avoids two allocations per line.
        private final FlowBean bean = new FlowBean();
        private final Text phoneText = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = StringUtils.split(value.toString(), "\t");
            // Blank or truncated lines previously failed the task with an
            // ArrayIndexOutOfBoundsException; skip them and count them so they stay visible.
            if (fields == null || fields.length < 3) {
                context.getCounter("FlowCountSort", "MALFORMED_LINES").increment(1);
                return;
            }
            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);
            bean.set(upFlow, dFlow);
            phoneText.set(fields[0]);
            // The traffic info is the key so that the framework sorts on it.
            context.write(bean, phoneText);
        }
    }

    /** Writes (phone, FlowBean) for every record, keeping the sorted key order. */
    public static class FlowCountSortSetpTwoReducer
            extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Emit every value, not just the first: if several phones ever land in one reduce
            // group (e.g. the key ordering returns 0 for equal totals), none may be dropped.
            for (Text phone : values) {
                context.write(phone, key);
            }
        }
    }

    /**
     * Configures and runs the sort job.
     *
     * @param args {@code args[0]} input path (step-one output), {@code args[1]} output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FlowCountSortSetpTwo <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowCountSortSetpTwo.class);

        job.setMapperClass(FlowCountSortSetpTwoMapper.class);
        job.setReducerClass(FlowCountSortSetpTwoReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job success/failure as the process exit code so scripts can detect it.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
上傳jar包並執行:
# Run the sort job
hadoop jar hadoop-mapreduce-partition-1.0.jar com.wange.flowcountsort.FlowCountSortSetpTwo /flow/partition /flow/sortoutput2
# Inspect the result
hadoop fs -ls /flow/sortoutput2
hadoop fs -cat /flow/sortoutput2/part-r-00000
如果結果按總流量(最後一列)從大到小排列,就說明排序成功。各列依次為:手機號、上行流量、下行流量、總流量:
13502468823 7335 110349 117684
13925057413 11058 48243 59301
13726230503 2481 24681 27162
13726238888 2481 24681 27162
18320173382 9531 2412 11943
13560439658 2034 5892 7926
13660577991 6960 690 7650
15013685858 3659 3538 7197
13922314466 3008 3720 6728