
MapReduce: Counting a Phone User's Upstream Traffic, Downstream Traffic, and Total Traffic, with the Output Sorted in Descending Order (Part 1)

 

First, note that Hadoop's built-in LongWritable can only carry a single long value, so it cannot hold the three figures we need for each user: upstream traffic, downstream traffic, and total traffic.

That means we have no choice but to define our own data type, one that can carry all three fields.

Transmitting a custom data type in MapReduce (a Bean with setters and getters):
(1) To travel between Hadoop nodes, the type must plug into Hadoop's serialization mechanism by implementing the Writable interface and overriding two methods:
write(DataOutput): serialize the Bean's fields into the output stream
readFields(DataInput): restore the Bean from the input stream

(2) Hadoop's serialization differs from the native JDK's: it transmits only the Bean's own field data, not its inheritance-structure metadata, so the payload is just the data itself, with no redundancy.

(3) When deserializing, the framework instantiates the type via reflection, so a no-argument constructor is required.

(4) To get custom sorting, the type must implement the WritableComparable<T> interface. Note: implementing Writable and Comparable<T> separately is not an acceptable substitute; it throws an initialization exception.

Here we define a FlowBean class that implements WritableComparable:

 

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Description traffic bean
 * @Date 10-17-2018
 * @ClassName FlowBean
 */

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // Deserialization instantiates the bean via reflection, so an empty no-arg constructor is required
    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /*
     * Serialization: write the data we want to transmit into the byte stream
     *
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization: restore each field from the byte stream.
     *
     * The read order must match the order the fields were written in.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean fb) {
        // Sort in descending order of total traffic. Returning 0 on equal
        // totals keeps the compareTo contract intact and lets ties group
        // under one key in the reducer.
        return Long.compare(fb.getSumFlow(), this.sumFlow);
    }

}
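
To see the write/readFields ordering contract in action, here is a minimal, standalone round-trip sketch. It is not part of the job; the class name FlowBeanRoundTrip is made up for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(2481, 24681);

        // Serialize: write() pushes upFlow, downFlow, sumFlow into the stream.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize: readFields() reads the three longs back in the same order.
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(restored); // prints: 2481	24681	27162
    }
}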

 

Then comes the MapReduce code itself:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @Description computes each user's upstream, downstream, and total traffic, sorted by total traffic in descending order
 * @Date 2018-10-17
 * @ClassName Flow_log
 */
public class Flow_log {

    //Map
    public static class myMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            String phone = line[1];                         // phone number: second field
            int size = line.length;
            long upRate = Long.parseLong(line[size - 3]);   // upstream traffic: third field from the end
            long downRate = Long.parseLong(line[size - 2]); // downstream traffic: second field from the end
            context.write(new FlowBean(upRate, downRate), new Text(phone));
        }
    }

    //Reduce

    /**
     * Hadoop's default sort algorithm only sorts on the key, so Text and
     * FlowBean swap positions here compared with the usual layout: FlowBean
     * is the map output key, the shuffle sorts it with the compareTo we
     * overrode in FlowBean, and the reducer swaps key and value back when
     * writing the final output.
     */
    public static class myReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        public void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Phones whose totals compare equal arrive grouped under one key,
            // so emit every phone in the group, not just the first.
            for (Text phone : values) {
                context.write(phone, key);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Flow_log");
        job.setJarByClass(Flow_log.class);
        job.setMapperClass(myMapper.class);
        job.setReducerClass(myReducer.class);

        //specify the mapper's output k,v types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //specify the final output k,v types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
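
The mapper above assumes tab-separated log lines where the phone number is the second field and the upstream/downstream byte counts sit third- and second-from-last; indexing from the end of the array tolerates a variable number of middle fields. A hypothetical input line, for illustration only (the exact column layout is an assumption):

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	www.example.com	24	27	2481	24681	200

Here line[1] is 13726230503, line[size - 3] is 2481 (up), and line[size - 2] is 24681 (down).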

 

Finally, remember that MapReduce sorts on the key only. So after implementing WritableComparable's compareTo method, we still have to swap the mapper's output key and value so that the traffic data (the FlowBean) sits in the key position; only then can the sort phase between map and reduce order the records for us, and because our overridden compareTo inverts the comparison, the output comes out in descending order of total traffic.
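
To run the job, assuming the classes above are bundled into a jar named flow_log.jar and the log files sit under /flow/input on HDFS (the jar name and both paths are placeholders):

hadoop jar flow_log.jar Flow_log /flow/input /flow/output

Each line of the result in /flow/output is a phone number followed by the FlowBean's toString() (up, down, sum), largest total first; for the sample input line above that would be:

13726230503	2481	24681	27162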