MapReduce in Practice: A Hand-Written WordCount Example

Requirement: given a set of text files, count and output the total number of times each word appears. [Figure: analysis diagram of the WordCount MapReduce job]

In the map phase, the input file is read line by line: the byte offset of the line within the file serves as the key and the line's contents as the value. Each key/value pair emitted by map is handed to the reduce phase, which groups all of the map output by key and invokes the reduce method exactly once for each distinct key.
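As a minimal sketch of that flow, suppose the input contains the two lines below (the words are taken from the sample output further down; the keys 0 and 13 stand in for the byte offsets):

hadoop spark
hadoop redis

map input:     (0, "hadoop spark") (13, "hadoop redis")
map output:    (hadoop, 1) (spark, 1) (hadoop, 1) (redis, 1)
after shuffle: (hadoop, [1, 1]) (redis, [1]) (spark, [1])
reduce output: (hadoop, 2) (redis, 1) (spark, 1)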

The code is as follows:

WordCountMapper.java

package com.lxj.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//Map phase: the byte offset of the input line is the key, the line's contents are the value
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

	private Text k  = new Text();
	private IntWritable v = new IntWritable(1);
	
	@Override
	protected void map(LongWritable key, Text value,Context context) throws java.io.IOException, java.lang.InterruptedException {
	     
		// 1 Convert the line that was just read to a String
		String line = value.toString();
		
		// 2 Split the line into words on spaces
		String[] words = line.split(" ");
		
		// 3 Write out one key/value pair per word
		for (String word : words) {
			String trim = word.trim();
			if(!trim.isEmpty()){
				k.set(trim);
				// 4 The map phase only splits out words and does not combine them,
				//   so the value is always the constant 1
				context.write(k, v);
			}
		}
	}
	
}
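Note that the k and v fields are created once and reused across map() calls; this is the usual Hadoop idiom for avoiding a fresh Writable allocation on every input record.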

WordCountReducer.java

package com.lxj.wc;

import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//The reduce phase takes the map phase's output as its input data
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

	
	//The reduce method runs exactly once for each distinct key
	@Override
	protected void reduce(Text text, Iterable<IntWritable> iterable, Context context) throws java.io.IOException, java.lang.InterruptedException {
	    
		// 1. Sum the values that the map phase emitted for this key
		int sum = 0;
		Iterator<IntWritable> iterator = iterable.iterator();
		while(iterator.hasNext()){
			 sum += iterator.next().get();
		}
		if(!text.toString().trim().equals("")){
			// Write out the result
			context.write(text, new IntWritable(sum));
		}
	}
	
}
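The loop reads each IntWritable with get() before advancing, which matters because Hadoop reuses the same value object while iterating: summing the primitive values, as done here, is safe, whereas storing references to the objects themselves would not be.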

WordCountDriver.java

package com.lxj.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//Driver class that wires the mapper and reducer together
public class WordCountDriver {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// 1. Get the configuration and create the job
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
		
		// 2. Set the jar location by passing in the current Class object
		job.setJarByClass(WordCountDriver.class);
		
		// 3. Set the mapper and reducer classes
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
		// 4. Set the map output types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		
		// 5. Set the final output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 6. Set the input and output paths
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 7. Submit the job and wait for it to finish
		boolean result = job.waitForCompletion(true);
		System.exit( result ? 0 : 1);
		
	}
}
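For the local test in section 1, the driver can be run straight from the IDE with two program arguments: a local input file and an output directory that must not already exist (FileOutputFormat fails the job if it does). The paths below are hypothetical:

args[0] = e:/input/words.txt
args[1] = e:/output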

Prepare the following input file:

1. Local test results are as follows:

Astonished	1
At	1
But	1
Fate	1
He	2
Immediately	1
Many	1
O	1
Phoenix	1
a	1
admired,	1
again	1
ages	1
al	1
amongst	1
an	1
and	5
animals,	1
appeared	1
around	1
at	1
away	1
beasts,	1
beauty,	1
been	2
began	1
being	1
birds	1
both	1
broke	1
compassion,	1
different	1
elasticserach	1
euraka	1
eye	1
flocked	1
friend	1
great	1
had	2
hadoop	1
hard	1
has	2
he	1
him	3
his	1
in	2
into	1
javaee	1
kinds	1
know	1
last	1
look	1
loved	1
loving	1
map	1
mate	1
most	1
mysql	1
neither	1
never	1
nor	1
now	1
of	4
or	1
out	1
passed	1
phoenix	1
pleasure	1
praise.	1
prudent	1
redis	2
reduce	1
seen	1
shiro	1
short	1
sighed	1
since	1
spark	1
ssh	1
ssm	1
stared	1
the	5
them	1
they	2
time,	1
to	2
unhappy	1
upon	1
will	1
wisest	1
with	1
world.	1
yarn	1
zookeeper	1

2. Running on a Hadoop cluster:

First package the project into a jar, upload the input file to HDFS for analysis, and run the following command:
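A typical submission looks like the following; the jar name and the HDFS paths are assumptions for illustration, and only the main class name comes from the code above:

# jar name and paths are assumed, not taken from the original post
hadoop jar wc.jar com.lxj.wc.WordCountDriver /input/words.txt /output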

After the job completes successfully, view the results:
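If the job wrote to /output as in the assumed command above, the counts land in the standard reducer output file and can be printed with:

hdfs dfs -cat /output/part-r-00000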

Of course, you can also download and view the results directly from the web UI: