1. 程式人生 > >MapReduce 自定義屬性類(輸出電話號對應的上行,下行流量及其總計,並排序)

MapReduce 自定義屬性類(輸出電話號對應的上行,下行流量及其總計,並排序)

MapReduce 自定義屬性類

注意要點:

  • 無參構造方法
  • 繼承 Writable類
  • 重寫write() readFields()方法

相關錯誤:

  1. java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodExcep(原因 沒有無參構造方法)

  2. java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null(導包錯誤)

  3. 空指標 (沒有繼承 Writable類)

  4. 屬性是空值(沒有重寫 write() readFields()方法,輸出000)

程式碼如下:

  public class Flow implements Writable {
    	 @Override
        public void write(DataOutput output) throws IOException {
            output.writeLong(upFlow);
            output.writeLong(downFlow);
            output.writeLong(sumFlow);
        }
        @Override
        public void readFields(DataInput input) throws IOException {
            upFlow = input.readLong();
            downFlow = input.readLong();
            sumFlow = input.readLong();
        }
        //排序  繼承WritableComparable<Flow>介面
         @Override
    public int compareTo(Flow o) {
        return this.sumFlow > o.sumFlow ? -1 : 1;
    }
   } 
    

public class PhoneFlowMaster {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("hadoop.home.dir", "C:\\Program Files\\hadoop-2.7.1");

        Path inputPath = new Path("/input/flow.txt");
        //處理結果存放目錄
        Path outputPath = new Path("/output/181020");
        //處理結果存放目錄
        Configuration conf=new Configuration() ;
        FileSystem fs = FileSystem.get(conf);
        /**初始化job引數,
         * 1.指定job名稱
         * 2.指定類 3
         * 3.指定資料型別 2
         * 4.指定路徑 2
         * 5.result
         */
        Job job =Job.getInstance(conf,"PhoneFlow");
        //設定job執行的類
        job.setJarByClass(PhoneFlowMaster.class);
        //設定Mapper類
        job.setMapperClass(PhoneFlowMapper.class);
        //設定Reduce類
        job.setReducerClass(PhoneFlowReduce.class);

        //設定Map輸出資料型別 key value
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);
        //Reduce輸出資料型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //set 0 part-m  中間結果map reduce 無意義  可以註釋掉輸出資料型別
        //job.setNumReduceTasks(0);
        //設定輸入路徑 lib
        FileInputFormat.setInputPaths(job,inputPath);
        //設定輸出路徑 lib
        FileOutputFormat.setOutputPath(job,outputPath);
        //路徑如果存在必須先刪除
        if(fs.exists(outputPath)){
            fs.delete(outputPath,true);
        }
        boolean result=job.waitForCompletion(true);
        if(result){
            System.out.println("Congratulations! success");
        }

    }

}

public class PhoneFlowReduce extends Reducer<Text, Flow, Text,Text> {
    //相同時候呼叫一次
    @Override
    protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
        long upflow=0;
        long downflow=0;
        long sumflow=0;
        for(Flow value :values){
            upflow+=value.getUpFlow();
            downflow+=value.getDownFlow();
            sumflow+=value.getSumFlow();
        }
        context.write(key,new Text(upflow+" "+downflow+" "+sumflow));
    }
}

public class PhoneFlowMapper  extends Mapper<LongWritable, Text, Text, Flow> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String date =value.toString();
        //通過空格分割
        String[] temp = date.split(" ");
        String phone=temp[0];
        context.write(new Text(phone),new Flow(Long.parseLong(temp[1]),Long.parseLong(temp[2])));
    }
}

自定義排序

mapReduce自動根據key排序,所以把Flow類作為key,重寫排序方法

1.Flow類繼承 WritableComparable介面
2.重寫 compareTo方法
@Override
public int compareTo(Flow o) {
return this.sumFlow > o.sumFlow ? -1 : 1;
}
3.Mapper: 把Flow 作為key ,value 任意
context.write(new Flow(Long.parseLong(temp[0]),Long.parseLong(temp[1]),Long.parseLong(temp[2])),
new Flow());
4.Reduce: 此時已經排序完成 獲取mapper 裡 key的phoneNo屬性 輸出
context.write(new Text(String.valueOf(key.getPhoneNo())),
new Text(upflow+" “+downflow+” "+sumflow));
執行結果:按照最後一列倒序排序

在這裡插入圖片描述