1. 程式人生 > >在hadoop上進行編寫mapreduce程式,統計關鍵詞在text出現次數

在hadoop上進行編寫mapreduce程式,統計關鍵詞在text出現次數

mapreduce的處理過程分為2個階段,map階段,和reduce階段。在要求統計指定檔案中的所有單詞的出現次數時,
map階段把每個關鍵詞寫到一行上以逗號進行分隔,並初始化數量為1(相同的單詞hadoop中的map會自動放到一行中)
reduce階段是把每個單詞出現的頻率統計出來重新寫回去。

如程式碼:

package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	final Text key2 = new Text();
	// value2 表示單詞在該行中的出現次數
	final IntWritable value2 = new IntWritable(1);
	// key 表示文字行的起始位置
	// value 表示文字行
	protected void map(LongWritable key, Text value, Context context)
			throws java.io.IOException, InterruptedException {
		final String[] splited = value.toString().split(",");
		for (String word : splited) {
			key2.set(word);
			// 把key2、value2寫入到context中
			context.write(key2, value2);
		}
	}
}

package com.clq.hadoop2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
	// value3表示單詞出現的總次數
	final IntWritable value3 = new IntWritable(0);
	/**
	 * key 表示單詞 values 表示map方法輸出的1的集合 context 上下文物件
	 */
	protected void reduce(Text key, java.lang.Iterable<IntWritable> values,
			Context context) throws java.io.IOException, InterruptedException {
		int sum = 0;
		for (IntWritable count : values) {
			sum += count.get();
		}
		// 執行到這裡,sum表示該單詞出現的總次數
		// key3表示單詞,是最後輸出的key
		final Text key3 = key;
		// value3表示單詞出現的總次數,是最後輸出的value
		value3.set(sum);
		context.write(key3, value3);
	}
}

package com.clq.hadoop2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapperReducer {

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
	        //指定輸入和輸出路徑
		final String INPUT_PATH = "hdfs://ubuntu:9000/Input";
		final String OUTPUT_PATH = "hdfs://ubuntu:9000/output";
		//建立一個job物件封裝執行時所需要的資訊
		final Job job = new Job(new Configuration(),"MapperReducer");
		//打成jar執行
		job.setJarByClass(MapperReducer.class);
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		//指定自己自定義的mapper類
		job.setMapperClass(MyMapper.class);
		//指定執行mapper型別
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		//指定自己定義的reducer類
		job.setReducerClass(MyReducer.class);
		//指定reducer的key和value型別
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.waitForCompletion(true);
		
		 
	}
}