1. 程式人生 > >MapReduce的wordcount實現

MapReduce的wordcount實現

MapReduce的wordcount主要分為兩個階段,Map和Reduce階段,具體流程如下圖。

(1)MapReduce有一個預設的排序規則,是按照字典順序排序的(大寫字母順序->小寫字母順序->數字順序)

(2)part-r-00000檔案 中的part是分割槽的意思,MapReduce預設只有一個分割槽

(3)_SUCCESS  是執行MapReduce成功的標誌檔案

在yarn上檢視MapReduce的執行過程,分為三個階段

 

map階段的程式

package wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable ,Text ,Text,IntWritable>{
	
	protected void map(LongWritable key1,Text value1,Context context)
		throws IOException,InterruptedException {
		/*
		 * context  表示Mapper的上下文
		 * 上文:HDFS
		 * 下文:Mapper
		 */
		//資料:I love Beijing
		String data = value1.toString();
		
		//分詞
		String[] words = data.split(" ");
		
		//輸出K2  V2
		for(String w:words) {
			context.write(new Text(w), new IntWritable(1));
		}
	}
}

Reduce階段的程式

package wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
	
	protected void reduce(Text k3,Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {
		/*
		 * context是reduce的上下文
		 * 上文
		 * 下文
		 */
		//對v3求和
		int total = 0;
		for(IntWritable v:v3) {
			total += v.get();
		}
		
		//輸出        k4   單詞         v4   頻率
		context.write(k3, new IntWritable(total));
	}

}

主程式

package wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {

	public static void main(String[] args) throws Exception {
		
		//建立一個job和任務入口
		Job job = Job.getInstance(new Configuration());
		//main方法所在的class
		job.setJarByClass(WordCountMain.class);
		
		//指定job的mapper和輸出的型別<k2 v2>
		job.setMapperClass(WordCountMapper.class);
		job.setMapOutputKeyClass(Text.class);   //k2的型別
		job.setMapOutputValueClass(IntWritable.class);   //v2的型別
		
		//指定job的Reducer和輸出的型別<k4 v4>
		job.setReducerClass(WordCountReducer.class);
		job.setOutputKeyClass(Text.class);    //k4的型別
		job.setOutputValueClass(IntWritable.class);   //v4的型別
		
		//指定job的輸入和輸出
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//執行job
		job.waitForCompletion(true);
		
		
	}

}

 

在程式的過程中出現的錯誤

第一行多出現了一個數字1,是因為在test.txt檔案中,最後多寫了空格

這種情況是reducer出現了問題

 

多寫了一個r字母

正確的結果為