MapReduce教程(一)基於MapReduce框架開發<轉>
1 MapReduce編程
1.1 MapReduce簡介
MapReduce是一種編程模型,用於大規模數據集(大於1TB)的並行運算,用於解決海量數據的計算問題。
MapReduce分成了兩個部分:
1、映射(Mapping)對集合裏的每個目標應用同一個操作。即,如果你想把表單裏每個單元格乘以二,那麽把這個函數單獨地應用在每個單元格上的操作就屬於mapping。
2、化簡(Reducing)遍歷集合中的元素來返回一個綜合的結果。即,輸出表單裏一列數字的和這個任務屬於reducing。
你向MapReduce框架提交一個計算作業時,它會首先把計算作業拆分成若幹個Map任務,然後分配到不同的節點上去執行,
每一個Map任務處理輸入數據中的一部分,當Map任務完成後,它會生成一些中間文件,這些中間文件將會作為Reduce任務的輸入數據。
Reduce任務的主要目標就是把前面若幹個Map的輸出匯總到一起並輸出。
MapReduce的偉大之處就在於編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統上。
1.2 MapReduce運行原理
MapReduce論文流程圖 - 1.1
一切都是從最上方的user program開始的,user program鏈接了MapReduce庫,實現了最基本的Map函數和Reduce函數。圖中執行的順序都用數字標記了。
1、MapReduce庫先把user program的輸入文件劃分為M份(M為用戶定義),每一份通常有16MB到64MB,如圖左方所示分成了split0~4;然後使用fork將用戶進程拷貝到集群內其它機器上。
2、user program的副本中有一個稱為master,其余稱為worker,master是負責調度的,為空閑worker分配作業(Map作業3或者Reduce作業),worker的數量也是可以由用戶指定的。
3、被分配了Map作業的worker,開始讀取對應分片的輸入數據,Map作業數量是由M決定的,和split一一對應;Map作業從輸入數據中抽取出鍵值對,每一個鍵值對都作為參數傳遞給map函數,map函數產生的中間鍵值對被緩存在內存中。
4、緩存的中間鍵值對會被定期寫入本地磁盤,而且被分為R個區,R的大小是由用戶定義的,將來每個區會對應一個Reduce作業;這些中間鍵值對的位置會被通報給master,master負責將信息轉發給Reduce worker。
5、master通知分配了Reduce作業的worker它負責的分區在什麽位置(肯定不止一個地方,每個Map作業產生的中間鍵值對都可能映射到所有R個不同分區),當Reduce worker把所有它負責的中間鍵值對都讀過來後,先對它們進行排序,使得相同鍵的鍵值對聚集在一起。因為不同的鍵可能會映射到同一個分區也就是同一個Reduce作業(誰讓分區少呢),所以排序是必須的。
6、reduce worker遍歷排序後的中間鍵值對,對於每個唯一的鍵,都將鍵與關聯的值傳遞給reduce函數,reduce函數產生的輸出會添加到這個分區的輸出文件中。
7、當所有的Map和Reduce作業都完成了,master喚醒正版的user program,MapReduce函數調用返回user program的代碼
8、所有執行完畢後,MapReduce輸出放在了R個分區的輸出文件中(分別對應一個Reduce作業)。用戶通常並不需要合並這R個文件,而是將其作為輸入交給另一個MapReduce程序處理。整個過程中,輸入數據是來自底層分布式文件系統(GFS)的,中間數據是放在本地文件系統的,最終輸出數據是寫入底層分布式文件系統(GFS)的。而且我們要註意Map/Reduce作業和map/reduce函數的區別:Map作業處理一個輸入數據的分片,可能需要調用多次map函數來處理每個輸入鍵值對;Reduce作業處理一個分區的中間鍵值對,期間要對每個不同的鍵調用一次reduce函數,Reduce作業最終也對應一個輸出文件。
HadoopMapReduce模型實現圖– 1.2
1.3 輸入與輸出
Map/Reduce框架運轉在<key, value>鍵值對上,也就是說,框架把作業的輸入看為是一組<key, value>鍵值對,同樣也產出一組 <key, value>鍵值對做為作業的輸出,這兩組鍵值對的類型可能不同。
框架需要對key和value的類(classes)進行序列化操作,因此,這些類需要實現Writable接口。另外,為了方便框架執行排序操作,key類必須實現 WritableComparable接口。
一個Map/Reduce作業的輸入和輸出類型如下所示:
(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)。
1.4 Writable接口
Writable接口是一個實現了序列化協議的序列化對象。
在Hadoop中定義一個結構化對象都要實現Writable接口,使得該結構化對象可以序列化為字節流,字節流也可以反序列化為結構化對象。
Java基本類型 |
Writable使用序列化大小 |
字節 |
布爾型 |
BooleanWritable |
1 |
字節型 |
ByteWritable |
1 |
整型 |
IntWritable |
4 |
整型 |
VIntWritable |
1-5 |
浮點型 |
FloatWritable |
4 |
長整型 |
LongWritable |
8 |
長整型 |
VLongWritable |
1-9 |
雙精度浮點型 |
DoubleWritable |
8 |
Text類型對應 |
java的string |
|
2 MapReduce編程
2.1 準備數據
1、 在/home路徑下,新建words.txt文檔,文檔內容如下:
hello tom
hello jerry
hello kitty
hello world
hello tom
2、 上傳到hdfs文件服務器/hadoop目錄下:
執行命令:hadoop fs -put /home/words.txt /hadoop/words.txt
執行命令:hadoop fs -cat /hadoop/words.txt
2.2 WordCount v1.0代碼編寫
WordCount是一個簡單的應用,它可以計算出指定數據集中每一個單詞出現的次數。
1、 在pom.xml引入Jar包:
[html] view plain copy- <!-- 引入hadoop-common Jar包 -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.7.1</version>
- </dependency>
- <!-- 引入hadoop-mapreduce-client-core Jar包 -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>2.7.1</version>
- </dependency>
2、 WCMapper代碼編寫:
[java] view plain copy- package com.hadoop.mapreduce;
- import java.io.IOException;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- /*
- * 繼承Mapper類需要定義四個輸出、輸出類型泛型:
- * 四個泛型類型分別代表:
- * KeyIn Mapper的輸入數據的Key,這裏是每行文字的起始位置(0,11,...)
- * ValueIn Mapper的輸入數據的Value,這裏是每行文字
- * KeyOut Mapper的輸出數據的Key,這裏是每行文字中的單詞"hello"
- * ValueOut Mapper的輸出數據的Value,這裏是每行文字中的出現的次數
- *
- * Writable接口是一個實現了序列化協議的序列化對象。
- * 在Hadoop中定義一個結構化對象都要實現Writable接口,使得該結構化對象可以序列化為字節流,字節流也可以反序列化為結構化對象。
- * LongWritable類型:Hadoop.io對Long類型的封裝類型
- */
- public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
- /**
- * 重寫Map方法
- */
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
- throws IOException, InterruptedException {
- // 獲得每行文檔內容,並且進行折分
- String[] words = value.toString().split(" ");
- // 遍歷折份的內容
- for (String word : words) {
- // 每出現一次則在原來的基礎上:+1
- context.write(new Text(word), new LongWritable(1));
- }
- }
- }
3、 WCReducer代碼編寫:
[java] view plain copy- package com.hadoop.mapreduce;
- import java.io.IOException;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- /*
- * 繼承Reducer類需要定義四個輸出、輸出類型泛型:
- * 四個泛型類型分別代表:
- * KeyIn Reducer的輸入數據的Key,這裏是每行文字中的單詞"hello"
- * ValueIn Reducer的輸入數據的Value,這裏是每行文字中的次數
- * KeyOut Reducer的輸出數據的Key,這裏是每行文字中的單詞"hello"
- * ValueOut Reducer的輸出數據的Value,這裏是每行文字中的出現的總次數
- */
- public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
- /**
- * 重寫reduce方法
- */
- @Override
- protected void reduce(Text key, Iterable<LongWritable> values,
- Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
- long sum = 0;
- for (LongWritable i : values) {
- // i.get轉換成long類型
- sum += i.get();
- }
- // 輸出總計結果
- context.write(key, new LongWritable(sum));
- }
- }
4、 WordCount代碼編寫:
[java] view plain copy- package com.hadoop.mapreduce;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- public class WordCount {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- // 創建job對象
- Job job = Job.getInstance(new Configuration());
- // 指定程序的入口
- job.setJarByClass(WordCount.class);
- // 指定自定義的Mapper階段的任務處理類
- job.setMapperClass(WCMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
- // 數據HDFS文件服務器讀取數據路徑
- FileInputFormat.setInputPaths(job, new Path("/hadoop/words.txt"));
- // 指定自定義的Reducer階段的任務處理類
- job.setReducerClass(WCReducer.class);
- // 設置最後輸出結果的Key和Value的類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- // 將計算的結果上傳到HDFS服務
- FileOutputFormat.setOutputPath(job, new Path("/hadoop/wordsResult"));
- // 執行提交job方法,直到完成,參數true打印進度和詳情
- job.waitForCompletion(true);
- System.out.println("Finished");
- }
- }
2.3 生成JAR包
1、 選擇hdfs項目->右擊菜單->Export…,在彈出的提示框中選擇Java下的JAR file:
2、 設置導出jar名稱和路徑,選擇Next>:
3、 設置程序的入口,設置完成後,點擊Finish:
4、 成生wc.jar如下文件,如下圖:
2.4 執行JAR運行結果
1、 在開Xft軟件,將D:盤的wc.jar上傳到Linux/home路徑下:
2、 執行命令
切換目錄命令:cd /home/
執行JAR包命令:hadoop jar wc.jar
3、 查看執行結果
執行命令:hadoop fs -ls /hadoop/wordsResult
執行命令:hadoop fs -cat /hadoop/wordsResult/part-r-00000
--以上為《MapReduce教程(一)基於MapReduce框架開發》,如有不當之處請指出,我後續逐步完善更正,大家共同提高。謝謝大家對我的關註。
轉自 http://blog.csdn.net/yuan_xw/article/details/50532368
MapReduce教程(一)基於MapReduce框架開發<轉>