
Hadoop Learning Path (19): Sorting in the MapReduce Framework


Traffic statistics project case

Sample data

(The original post shows screenshots of the raw input file here. Each record is tab-separated and looks like the sample used in the mapper comment below: 1363157993044  18211575961  94-71-AC-CD-E6-18:CMCC-EASY  120.196.100.99  iface.qiyi.com  video site  15  12  1527  2106  200. Field index 1 is the phone number, and fields at index 8 and 9 (1527 and 2106 in this example) are the upstream and downstream traffic used by the jobs below.)

Requirements

1. Compute, for each user (phone number), the total upstream traffic, total downstream traffic and total traffic.

2. Building on the result of requirement 1, sort the statistics in descending order of total traffic.

3. Write the aggregated traffic statistics to different output files according to the province each phone number belongs to.

Problem 1

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Problem 1: compute the total upstream traffic, total downstream traffic
 * and total traffic consumed by each user (phone number).
 */
public class FlowSumMR {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FlowSumMR");
        job.setJarByClass(FlowSumMR.class);

        job.setMapperClass(FlowSumMRMapper.class);
        job.setReducerClass(FlowSumMRReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("E:/bigdata/flow/input/"));
        FileOutputFormat.setOutputPath(job, new Path("E:/bigdata/flow/output_sum"));

        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }

    public static class FlowSumMRMapper extends Mapper<LongWritable, Text, Text, Text> {

        /**
         * value = 1363157993044  18211575961  94-71-AC-CD-E6-18:CMCC-EASY  120.196.100.99
         *         iface.qiyi.com  video site  15  12  1527  2106  200
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] split = value.toString().split("\t");

            // field 1 is the phone number; fields 8 and 9 are the upstream and downstream traffic
            String outkey = split[1];
            String outValue = split[8] + "\t" + split[9];

            context.write(new Text(outkey), new Text(outValue));
        }
    }

    public static class FlowSumMRReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            int upFlow = 0;
            int downFlow = 0;
            int sumFlow = 0;

            // accumulate the upstream and downstream traffic of all records for this phone number
            for (Text t : values) {
                String[] split = t.toString().split("\t");

                int upTempFlow = Integer.parseInt(split[0]);
                int downTempFlow = Integer.parseInt(split[1]);

                upFlow += upTempFlow;
                downFlow += downTempFlow;
            }

            sumFlow = upFlow + downFlow;

            context.write(key, new Text(upFlow + "\t" + downFlow + "\t" + sumFlow));
        }
    }
}
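When this job finishes, every line in output_sum has the form phone, total upstream traffic, total downstream traffic and total traffic, separated by tabs, for example (this sample value also appears in the Problem 2 mapper comment below):

13602846565    26860680    40332600    67193280

That file is exactly the input that the sorting job in Problem 2 reads.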

Problem 2

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import comg.ghgj.mr.pojo.FlowBean;

/**
 * Requirement: Problem 2 takes the result data of Problem 1 and sorts it
 * in descending order of total traffic.
 */
public class FlowSortMR {

    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FlowSortMR");
        job.setJarByClass(FlowSortMR.class);
        
        job.setMapperClass(FlowSortMRMapper.class);
        job.setReducerClass(FlowSortMRReducer.class);
        
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        
        
        FileInputFormat.setInputPaths(job, new Path("E:/bigdata/flow/output_sum"));
        FileOutputFormat.setOutputPath(job, new Path("E:/bigdata/flow/output_sort_777"));
        
        
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
        
    }
    
    public static class FlowSortMRMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
        
        /**
         * value  = 13602846565    26860680    40332600    67193280
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            
            String[] split = value.toString().split("\t");
            
            FlowBean fb = new FlowBean(split[0], Long.parseLong(split[1]), Long.parseLong(split[2]));
            
            context.write(fb, NullWritable.get());
        }
        
    }
    
    public static class FlowSortMRReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable>{
        
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            
            
            for(NullWritable nvl : values){
                context.write(key, nvl);
            }
            
        }
        
    }
}
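Note how the sorting is achieved: MapReduce always sorts map output keys during the shuffle, and for a WritableComparable key it does so using the key's compareTo method. By emitting the whole FlowBean as the key (with NullWritable as the value), the job lets the framework do the sorting; FlowBean.compareTo, shown below, orders beans by sumFlow in descending order, so the reducer only has to write the keys back out in the order they arrive.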

FlowBean.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Steps for a custom bean class:
 *   1. define the fields;
 *   2. define getters and setters for the fields;
 *   3. define constructors (with and without arguments);
 *   4. define toString().
 *
 * In detail:
 *
 * If a custom object is to be used as a key, it must implement the
 * WritableComparable interface; implementing Writable and Comparable
 * separately is not enough.
 *
 * If a custom object is only used as a value, implementing Writable is sufficient.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public String getPhone() {
        return phone;
    }
    public void setPhone(String phone) {
        this.phone = phone;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public FlowBean(String phone, long upFlow, long downFlow, long sumFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public FlowBean(String phone, long upFlow, long downFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // a no-argument constructor is required so the framework can instantiate the bean by reflection
    public FlowBean() {
    }

    @Override
    public String toString() {
        return phone + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Serialization: whoever calls write() is the current object.
     *
     * FlowBean bean = new FlowBean();
     * bean.write(out) serializes the four fields of bean, i.e. this == bean.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // The order (and types) of the fields written out in write() must match
    // exactly the order in which they are read back in readFields().

    /**
     * Deserialization: bean.readFields(in) fills the fields back in.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Why does Hadoop not use Java's built-in Serializable mechanism?
     *
     * Hadoop exists to process large amounts of data. Java serialization
     * writes not only the field values but also a lot of metadata about the
     * object's class, which is wasteful at scale. Hadoop therefore uses its
     * own serialization mechanism that writes only the field values.
     */

    /**
     * Defines the sort order: descending by total traffic.
     */
    @Override
    public int compareTo(FlowBean fb) {

        long diff = this.getSumFlow() - fb.getSumFlow();

        if (diff == 0) {
            return 0;
        } else {
            return diff > 0 ? -1 : 1;
        }
    }
}
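The header comment above notes that a custom object used only as a value needs to implement just Writable, not WritableComparable. A minimal sketch of such a value-only bean is shown below; the class name and fields are invented for illustration and are not part of the original project.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical value-only bean: no compareTo is needed because values are never sorted by the framework.
public class FlowValueBean implements Writable {

    private long upFlow;
    private long downFlow;

    // no-argument constructor required for reflection
    public FlowValueBean() {
    }

    public FlowValueBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // write the fields in a fixed order...
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // ...and read them back in exactly the same order and with the same types
        upFlow = in.readLong();
        downFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow;
    }
}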

Problem 3

package comg.ghgj.mr.flow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.ProvincePartitioner;

public class FlowPartitionerMR {

    public static void main(String[] args) throws Exception {
        
        
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf, "FlowPartitionerMR");
        job.setJarByClass(FlowPartitionerMR.class);
        
        job.setMapperClass(FlowPartitionerMRMapper.class);
        job.setReducerClass(FlowPartitionerMRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        
        /**
         * The two crucial lines: plug in the custom partitioner and set the
         * number of reduce tasks to match the number of partitions.
         */
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(10);
        
        
        FileInputFormat.setInputPaths(job, new Path("E:\\bigdata\\flow\\input"));
        Path outputPath = new Path("E:\\bigdata\\flow\\output_ptn2");
        if(fs.exists(outputPath)){
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        
        
        boolean isDone = job.waitForCompletion(true);
        System.exit(isDone ? 0 : 1);
    }
    
    public static class FlowPartitionerMRMapper extends Mapper<LongWritable, Text, Text, Text>{
        
        /**
         * value  =  13502468823    101663100    1529437140    1631100240
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            
            
            String[] split = value.toString().split("\t");
            
            String outkey = split[1];
            String outValue = split[8] + "\t" + split[9];
            
            context.write(new Text(outkey), new Text(outValue));
            
        }
    }
    
    public static class FlowPartitionerMRReducer extends Reducer<Text, Text, Text, Text>{
        
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            
            int upFlow = 0;
            int downFlow = 0;
            int sumFlow = 0;
            
            for(Text t : values){
                String[] split = t.toString().split("\t");
                
                int upTempFlow = Integer.parseInt(split[0]);
                int downTempFlow = Integer.parseInt(split[1]);
                
                upFlow+=upTempFlow;
                downFlow +=  downTempFlow;
            }
            
            sumFlow = upFlow + downFlow;
            
            context.write(key, new Text(upFlow + "\t" + downFlow + "\t" + sumFlow));
        }
    }
}
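The driver above imports ProvincePartitioner but the post never shows its source. The sketch below is only an assumption of what such a partitioner could look like: it routes each phone number to a reduce task by its three-digit prefix, with every unknown prefix falling into a catch-all partition. The prefix-to-partition mapping is invented for illustration (the package declaration is omitted); a real implementation would cover all prefixes of interest and must return values in the range 0 to numReduceTasks - 1, i.e. 0 to 9 for the driver above.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical province partitioner: the type parameters must match the map output types (Text, Text).
public class ProvincePartitioner extends Partitioner<Text, Text> {

    private static final Map<String, Integer> PROVINCE_MAP = new HashMap<String, Integer>();

    static {
        // invented prefix-to-partition mapping, purely for illustration
        PROVINCE_MAP.put("135", 0);
        PROVINCE_MAP.put("136", 1);
        PROVINCE_MAP.put("137", 2);
        PROVINCE_MAP.put("138", 3);
        PROVINCE_MAP.put("139", 4);
        PROVINCE_MAP.put("150", 5);
        PROVINCE_MAP.put("159", 6);
        PROVINCE_MAP.put("182", 7);
        PROVINCE_MAP.put("186", 8);
        // every other prefix falls into partition 9
    }

    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        // the key emitted by FlowPartitionerMRMapper is the phone number
        String prefix = key.toString().substring(0, 3);
        Integer partition = PROVINCE_MAP.get(prefix);
        return partition == null ? 9 : partition;
    }
}

Because job.setNumReduceTasks(10) is set and each reduce task writes its own part-r-000xx file, the output directory ends up with ten files, one per partition, which is exactly how requirement 3 separates the provinces into different files.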
