1. 程式人生 > >Hadoop: MapReduce2多個job序列處理 複雜的MapReduce處理中,往往需要將複雜的處理過程,分解成多個簡單的Job來執行,第1個Job的輸出做為第2個Job的輸入,相互之間有一

Hadoop: MapReduce2多個job序列處理

複雜的MapReduce處理中,往往需要將複雜的處理過程,分解成多個簡單的Job來執行,第1個Job的輸出做為第2個Job的輸入,相互之間有一定依賴關係。以上一篇中的求平均數為例,可以分解成三個步驟:

1. 求Sum

2. 求Count

3. 計算平均數

每1個步驟看成一個Job,其中Job3必須等待Job1、Job2完成,並將Job1、Job2的輸出結果做為輸入。下面的程式碼演示了如何將這3個Job串起來:

複製程式碼
  1 package yjmyzz.mr.job.link;
  2 
  3 import org.apache.hadoop.conf.Configuration;
  4 import
org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.DoubleWritable; 6 import org.apache.hadoop.io.LongWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Job; 9 import org.apache.hadoop.mapreduce.Mapper; 10 import org.apache.hadoop.mapreduce.Reducer; 11
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 13 import yjmyzz.util.HDFSUtil; 14 15 import java.io.IOException; 16 17 18 public class Avg2 { 19 20 private static final Text TEXT_SUM = new Text("SUM");
21 private static final Text TEXT_COUNT = new Text("COUNT"); 22 private static final Text TEXT_AVG = new Text("AVG"); 23 24 //計算Sum 25 public static class SumMapper 26 extends Mapper<LongWritable, Text, Text, LongWritable> { 27 28 public long sum = 0; 29 30 public void map(LongWritable key, Text value, Context context) 31 throws IOException, InterruptedException { 32 sum += Long.parseLong(value.toString()); 33 } 34 35 protected void cleanup(Context context) throws IOException, InterruptedException { 36 context.write(TEXT_SUM, new LongWritable(sum)); 37 } 38 39 } 40 41 public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 42 43 public long sum = 0; 44 45 public void reduce(Text key, Iterable<LongWritable> values, Context context) 46 throws IOException, InterruptedException { 47 for (LongWritable v : values) { 48 sum += v.get(); 49 } 50 context.write(TEXT_SUM, new LongWritable(sum)); 51 } 52 53 } 54 55 //計算Count 56 public static class CountMapper 57 extends Mapper<LongWritable, Text, Text, LongWritable> { 58 59 public long count = 0; 60 61 public void map(LongWritable key, Text value, Context context) 62 throws IOException, InterruptedException { 63 count += 1; 64 } 65 66 protected void cleanup(Context context) throws IOException, InterruptedException { 67 context.write(TEXT_COUNT, new LongWritable(count)); 68 } 69 70 } 71 72 public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 73 74 public long count = 0; 75 76 public void reduce(Text key, Iterable<LongWritable> values, Context context) 77 throws IOException, InterruptedException { 78 for (LongWritable v : values) { 79 count += v.get(); 80 } 81 context.write(TEXT_COUNT, new LongWritable(count)); 82 } 83 84 } 85 86 //計算Avg 87 public static class AvgMapper 88 extends Mapper<LongWritable, Text, LongWritable, LongWritable> { 89 90 public long count = 0; 91 public long sum = 0; 92 93 public void map(LongWritable key, Text value, Context context) 94 
throws IOException, InterruptedException { 95 String[] v = value.toString().split("\t"); 96 if (v[0].equals("COUNT")) { 97 count = Long.parseLong(v[1]); 98 } else if (v[0].equals("SUM")) { 99 sum = Long.parseLong(v[1]); 100 } 101 } 102 103 protected void cleanup(Context context) throws IOException, InterruptedException { 104 context.write(new LongWritable(sum), new LongWritable(count)); 105 } 106 107 } 108 109 110 public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> { 111 112 public long sum = 0; 113 public long count = 0; 114 115 public void reduce(LongWritable key, Iterable<LongWritable> values, Context context) 116 throws IOException, InterruptedException { 117 sum += key.get(); 118 for (LongWritable v : values) { 119 count += v.get(); 120 } 121 } 122 123 protected void cleanup(Context context) throws IOException, InterruptedException { 124 context.write(TEXT_AVG, new DoubleWritable(new Double(sum) / count)); 125 } 126 127 } 128 129 130 public static void main(String[] args) throws Exception { 131 132 Configuration conf = new Configuration(); 133 134 String inputPath = "/input/duplicate.txt"; 135 String maxOutputPath = "/output/max/"; 136 String countOutputPath = "/output/count/"; 137 String avgOutputPath = "/output/avg/"; 138 139 //刪除輸出目錄(可選,省得多次執行時,總是報OUTPUT目錄已存在) 140 HDFSUtil.deleteFile(conf, maxOutputPath); 141 HDFSUtil.deleteFile(conf, countOutputPath); 142 HDFSUtil.deleteFile(conf, avgOutputPath); 143 144 Job job1 = Job.getInstance(conf, "Sum"); 145 job1.setJarByClass(Avg2.class); 146 job1.setMapperClass(SumMapper.class); 147 job1.setCombinerClass(SumReducer.class); 148 job1.setReducerClass(SumReducer.class); 149 job1.setOutputKeyClass(Text.class); 150 job1.setOutputValueClass(LongWritable.class); 151 FileInputFormat.addInputPath(job1, new Path(inputPath)); 152 FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath)); 153 154 155 Job job2 = Job.getInstance(conf, "Count"); 156 job2.setJarByClass(Avg2.class); 
157 job2.setMapperClass(CountMapper.class); 158 job2.setCombinerClass(CountReducer.class); 159 job2.setReducerClass(CountReducer.class); 160 job2.setOutputKeyClass(Text.class); 161 job2.setOutputValueClass(LongWritable.class); 162 FileInputFormat.addInputPath(job2, new Path(inputPath)); 163 FileOutputFormat.setOutputPath(job2, new Path(countOutputPath)); 164 165 166 Job job3 = Job.getInstance(conf, "Average"); 167 job3.setJarByClass(Avg2.class); 168 job3.setMapperClass(AvgMapper.class); 169 job3.setReducerClass(AvgReducer.class); 170 job3.setMapOutputKeyClass(LongWritable.class); 171 job3.setMapOutputValueClass(LongWritable.class); 172 job3.setOutputKeyClass(Text.class); 173 job3.setOutputValueClass(DoubleWritable.class); 174 175 //將job1及job2的輸出為做job3的輸入 176 FileInputFormat.addInputPath(job3, new Path(maxOutputPath)); 177 FileInputFormat.addInputPath(job3, new Path(countOutputPath)); 178 FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath)); 179 180 //提交job1及job2,並等待完成 181 if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) { 182 System.exit(job3.waitForCompletion(true) ? 0 : 1); 183 } 184 185 } 186 187 188 }
複製程式碼

輸入文字在上一篇可以找到,上面這段程式碼的主要思路:

1. Sum和Count均採用相同的輸入/input/duplicate.txt,然後將各自的處理結果分別輸出到/output/max/及/output/count/下

2. Avg從/output/max及/output/count獲取結果做為輸入,然後根據Key值不同,拿到sum和count的值,最終計算並輸出到/output/avg/下