1. 程式人生 > >大資料入門(10)序列化機制,mr流量求和

大資料入門(10)序列化機制,mr流量求和


public class FlowBean implements WritableComparable<FlowBean>{

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;
    
    
    //在反序列化時,反射機制需要呼叫空參建構函式
    public FlowBean() {
    }

    //為了物件資料的初始化方便,加入一個帶參的建構函式
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" +d_flow + "\t" + s_flow;
    }

    //從資料流中反序列出物件的資料
    //從資料流中讀出物件欄位時,必須跟序列化時的順序保持一致
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
        this.phoneNB = in.readUTF();
        this.up_flow = in.readLong();
        this.d_flow = in.readLong();
        this.s_flow = in.readLong();
    }

    //將物件資料序列化到流中
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    @Override
    public int compareTo(FlowBean o) {
        // TODO Auto-generated method stub
        return this.s_flow>o.getS_flow()?-1:1;
    }
    //get/set方法
}
/**********************************************************************************************************************/
/**
 * FlowBean 是自定義的一種資料型別,要在hadoop的各個節點之間傳輸,應該遵循hadoop的序列化機制
 * 就必須實現hadoop相應的序列化介面
 *
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException {
        //一行資料
        String line = value.toString();
        //切分成各個欄位
        //String[] fields = StringUtils.split(line,"\t");
        String[] fields = line.split("\t");
        //需要的欄位
        String phoneNB = fields[1];
        long u_flow = Long.parseLong(fields[7]);
        long d_flow = Long.parseLong(fields[8]);
        
        //封裝資料為kv並輸出
        context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
    };
}
/*********************************************************************************************************/
//框架每傳遞一組資料<1387788654,{flowbean,flowbean,flowbean,flowbean.....}>呼叫一次我們的reduce方法
//reduce中的業務邏輯就是遍歷values,然後進行累加求和再輸出
public class FlowSumReduce extends Reducer<Text, FlowBean, Text, FlowBean>{

    protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException ,InterruptedException {
        long up_flow_counter = 0;
        long d_flow_counter = 0;
        for (FlowBean bean : values){
            up_flow_counter+=bean.getUp_flow();
            d_flow_counter+=bean.getD_flow();
        }
        
        context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
    };
}
/**************************************************************************************************/
public class FlowSumRunner extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        
        job.setJarByClass(FlowSumRunner.class);
        
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReduce.class);
        
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        return job.waitForCompletion(true)?0:1;
    }
    
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(),args);
        System.exit(res);
        
    }
    

}