1. 程式人生 > >MapReduce教程(一)基於MapReduce框架開發<轉>

MapReduce教程(一)基於MapReduce框架開發<轉>

mat 路徑 重寫 combine 自定義 單詞 tools 必須 www.

1 MapReduce編程

1.1 MapReduce簡介

MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算,用於解決海量數據的計算問題。

MapReduce分成了兩個部分:

1、映射(Mapping)對集合裏的每個目標應用同一個操作。即,如果你想把表單裏每個單元格乘以二,那麽把這個函數單獨地應用在每個單元格上的操作就屬於mapping。

2、化簡(Reducing)遍歷集合中的元素來返回一個綜合的結果。即,輸出表單裏一列數字的和這個任務屬於reducing。

你向MapReduce框架提交一個計算作業時,它會首先把計算作業拆分成若幹個Map任務,然後分配到不同的節點上去執行,

每一個Map任務處理輸入數據中的一部分,當Map任務完成後,它會生成一些中間文件,這些中間文件將會作為Reduce任務的輸入數據。

Reduce任務的主要目標就是把前面若幹個Map的輸出匯總到一起並輸出。

MapReduce的偉大之處就在於編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統上。

1.2 MapReduce運行原理

MapReduce論文流程圖 - 1.1

一切都是從最上方的user program開始的,user program鏈接了MapReduce庫,實現了最基本的Map函數和Reduce函數。圖中執行的順序都用數字標記了。

1、MapReduce庫先把user program的輸入文件劃分為M份(M為用戶定義),每一份通常有16MB到64MB,如圖左方所示分成了split0~4;然後使用fork將用戶進程拷貝到集群內其它機器上。

2、user program的副本中有一個稱為master,其余稱為worker,master是負責調度的,為空閑worker分配作業(Map作業3或者Reduce作業),worker的數量也是可以由用戶指定的。

3、被分配了Map作業的worker,開始讀取對應分片的輸入數據,Map作業數量是由M決定的,和split一一對應;Map作業從輸入數據中抽取出鍵值對,每一個鍵值對都作為參數傳遞給map函數,map函數產生的中間鍵值對被緩存在內存中。

4、緩存的中間鍵值對會被定期寫入本地磁盤,而且被分為R個區,R的大小是由用戶定義的,將來每個區會對應一個Reduce作業;這些中間鍵值對的位置會被通報給master,master負責將信息轉發給Reduce worker。

5、master通知分配了Reduce作業的worker它負責的分區在什麽位置(肯定不止一個地方,每個Map作業產生的中間鍵值對都可能映射到所有R個不同分區),當Reduce worker把所有它負責的中間鍵值對都讀過來後,先對它們進行排序,使得相同鍵的鍵值對聚集在一起。因為不同的鍵可能會映射到同一個分區也就是同一個Reduce作業(誰讓分區少呢),所以排序是必須的。

6、reduce worker遍歷排序後的中間鍵值對,對於每個唯一的鍵,都將鍵與關聯的值傳遞給reduce函數,reduce函數產生的輸出會添加到這個分區的輸出文件中。

7、當所有的Map和Reduce作業都完成了,master喚醒正版的user program,MapReduce函數調用返回user program的代碼

8、所有執行完畢後,MapReduce輸出放在了R個分區的輸出文件中(分別對應一個Reduce作業)。用戶通常並不需要合並這R個文件,而是將其作為輸入交給另一個MapReduce程序處理。整個過程中,輸入數據是來自底層分布式文件系統(GFS)的,中間數據是放在本地文件系統的,最終輸出數據是寫入底層分布式文件系統(GFS)的。而且我們要註意Map/Reduce作業和map/reduce函數的區別:Map作業處理一個輸入數據的分片,可能需要調用多次map函數來處理每個輸入鍵值對;Reduce作業處理一個分區的中間鍵值對,期間要對每個不同的鍵調用一次reduce函數,Reduce作業最終也對應一個輸出文件。

HadoopMapReduce模型實現圖– 1.2

1.3 輸入與輸出

Map/Reduce框架運轉在<key, value>鍵值對上,也就是說,框架把作業的輸入看為是一組<key, value>鍵值對,同樣也產出一組 <key, value>鍵值對做為作業的輸出,這兩組鍵值對的類型可能不同。

框架需要對key和value的類(classes)進行序列化操作,因此,這些類需要實現Writable接口。另外,為了方便框架執行排序操作,key類必須實現 WritableComparable接口。

一個Map/Reduce作業的輸入和輸出類型如下所示:

(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)。

1.4 Writable接口

Writable接口是一個實現了序列化協議的序列化對象。

在Hadoop中定義一個結構化對象都要實現Writable接口,使得該結構化對象可以序列化為字節流,字節流也可以反序列化為結構化對象。

Java基本類型

Writable使用序列化大小

字節

布爾型

BooleanWritable

1

字節型

ByteWritable

1

整型

IntWritable

4

整型

VIntWritable

1-5

浮點型

FloatWritable

4

長整型

LongWritable

8

長整型

VLongWritable

1-9

雙精度浮點型

DoubleWritable

8

Text類型對應

java的string

2 MapReduce編程

2.1 準備數據

1、 在/home路徑下,新建words.txt文檔,文檔內容如下:

hello tom

hello jerry

hello kitty

hello world

hello tom

2、 上傳到hdfs文件服務器/hadoop目錄下:

執行命令:hadoop fs -put /home/words.txt /hadoop/words.txt

執行命令:hadoop fs -cat /hadoop/words.txt

2.2 WordCount v1.0代碼編寫

WordCount是一個簡單的應用,它可以計算出指定數據集中每一個單詞出現的次數。

1、 在pom.xml引入Jar包:

[html] view plain copy
  1. <!-- 引入hadoop-common Jar包 -->
  2. <dependency>
  3. <groupId>org.apache.hadoop</groupId>
  4. <artifactId>hadoop-common</artifactId>
  5. <version>2.7.1</version>
  6. </dependency>
  7. <!-- 引入hadoop-mapreduce-client-core Jar包 -->
  8. <dependency>
  9. <groupId>org.apache.hadoop</groupId>
  10. <artifactId>hadoop-mapreduce-client-core</artifactId>
  11. <version>2.7.1</version>
  12. </dependency>

2、 WCMapper代碼編寫:

[java] view plain copy
  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. /*
  7. * 繼承Mapper類需要定義四個輸出、輸出類型泛型:
  8. * 四個泛型類型分別代表:
  9. * KeyIn Mapper的輸入數據的Key,這裏是每行文字的起始位置(0,11,...)
  10. * ValueIn Mapper的輸入數據的Value,這裏是每行文字
  11. * KeyOut Mapper的輸出數據的Key,這裏是每行文字中的單詞"hello"
  12. * ValueOut Mapper的輸出數據的Value,這裏是每行文字中的出現的次數
  13. *
  14. * Writable接口是一個實現了序列化協議的序列化對象。
  15. * 在Hadoop中定義一個結構化對象都要實現Writable接口,使得該結構化對象可以序列化為字節流,字節流也可以反序列化為結構化對象。
  16. * LongWritable類型:Hadoop.io對Long類型的封裝類型
  17. */
  18. public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  19. /**
  20. * 重寫Map方法
  21. */
  22. @Override
  23. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
  24. throws IOException, InterruptedException {
  25. // 獲得每行文檔內容,並且進行折分
  26. String[] words = value.toString().split(" ");
  27. // 遍歷折份的內容
  28. for (String word : words) {
  29. // 每出現一次則在原來的基礎上:+1
  30. context.write(new Text(word), new LongWritable(1));
  31. }
  32. }
  33. }

3、 WCReducer代碼編寫:

[java] view plain copy
  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. /*
  7. * 繼承Reducer類需要定義四個輸出、輸出類型泛型:
  8. * 四個泛型類型分別代表:
  9. * KeyIn Reducer的輸入數據的Key,這裏是每行文字中的單詞"hello"
  10. * ValueIn Reducer的輸入數據的Value,這裏是每行文字中的次數
  11. * KeyOut Reducer的輸出數據的Key,這裏是每行文字中的單詞"hello"
  12. * ValueOut Reducer的輸出數據的Value,這裏是每行文字中的出現的總次數
  13. */
  14. public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  15. /**
  16. * 重寫reduce方法
  17. */
  18. @Override
  19. protected void reduce(Text key, Iterable<LongWritable> values,
  20. Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
  21. long sum = 0;
  22. for (LongWritable i : values) {
  23. // i.get轉換成long類型
  24. sum += i.get();
  25. }
  26. // 輸出總計結果
  27. context.write(key, new LongWritable(sum));
  28. }
  29. }

4、 WordCount代碼編寫:

[java] view plain copy
  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. public class WordCount {
  11. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  12. // 創建job對象
  13. Job job = Job.getInstance(new Configuration());
  14. // 指定程序的入口
  15. job.setJarByClass(WordCount.class);
  16. // 指定自定義的Mapper階段的任務處理類
  17. job.setMapperClass(WCMapper.class);
  18. job.setMapOutputKeyClass(Text.class);
  19. job.setMapOutputValueClass(LongWritable.class);
  20. // 數據HDFS文件服務器讀取數據路徑
  21. FileInputFormat.setInputPaths(job, new Path("/hadoop/words.txt"));
  22. // 指定自定義的Reducer階段的任務處理類
  23. job.setReducerClass(WCReducer.class);
  24. // 設置最後輸出結果的Key和Value的類型
  25. job.setOutputKeyClass(Text.class);
  26. job.setOutputValueClass(LongWritable.class);
  27. // 將計算的結果上傳到HDFS服務
  28. FileOutputFormat.setOutputPath(job, new Path("/hadoop/wordsResult"));
  29. // 執行提交job方法,直到完成,參數true打印進度和詳情
  30. job.waitForCompletion(true);
  31. System.out.println("Finished");
  32. }
  33. }

2.3 生成JAR包

1、 選擇hdfs項目->右擊菜單->Export…,在彈出的提示框中選擇Java下的JAR file:

2、 設置導出jar名稱和路徑,選擇Next>:

3、 設置程序的入口,設置完成後,點擊Finish:

4、 成生wc.jar如下文件,如下圖:

2.4 執行JAR運行結果

1、 在開Xft軟件,將D:盤的wc.jar上傳到Linux/home路徑下:

2、 執行命令

切換目錄命令:cd /home/

執行JAR包命令:hadoop jar wc.jar

3、 查看執行結果

執行命令:hadoop fs -ls /hadoop/wordsResult

執行命令:hadoop fs -cat /hadoop/wordsResult/part-r-00000

--以上為《MapReduce教程(一)基於MapReduce框架開發》,如有不當之處請指出,我後續逐步完善更正,大家共同提高。謝謝大家對我的關註。

轉自 http://blog.csdn.net/yuan_xw/article/details/50532368

MapReduce教程(一)基於MapReduce框架開發<轉>