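MapReduce custom property class (output the upstream traffic, downstream traffic, and total for each phone number, then sort)
阿新 • Published: 2018-11-09
MapReduce custom property class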
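Key points:
- Provide a no-argument constructor
- Implement the Writable interface
- Override the write() and readFields() methods (readFields() must read the fields in the same order write() wrote them)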
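The three points can be checked without a cluster. The following is a minimal sketch that runs on a plain JDK under stated assumptions: `SimpleWritable` is a hypothetical stand-in with the same two method signatures as `org.apache.hadoop.io.Writable`, `FlowSketch` is a cut-down, hypothetical version of the Flow class, and `roundTrip` imitates what the framework does with a value: create an empty instance through the no-argument constructor by reflection, then fill it with `readFields()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for org.apache.hadoop.io.Writable (same two methods),
// so the sketch runs without Hadoop on the classpath.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

class FlowSketch implements SimpleWritable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Needed: the empty instance is created by reflection, then filled by readFields().
    public FlowSketch() { }

    public FlowSketch(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Same order as write().
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + " " + downFlow + " " + sumFlow;
    }
}

class WritableRoundTrip {
    // Serialize, then rebuild via the no-arg constructor plus readFields().
    static <T extends SimpleWritable> T roundTrip(T original, Class<T> type) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            T copy = type.getDeclaredConstructor().newInstance();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        FlowSketch copy = roundTrip(new FlowSketch(100, 250), FlowSketch.class);
        System.out.println(copy); // prints "100 250 350"
    }
}
```

The reflective creation step is the reason the no-argument constructor is mandatory: the framework has no values to pass to a parameterized constructor at that point.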
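Related errors:
- java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodExcep (cause: no no-argument constructor)
- java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null (cause: a class imported from the wrong package)
- NullPointerException (cause: the class does not implement Writable)
- The fields come out empty (cause: write() and readFields() were not overridden; the output is 0 0 0)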
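Two of these failures can be reproduced on a plain JDK. This is a sketch, not Hadoop itself: `SimpleWritable`, `NoDefaultCtorFlow`, `EmptySerializationFlow` and `FailureModes` are hypothetical names. A class with only a parameterized constructor makes reflective creation fail with `NoSuchMethodException` (Hadoop's `ReflectionUtils.newInstance` wraps it in a `RuntimeException`, as in the first error above), and a class whose `write()`/`readFields()` are empty comes back as `0 0 0`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in with the same methods as org.apache.hadoop.io.Writable.
interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Mistake 1: only a parameterized constructor, so reflection cannot create it.
class NoDefaultCtorFlow implements SimpleWritable {
    private long upFlow;

    public NoDefaultCtorFlow(long upFlow) { this.upFlow = upFlow; }

    @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); }
    @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); }
}

// Mistake 2: write()/readFields() left empty, so no field is serialized.
class EmptySerializationFlow implements SimpleWritable {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public EmptySerializationFlow() { }

    public EmptySerializationFlow(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override public void write(DataOutput out) { }
    @Override public void readFields(DataInput in) { }
    @Override public String toString() { return upFlow + " " + downFlow + " " + sumFlow; }
}

class FailureModes {
    // Simple name of the exception thrown by no-arg reflective creation, or "ok".
    static String tryNoArgInstantiation(Class<?> type) {
        try {
            type.getDeclaredConstructor().newInstance();
            return "ok";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Serialize `original`, deserialize into `emptyCopy`, return the copy as text.
    static String roundTrip(SimpleWritable original, SimpleWritable emptyCopy) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            emptyCopy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return emptyCopy.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryNoArgInstantiation(NoDefaultCtorFlow.class));      // NoSuchMethodException
        System.out.println(tryNoArgInstantiation(EmptySerializationFlow.class)); // ok
        System.out.println(roundTrip(new EmptySerializationFlow(100, 250),
                new EmptySerializationFlow()));                                 // 0 0 0
    }
}
```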
The code is as follows:
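// Flow.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// WritableComparable extends Writable; compareTo() is only used when Flow is the map output key (sorting section)
public class Flow implements WritableComparable<Flow> {
    private long phoneNo; // only set when Flow is used as the sort key
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // Required: Hadoop creates instances by reflection and then calls readFields()
    public Flow() {
    }

    public Flow(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public Flow(long phoneNo, long upFlow, long downFlow) {
        this(upFlow, downFlow);
        this.phoneNo = phoneNo;
    }

    public long getPhoneNo() { return phoneNo; }
    public long getUpFlow() { return upFlow; }
    public long getDownFlow() { return downFlow; }
    public long getSumFlow() { return sumFlow; }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeLong(phoneNo);
        output.writeLong(upFlow);
        output.writeLong(downFlow);
        output.writeLong(sumFlow);
    }

    // Read the fields in exactly the order write() wrote them
    @Override
    public void readFields(DataInput input) throws IOException {
        phoneNo = input.readLong();
        upFlow = input.readLong();
        downFlow = input.readLong();
        sumFlow = input.readLong();
    }

    // Sorting: descending by total traffic, ties broken by phone number
    @Override
    public int compareTo(Flow o) {
        int bySum = Long.compare(o.sumFlow, this.sumFlow);
        return bySum != 0 ? bySum : Long.compare(this.phoneNo, o.phoneNo);
    }
}

// PhoneFlowMaster.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneFlowMaster {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        System.setProperty("hadoop.home.dir", "C:\\Program Files\\hadoop-2.7.1");
        // Input file
        Path inputPath = new Path("/input/flow.txt");
        // Directory where the results are stored
        Path outputPath = new Path("/output/181020");
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        /* Job setup:
         * 1. job name
         * 2. classes (3: driver, mapper, reducer)
         * 3. data types (map output, reduce output)
         * 4. paths (2: input, output)
         * 5. run and check the result
         */
        Job job = Job.getInstance(conf, "PhoneFlow");
        // Class used to locate the job jar
        job.setJarByClass(PhoneFlowMaster.class);
        job.setMapperClass(PhoneFlowMapper.class);
        job.setReducerClass(PhoneFlowReduce.class);
        // Map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);
        // Reduce (final) output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // With 0 reduce tasks the job is map-only and writes part-m-* files (the raw map output);
        // the reducer and the reduce output types above are then irrelevant
        //job.setNumReduceTasks(0);
        // Input/output paths (classes from org.apache.hadoop.mapreduce.lib, not the old mapred API)
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // The output directory must not exist yet, so delete it first
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        boolean result = job.waitForCompletion(true);
        if (result) {
            System.out.println("Congratulations! success");
        }
    }
}

// PhoneFlowReduce.java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PhoneFlowReduce extends Reducer<Text, Flow, Text, Text> {
    // Called once per distinct key (phone number) with all of its Flow values
    @Override
    protected void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
        long upflow = 0;
        long downflow = 0;
        long sumflow = 0;
        for (Flow value : values) {
            upflow += value.getUpFlow();
            downflow += value.getDownFlow();
            sumflow += value.getSumFlow();
        }
        context.write(key, new Text(upflow + " " + downflow + " " + sumflow));
    }
}

// PhoneFlowMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PhoneFlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each line: phone upFlow downFlow, separated by spaces
        String line = value.toString();
        String[] temp = line.split(" ");
        String phone = temp[0];
        context.write(new Text(phone), new Flow(Long.parseLong(temp[1]), Long.parseLong(temp[2])));
    }
}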
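To see what this job computes without starting Hadoop, here is a plain-Java sketch of the same data flow. It is an illustration under assumptions, not the job itself: `PhoneFlowLocalSim` is a hypothetical class, a `TreeMap` stands in for the shuffle's sorting and grouping of `Text` keys, and the input lines are made-up samples in the "phone up down" layout the mapper expects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PhoneFlowLocalSim {
    // Map + shuffle: split each "phone up down" line and group {up, down, sum}
    // by phone; the TreeMap keeps the phone keys sorted like Text keys.
    static Map<String, List<long[]>> mapAndShuffle(List<String> lines) {
        Map<String, List<long[]>> groups = new TreeMap<>();
        for (String line : lines) {
            String[] temp = line.split(" ");
            long up = Long.parseLong(temp[1]);
            long down = Long.parseLong(temp[2]);
            groups.computeIfAbsent(temp[0], phone -> new ArrayList<>())
                  .add(new long[] {up, down, up + down});
        }
        return groups;
    }

    // Reduce: one call per phone, summing the three counters; the row format
    // mimics TextOutputFormat's default "key<TAB>value".
    static List<String> reduce(Map<String, List<long[]>> groups) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<long[]>> entry : groups.entrySet()) {
            long up = 0, down = 0, sum = 0;
            for (long[] flow : entry.getValue()) {
                up += flow[0];
                down += flow[1];
                sum += flow[2];
            }
            out.add(entry.getKey() + "\t" + up + " " + down + " " + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not taken from the original flow.txt.
        List<String> lines = Arrays.asList(
                "13800000001 100 200", "13900000002 10 20", "13800000001 50 50");
        for (String row : reduce(mapAndShuffle(lines))) {
            System.out.println(row);
        }
    }
}
```

Both lines for 13800000001 end up in a single reduce call, so that phone is reported once with the sums 150, 250 and 400.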
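Custom sorting
MapReduce sorts the map output by key automatically, so to sort by total traffic, make the Flow class the key and override its comparison method.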
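Sorting by key has a side effect worth checking first: unless a separate grouping comparator is configured, Hadoop also uses the key comparison to decide which keys share one `reduce()` call, so keys that compare as 0 are merged. The sketch imitates that with a `TreeMap`; `FlowKey`, `KeyGroupingDemo` and the sample phone numbers are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical minimal key: only the two fields the ordering looks at.
class FlowKey {
    final long phoneNo;
    final long sumFlow;

    FlowKey(long phoneNo, long sumFlow) {
        this.phoneNo = phoneNo;
        this.sumFlow = sumFlow;
    }
}

class KeyGroupingDemo {
    // Descending by total only: different phones with equal totals compare as 0.
    static final Comparator<FlowKey> BY_SUM_ONLY = (a, b) -> Long.compare(b.sumFlow, a.sumFlow);
    // Descending by total, then ascending by phone: distinct phones never compare as 0.
    static final Comparator<FlowKey> BY_SUM_THEN_PHONE = BY_SUM_ONLY.thenComparingLong(k -> k.phoneNo);

    // Keys are ordered by the comparator; keys comparing as 0 share one group,
    // the way they would share one reduce() call.
    static List<List<Long>> groups(List<FlowKey> keys, Comparator<FlowKey> cmp) {
        Map<FlowKey, List<Long>> grouped = new TreeMap<>(cmp);
        for (FlowKey key : keys) {
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(key.phoneNo);
        }
        return new ArrayList<>(grouped.values());
    }

    public static void main(String[] args) {
        List<FlowKey> keys = Arrays.asList(
                new FlowKey(1001, 300), new FlowKey(1002, 500), new FlowKey(1003, 300));
        System.out.println(groups(keys, BY_SUM_ONLY));       // [[1002], [1001, 1003]]
        System.out.println(groups(keys, BY_SUM_THEN_PHONE)); // [[1002], [1001], [1003]]
    }
}
```

With the total alone, phones 1001 and 1003 collapse into one group; adding the phone number as a tie-breaker keeps every phone in its own group.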
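1. Make the Flow class implement the WritableComparable<Flow> interface (it extends Writable, so write() and readFields() stay as they are)
2. Override the compareTo() method: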
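@Override
public int compareTo(Flow o) {
    // Descending by total traffic; ties are broken by phone number so that two
    // different phones with the same total never compare as equal
    int bySum = Long.compare(o.sumFlow, this.sumFlow);
    return bySum != 0 ? bySum : Long.compare(this.phoneNo, o.phoneNo);
}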
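3. Mapper: emit the Flow (phone number, upstream, downstream) as the key; the value can be anything, e.g. an empty Flow. The driver must then declare job.setMapOutputKeyClass(Flow.class).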
context.write(new Flow(Long.parseLong(temp[0]),Long.parseLong(temp[1]),Long.parseLong(temp[2])),
new Flow());
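4. Reducer: by this point the keys are already sorted. Because the values are only empty placeholders, read the phone number (phoneNo) and the traffic figures from the key and write them out:
context.write(new Text(String.valueOf(key.getPhoneNo())),
        new Text(key.getUpFlow() + " " + key.getDownFlow() + " " + key.getSumFlow()));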
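A note on the comparison in step 2: a body like `this.sumFlow > o.sumFlow ? -1 : 1` never returns 0, which keeps equal totals apart but breaks the `Comparable` contract (`x.compareTo(x)` must be 0, and swapping the arguments must flip the sign). The sketch contrasts that ternary with a `Long.compare`-based version; `TernaryFlow`, `FixedFlow` and `CompareToContractDemo` are hypothetical names holding only the fields the comparison needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Uses the ternary comparison: never returns 0.
class TernaryFlow implements Comparable<TernaryFlow> {
    final long phoneNo;
    final long sumFlow;

    TernaryFlow(long phoneNo, long sumFlow) { this.phoneNo = phoneNo; this.sumFlow = sumFlow; }

    @Override
    public int compareTo(TernaryFlow o) {
        return this.sumFlow > o.sumFlow ? -1 : 1;
    }
}

// Contract-respecting version: descending total, then ascending phone number.
class FixedFlow implements Comparable<FixedFlow> {
    final long phoneNo;
    final long sumFlow;

    FixedFlow(long phoneNo, long sumFlow) { this.phoneNo = phoneNo; this.sumFlow = sumFlow; }

    @Override
    public int compareTo(FixedFlow o) {
        int bySum = Long.compare(o.sumFlow, this.sumFlow);
        return bySum != 0 ? bySum : Long.compare(this.phoneNo, o.phoneNo);
    }

    @Override
    public String toString() { return phoneNo + ":" + sumFlow; }
}

class CompareToContractDemo {
    public static void main(String[] args) {
        TernaryFlow t = new TernaryFlow(1001, 300);
        TernaryFlow u = new TernaryFlow(1003, 300);
        System.out.println(t.compareTo(t));                        // 1 (contract requires 0)
        System.out.println(t.compareTo(u) + " " + u.compareTo(t)); // 1 1 (signs should be opposite)

        FixedFlow f = new FixedFlow(1001, 300);
        System.out.println(f.compareTo(f));                        // 0
        List<FixedFlow> flows = new ArrayList<>(Arrays.asList(
                new FixedFlow(1003, 300), new FixedFlow(1002, 500), new FixedFlow(1001, 300)));
        Collections.sort(flows);
        System.out.println(flows);                                 // [1002:500, 1001:300, 1003:300]
    }
}
```

The tie-breaker gives the same practical effect the ternary was after (different phones are never treated as one key) while keeping the comparison consistent.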
Result: the output is sorted by the last column (total traffic) in descending order.
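The four sorting steps can be simulated end to end in plain Java. This is a sketch under assumptions: `SortFlow` and `SortJobSim` are hypothetical, a `TreeMap` plays the role of the shuffle's sort, each input line is assumed to carry a different phone number, and the sample lines are made up. The map step turns the whole record into the key, and the reduce step reads everything back from the key, so the rows come out ordered by the last column, largest first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical plain-Java version of the sort job's Flow key (no Hadoop types).
class SortFlow implements Comparable<SortFlow> {
    final long phoneNo;
    final long upFlow;
    final long downFlow;
    final long sumFlow;

    SortFlow(long phoneNo, long upFlow, long downFlow) {
        this.phoneNo = phoneNo;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Descending by total traffic, ties broken by phone number.
    @Override
    public int compareTo(SortFlow o) {
        int bySum = Long.compare(o.sumFlow, this.sumFlow);
        return bySum != 0 ? bySum : Long.compare(this.phoneNo, o.phoneNo);
    }
}

class SortJobSim {
    static List<String> run(List<String> lines) {
        // Map: the whole record becomes the key; the value is only a placeholder.
        TreeMap<SortFlow, String> shuffled = new TreeMap<>();
        for (String line : lines) {
            String[] temp = line.split(" ");
            shuffled.put(new SortFlow(Long.parseLong(temp[0]),
                    Long.parseLong(temp[1]), Long.parseLong(temp[2])), "");
        }
        // Reduce: keys arrive sorted; phone and traffic are read from the key.
        List<String> out = new ArrayList<>();
        for (SortFlow key : shuffled.keySet()) {
            out.add(key.phoneNo + "\t" + key.upFlow + " " + key.downFlow + " " + key.sumFlow);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input lines in the "phone up down" layout.
        List<String> lines = Arrays.asList(
                "13800000001 100 200", "13900000002 10 20", "13700000003 400 100");
        run(lines).forEach(System.out::println);
    }
}
```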