程式人生 > MapReduce程式設計:詞頻統計

MapReduce程式設計:詞頻統計

首先需要在專案的 src 目錄中加入 log4j.properties 檔案,內容如下:

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

 

 1     package org.apache.hadoop.examples;
 2      
 3     import java.io.IOException;
 4     import java.util.Iterator;
 5     import java.util.StringTokenizer;
6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Reducer;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.util.GenericOptionsParser; 16 17 public class WordCount { 18 public WordCount() { 19 } 20 21 public static void main(String[] args) throws Exception { 22 Configuration conf = new Configuration(); 23 24 //String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs(); 25 String[] otherArgs = new String[]{"input","output"}; 26 if(otherArgs.length < 2) { 27 System.err.println("Usage: wordcount <in> [<in>...] <out>"); 28 System.exit(2); 29 } 30 31 Job job = Job.getInstance(conf, "word count"); 32 job.setJarByClass(WordCount.class); 33 job.setMapperClass(WordCount.TokenizerMapper.class); 34 job.setCombinerClass(WordCount.IntSumReducer.class); 35 job.setReducerClass(WordCount.IntSumReducer.class); 36 job.setOutputKeyClass(Text.class); 37 job.setOutputValueClass(IntWritable.class); 38 39 for(int i = 0; i < otherArgs.length - 1; ++i) { 40 FileInputFormat.addInputPath(job, new Path(otherArgs[i])); 41 } 42 43 FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); 44 System.exit(job.waitForCompletion(true)?0:1); 45 } 46 47 public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 48 private IntWritable result = new IntWritable(); 49 50 public IntSumReducer() { 51 } 52 53 public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { 54 int sum = 0; 55 56 IntWritable val; 57 for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) { 58 val = (IntWritable)i$.next(); 59 } 60 61 this.result.set(sum); 62 context.write(key, this.result); 63 } 64 } 65 66 67 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { 68 private static final IntWritable one = new IntWritable(1); 69 private Text word = new Text(); 70 71 
public TokenizerMapper() { 72 } 73 74 public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { 75 StringTokenizer itr = new StringTokenizer(value.toString()); 76 77 while(itr.hasMoreTokens()) { 78 this.word.set(itr.nextToken()); 79 context.write(this.word, one); 80 } 81 82 } 83 } 84 }