1. 程式人生 > >白話Hadoop入門-WordCount詳細講解(2)

白話Hadoop入門-WordCount詳細講解(2)

     前一篇部落格講述瞭如何進行Hadoop壞境的搭建,以及第一個傳輸檔案程式的編寫,通過第一個檔案可能大概對Hadoop有一個瞭解了,但是Hadoop的精髓在於mapreduce,下面我們就來看看如何編寫Hadoop的第一個“hello world”程式--也就是WordCount程式。

    有很多的部落格講述Wordcount是什麼,但是沒有對裡面的程式碼進行詳細講解,導致很多的入門者卡在了這塊

    1、MapReduce工作流程

     mapreduce是如何讓工作的,以Wordcount為例,首先mapreduce分為兩個階段,分別是map階段和reduce階段,其中比如我們的data檔案是這樣的,一共三行。

hadoop is a good tool
python is a good language
python is a bad language

map階段:主要為分別讀取檔案的每一行,然後進行單詞的劃分,形成<key,value>對,eg: <hadoop,1>,<is,1>........等等這樣的形式,然後進行傳送,reduce進行接收,自至於傳送的細節,這都是Hadoop封裝好的。

reduce階段:因為當我們有一個較大的檔案時,會啟動多個map節點,因此reduce會接到多個map發過來的資料,並且自動將相同的key進行整合,所以reduce接到的就是這樣形式的<key,iter<value,value,...>, eg: <hadoop,1>,<is,{1,1,1}>,<python,{1,1}>....這樣形式。因此reduce程式只需要做的就是講這些iter裡面的value進行累加。

所以很簡單的邏輯,下面是實現的具體過程和程式碼。

2、總體架構

     總體分別三個檔案一個map.class(map邏輯處理程式) 一個reduce.class(reduce邏輯處理程式) 一個runner.class(老大,調配前面兩個程式,並進行job的提交)

    提前準備配置:右鍵工程在property裡面的java build path裡面新增我們需要的jar包,否則後邊無法進行,並且最好講配置好的core-site.xml hdfs-site.xml檔案新增進行,以供conf進行載入。

3、map邏輯處理程式

package wyk_firsthadoop;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import io.netty.util.internal.StringUtil;
//其中LongWritable和Text對應著long和string,這裡就是Hadoop定義的一種序列化的型別,方便在網路間//進行傳輸
//繼承來自Hadoop的mapper類,四個引數分別為keyin.valuein,keyout,valueout.這裡僅僅是定義其型別
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
	@Override
        //map接受的引數,key value,和一個配置引數
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		//將Text轉到string格式
    		String line=value.toString();
		//line就是data中的一行,接下來進行切分
    		String[] words=StringUtil.split(line,' ');
	        //context.write將我們的統計資訊傳送出去,reduce進行接收,還是鍵值對的形式
		for(String word : words) {
			//new Text(word)將string再次轉換為Text格式
			context.write(new Text(word), new LongWritable(1));
		}
	}
}

4、reduce邏輯處理程式

package wyk_firsthadoop;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
//還是繼承來自Hadoop的reducer類,其中四個引數分別為keyin,valuein,keyout,valueout.
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
	
	@Override
        //這裡的引數分別為value就是前面說的是一個迭代器
	protected void reduce(Text key, Iterable<LongWritable> values,
			Context context) throws IOException, InterruptedException {
                //宣告一個計數器
		long count =0;
		//進行累加
		for(LongWritable value:values) {
			count+=value.get();   //get方法是進行資料型別的轉換to long
		}
		//繼續呼叫context.write,進行傳送
		context.write(key, new LongWritable(count));
		
	}
}

5、總體調配程式

     mapreduce的提交需要以job形式來提交,前面定義好的map  reduce的處理邏輯,那麼如何執行這兩個程式呢?怎麼才能及交給Hadoop?這裡就要統一進行調配,形成job提交,並執行。

package wyk_firsthadoop;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class runner {

	public static void main(String[] args) throws IOException, Exception, InterruptedException {
// mapreduce的提交需要以job形式來提交,前面定義好的mapreduce的處理邏輯,這裡            
//就要統一進行調配,形成job提交,病執行

    		Configuration conf=new Configuration();
		//建立job例項
		Job job=Job.getInstance(conf);
		//設定生成的jar包 所對應的檔案
		job.setJarByClass(runner.class);
		//設定哪一個是map處理程式,哪一個是reduce程式
		job.setMapperClass(WCMapper.class);
		job.setReducerClass(WCReducer.class);
		//設定輸出資料的格式,這裡分別設定了key value的格式
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		//設定輸入資料的路徑,檔案位於HDFS的根目錄下
		FileInputFormat.setInputPaths(job, new Path("/test_data.txt"));
		//設定輸出檔案的路徑,注意這裡一定要是不存在的資料夾,不然會報錯-已存在
		FileOutputFormat.setOutputPath(job, new Path("/wyk_out"));
		//提交job,並執行
		job.waitForCompletion(true);
		
	}

}

6、NEXT

     (1)將上述的程式所在的工程打成一個jar包。

     (2)自己寫一個簡單的data檔案,並上傳至HDFS,命令:hadoop fs -put xxxx.txt  /    放置根目錄下

     (3)執行haoop jar worcount_wyk.jar wyk_firsthadoop.runner

       (4)檢視輸出結果 hadoop fs -cat  /wyk_out/part-5-0000      大家視情況而定,可能檔名不一樣

輸出結果類似:

hadoop 1
python 2
is     3
tool   1
.....

小tips: 當不知道命令的如何使用時,比如hadoop,可以直接打出hadoop回車,會自動給出提示的,按照提示就可以一步步進行的,並且hadoop的很多指令和linux下的一樣,只不過要hadoop fs 字首就行。