1. 程式人生 > >WordCount(mapreduce、yarn)

WordCount(mapreduce、yarn)

       作為一個hadoop的初學者,在經歷了一系列繁瑣複雜的hadoop叢集環境安裝配置之後,終於自主完成了一個wordcount程式。通過mapreduce進行分散式運算,並通過yarn進行執行排程。

      wordcount是一個經典的案例,相信大家都熟悉。主要任務就是計算每個單詞出現的次數並儲存。實現該過程,主要包括兩個階段:map階段: 將每一行文字資料變成<單詞,1>這樣的kv資料;reduce階段:將相同單詞的一組kv資料進行聚合,即累加所有的v。主要包括三個類的開發:WordcountMapper類開發;WordcountReducer類開發;JobSubmitter客戶端類開發。

     WordcountMapper類:

package ldp.wordcount;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * KEYIN:是map task讀取到的資料的key型別,是一行的起始偏移量Long
 * VALUEIN:是map task讀取到的資料的value型別,是一行的內容String
 * KEYOUT:是使用者自定義map方法要返回的結果kv資料的key型別,在wordcount邏輯中,為單詞String
 * VALUEOUT:是使用者自定義map方法要返回的結果kv資料的value型別,在wordcount邏輯中,為整數Integer
 *
 * 在mapreduce中,map產生的資料需要傳輸給reduce,需要進行序列化和反序列化,所以hadoop設計了自己            
 * 的序列化機制
 * hadoop為jdk中的常用基本型別序列化介面:
 * Long     LongWritable
 * String   Text
 * Integer  IntWritable
 * Float    FloatWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		
			@Override
			protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
					throws IOException, InterruptedException {
				
				//切單詞,將每一行資料按照分割符" "切分
				String line = value.toString();
				String[] words = line.split(" ");
				for (String word : words) {
					context.write(new Text(word), new IntWritable(1));
				}				
			}			
}

  WordcountReducer類:

package ldp.wordcount;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
	
		//統計詞頻,values為一個迭代物件
		Iterator<IntWritable> iterator = values.iterator();

		int count = 0;
		
		while (iterator.hasNext()) {			
			IntWritable value = (IntWritable) iterator.next();
			count += value.get();			
		}
		context.write(key, new IntWritable(count));		
	}	
}

JobSubmitter客戶端類:

package ldp.mapreduce;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**用於提交mapreduce job的客戶端程式
 *功能:
 *1、封裝本次job執行時所需要的必要引數
 *2、跟yarn進行互動,將mapreduce程式成功的啟動執行
 */
public class JobSubmitter {
    	
	public static void main(String[] args) throws Exception {
				
		Configuration conf = new Configuration();
				
		Job job = Job.getInstance(conf);
		
		//封裝引數:jar包所在的位置
		job.setJarByClass(JobSubmitter.class);
		
		//封裝引數:本次job要呼叫的Mapper實現類、Reducer實現類
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);
		
		//封裝引數:本次job的Mapper實現類、Reducer實現類產生的結果資料的key、value型別
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
				
		//封裝引數:本次job要處理的輸入資料集所在路徑、最終結果的輸出路徑
          //注意:此時輸入路徑為hadoop叢集環境的目錄
		FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
		//注意:輸出路徑必須不存在,否則報錯
		FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
		
		//封裝引數:想要啟動的reducer task的數量
		job.setNumReduceTasks(2);
		
		//提交job給yarn
		boolean res = job.waitForCompletion(true);
		
        //用於以後的日誌需要,可刪掉
		System.exit(res?0:-1);		
	}				
}

三個類都完成以後,就需要將工程打成一個jar包並上傳至linux伺服器。然後在hadoop叢集的機器上,用命令

hadoop jar wordcount.jar ldp.wordcount.JobSubmitter執行,hadoop jar命令會將這臺機器上的hadoop安裝目錄中的所有jar包和配置檔案全部加入執行時的classpath,此時執行結束,大功告成。

       同時,有時候需要除錯,也可以在本地執行,只需要把JobSubmitter客戶端類的

FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));

修改成本地目錄,即:

FileInputFormat.setInputPaths(job, new Path("e:/wordcount/input"));
FileOutputFormat.setOutputPath(job, new Path("e:/wordcount/output"));

此時,mapreduce程式就會在本機執行,同時可以輕鬆除錯及debug。