MapReduce 統計手機使用者的上行流量,下行流量,總流量,並對輸出的結果進行倒序排序。
阿新 • • 發佈:2018-12-15
首先,要知道 hadoop 自帶的 LongWritable 只能儲存單一個 long 值,沒辦法同時儲存三個變數,即使用者的上行流量、下行流量、總流量。
這個時候,你就要去寫一個屬於你自己的類別,讓它實作 Hadoop 的序列化介面,才能夠放入這三個資料。
這裡定義為flowbean,實現WritableComparable介面
import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @Description TODO 流量bean * @Date 10-17-2018 * @ClassName:FlowBean */ public class FlowBean implements WritableComparable<FlowBean> { private long upFlow;// 上行流量 private long downFlow;// 下行流量 private long sumFlow;// 總流量 // 序列化時,需要反射呼叫空的無引數構造 public FlowBean() { super(); } public FlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }/* * 序列化:將我們要傳輸的資料序列化成位元組流 * * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) */ //@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化:從位元組流中恢復出各個欄位 * * 順序和序列化的順序一致 */ //@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } //@Override public int compareTo(FlowBean fb) { //倒序排序 // TODO Auto-generated method stub //返回1則交換,-1則不交換。 return this.sumFlow > fb.getSumFlow() ? -1 : 1; } }
然後就是mapreduce程式碼:
/** * @Descripition:計算使用者的上行下行流量,以及總流量,並且對總流量以倒序排序。 * @Date:2018-10-17 * @ClassName: Flow_log * */ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class Flow_log { //Map public static class myMapper extends Mapper<LongWritable, Text, FlowBean, Text>{ public void map(LongWritable key, Text value, Context context) throws IOException,InterruptedException { String[] line = value.toString().split("\t"); String phone = line[1]; int size = line.length; Long upRate = Long.parseLong(line[size - 3]); Long downRate = Long.parseLong(line[size - 2]); context.write(new FlowBean(upRate,downRate), new Text(phone)); } } //Reduce /** * 在Hadoop預設的排序演算法中,只會針對key值進行排序, * 所以應該將Text,FlowBean的位置換一下, * 讓reduce排序演算法去排FlowBean的順序, * 並且在FlowBean中重寫compareTo方法 */ public static class myReducer extends Reducer<FlowBean, Text, Text, FlowBean>{ public void reduce (FlowBean key, Iterable<Text> values, Context context) throws IOException,InterruptedException{ /*long upRate_count = 0; long downRate_count = 0; long sumRate = 0; for(FlowBean b : values){ upRate_count += b.getUpFlow(); downRate_count += b.getDownFlow(); //sumRate = upRate_count + downRate_count; }*/ context.write(values.iterator().next(), key); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Flow_log"); job.setJarByClass(Flow_log.class); job.setMapperClass(myMapper.class); job.setReducerClass(myReducer.class); //指定mapper輸出資料的k,v型別 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); //指定最終輸出資料的k,v型別 job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(FlowBean.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)? 0 :1); } }
其次,要知道,mapreduce 是對 key 值進行排序的,所以當我們實現了 WritableComparable 介面的 compareTo 方法之後,還需要把 mapper 輸出 k,v 的值換一下位置,確保把流量的資料放在 key 的位置,這樣 shuffle 過程才能將其排序;而我們重寫 compareTo 之後,就能將排序規則修改為倒序。