1. 程式人生 > >Hadoop-MapReduce初步應用-統計單詞個數

Hadoop-MapReduce初步應用-統計單詞個數

參考官網的單詞統計,上傳文字檔案讀取資料,統計等,

首先準備好文字檔案,隨便寫點單詞,再看統計結果正確與否。註釋都 寫在程式碼裡了,希望能幫到入門的開發人員

專案結構如下,讀出的資料一起發出來了

package hadoop.com.test;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
	 
	public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{
		private final IntWritable one=new IntWritable(1);
		private Text word=new Text();
		
		/**
	     * map方法完成工作就是讀取檔案
	     * 將檔案中每個單詞作為key鍵,值設定為1,
	     * 然後將此鍵值對設定為map的輸出,即reduce的輸入
	     */
		public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{
			String line=value.toString();
			StringTokenizer token=new StringTokenizer(line);
			while(token.hasMoreTokens()){
				word.set(token.nextToken());
				context.write(word, one);
			}
		}	
	}
	/**
     * reduce的輸入即是map的輸出,將相同鍵的單詞的值進行統計累加
     * 即可得出單詞的統計個數,最後把單詞作為鍵,單詞的個數作為值,
     * 輸出到設定的輸出檔案中儲存
     */
	public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
		public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
			int sum=0;
			for(IntWritable val:values){
				sum+=val.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}
 
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);
		//Job job= new Job();
		job.setJarByClass(WordCount.class);
		job.setJobName("wordcount");
		
		//對應單詞字串
		job.setOutputKeyClass(Text.class);
		 //對應單詞的統計個數 int型別
		job.setOutputValueClass(IntWritable.class);
		
		//設定mapper類
		job.setMapperClass(WordCountMap.class);
		 /**
         * 設定合併函式,合併函式的輸出作為Reducer的輸入,
         * 提高效能,能有效的降低map和reduce之間資料傳輸量。 但是合併函式不能濫用。需要結合具體的業務。
         * 由於本次應用是統計單詞個數,所以使用合併函式不會對結果或者說業務邏輯結果產生影響。
         * 當對於結果產生影響的時候,是不能使用合併函式的。
         * 例如:我們統計單詞出現的平均值的業務邏輯時,就不能使用合併 函式。此時如果使用,會影響最終的結果。
         */
		//job.setCombinerClass(WordCountReduce.class);
		job.setReducerClass(WordCountReduce.class);
		 /**
         * 設定輸入格式,TextInputFormat是預設的輸入格式
         * 這裡可以不寫這句程式碼。它的值型別是Text型別(文字型別)
         */
		job.setInputFormatClass(TextInputFormat.class);
	    /**
         * 設定輸出格式,TextOutpuTFormat是預設的輸出格式
         * 每條記錄寫為文字行,它的鍵和值可以是任意型別,輸出回撥用toString()
         * 輸出字串寫入文字中。預設鍵和值使用製表符進行分割。
         */
		job.setOutputFormatClass(TextOutputFormat.class);
		
		//設定輸入資料檔案路徑
		FileInputFormat.addInputPaths(job, "hdfs://192.168.1.128:9000/input/word.txt");
		//設定輸出資料檔案路徑(該路徑不能存在,否則異常)
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.1.128:9000/output/output-out"));//hdfs://192.168.1.128:9000/wordcount-out  file:\\E:\\output-out

		job.waitForCompletion(true);
		System.exit(0);
	}
}