MapReduce 按照Value值進行排序輸出
阿新 • • 發佈:2019-01-07
檔案輸入(每行為「名稱、數值」兩個欄位,欄位之間以 Tab 分隔):
A 1
B 5
C 4
E 1
D 3
W 9
P 7
Q 2
檔案輸出(依數值由大到小排序;數值相同的記錄之間,先後順序不保證):
W 9
P 7
B 5
C 4
D 3
Q 2
E 1
A 1
程式碼如下:
package comparator; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable.Comparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class comparator{ /** * @param args * @throws IOException * @throws IllegalArgumentException * @throws InterruptedException * @throws ClassNotFoundException */ public static class myComparator extends Comparator { @SuppressWarnings("rawtypes") public int compare( WritableComparable a,WritableComparable b){ return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } public static class Map extends Mapper<Object,Text,IntWritable,Text>{ public void map(Object key,Text value,Context context) throws NumberFormatException, IOException, InterruptedException{ String[] split = value.toString().split("\t"); context.write(new IntWritable(Integer.parseInt(split[1])),new Text(split[0]) ); } } public static class Reduce extends Reducer<IntWritable,Text,Text,IntWritable>{ public void reduce(IntWritable key,Iterable<Text>values,Context context) throws IOException, InterruptedException{ for (Text text : values) { context.write( text,key); } } } public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException { // TODO Auto-generated method stub Job job = new Job(); job.setJarByClass(comparator.class); job.setNumReduceTasks(1); //設定reduce程序為1個,即output生成一個檔案 job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); 
job.setOutputKeyClass(Text.class); //為job的輸出資料設定key類 job.setOutputValueClass(IntWritable.class); //為job的輸出設定value類 job.setSortComparatorClass( myComparator.class); //自定義排序 FileInputFormat.addInputPath(job, new Path(args[0])); //設定輸入檔案的目錄 FileOutputFormat.setOutputPath(job,new Path(args[1])); //設定輸出檔案的目錄 System.exit(job.waitForCompletion(true)?0:1); //提交任務 } }