Apache Hadoop(2)---程式設計模型MapReduce
對於大量資料的處理,一般有兩種途徑:一是增大單機的效能,但是摩爾定律
總有減緩甚至失效的那天,資料量的增長遠遠大於單機效能的提升速度;另外一個就是採取分散式的做法,將資料劃分成若干可分割的塊,然後用多臺伺服器去並行的處理。如果原始資料之間沒有依賴關係,理論上只需要單機的1/N時間,而且可以線性擴充套件。
如果用一句話來概括Hadoop的程式設計模型MapReduce
,我想應該是化繁為簡,分而治之
。MapReduce有兩種元件,Mapper和Reducer。Mapper解決的是將原始任務劃分成一個一個獨立的子任務,而Reducer則負責解決這些子任務並將結果拼起來,得到原始問題的結果。
WordCount
下面通過一個簡單的WordCount示例程式來介紹一下如何通過MapRduce實現資料的處理。我們的輸入是一些文字檔案,內容是若干篇文章,要求計算出每個單詞 的出現次數。
首先用常規的做法去解決這個問題,需要經過如下步驟:
- 初始化一個計數器字典,用於儲存每個單詞的出現次數
- 對於每一個檔案,從第一行開始,分割單詞
- 對於每一個單詞,將計數器裡的對應值加一
- 迭代直到處理完所有檔案
那麼如果檔案數量很多呢,一個個處理會很慢,可以考慮啟動多個執行緒,每個執行緒處理一個檔案,最後再把結果合併。這就是分治的思想了。
相比較於多執行緒/多程序/多節點並行,Hadoop的優勢不僅僅在於將任務自動並行化了,更關鍵的是它遮蔽了很多並行任務可能會出現的故障,使用者不需要去關注各個子任務之間的關聯和子任務失敗處理等等問題。
Mapper
將上面的WordCount用MapReduce實現的話,只需要一步Map和一步Reduce操作即可完成,框架會自動啟動若干Mapper,而每一個Mapper所需要做的就是等待框架”喂入”資料,每次的資料對應就是檔案的一行,不需要去關注檔案讀寫,只關注核心任務即可,也就是分割句子。
Mapper所需要做的就是對於每次傳入的文字,分割,然後把每一個單詞傳遞出去即可。
# WCMapper.java public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String words[] = line.split(" "); for(String word: words) { context.write(new Text(word), new IntWritable(1)); } } }
有幾點需要補充一下:
- Mapper和Reducer需要繼承Hadoop框架的Mapper和Reducer類,然後過載map/reduce方法
- 在MapReduce中,所有的輸入輸出都是以KV鍵值對的形式,如這裡的Mapper輸入是(LongWritable, Text),這些都是Hadoop裡的類,對應long和string,這樣寫主要是為了實現序列化和反序列化
- 對於每一個單詞,輸出(word, 1)鍵值對,表示這個單詞出現了一次
- 最上面四個型別分別表示輸入和輸出的鍵值對型別
Reducer
MapReduce 框架會自動合併/處理Mapper的輸出,經過了若干步驟之後,Reducer的輸入看起來會是(string, list<int>)
,即自動按照每個單詞把所有的Mapper發出的鍵值對彙總了。
這樣的話,Reducer的工作也很簡單,對於每次輸入,只需要便利後面的列表,然後把值相加,最後再輸出(word, count)鍵值對即可。
# WCReducer.java public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for(IntWritable value: values){ count += value.get(); } context.write(key, new IntWritable(count)); } }
遍歷列表,然後把對應的加起來,考慮到Mapper輸出的都是1,所以這裡直接用size()
也可以,不過為了後面的優化,寫成這種。
Job Driver
寫完了Mapper和Reducer,還需要一個驅動器來把它們連線起來。對於Hadoop,每個MapReduce對應一個任務,需要去定義這個任務的輸入檔案,輸入檔案格式,輸出檔案路徑,輸出檔案格式,對應的Mapper和Reducer。
Job job = Job.getInstance(); job.setJarByClass(WCMapreduce.class); job.setJobName("WordCount"); job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true)?0:1);
首先新建一個Job
例項;
然後設定入口類(也就是它本身),並設定任務的名字;
接著設定Mapper和Reducer;
再是設定輸入和輸出路徑,可以是檔名也可以是路徑,框架會自動識別。這裡需要注意一個是add,一個是set,區別主要在於可以有多個輸入檔案/路徑;
接著是Mapper和Reducer的輸出鍵值對對應的型別,需要是Hadoop的型別;
最後是啟動這個任務,可以選擇等待任務完成,也可以啟動之後就退出,讓它後臺執行,一般是提交完任務之後就退出。
將程式碼打包成jar,然後通過Hadoop介面執行任務即可
bin/hadoop jar wordcount.jar com.newnius.WCMapreduce /path/to/input /path/to/output
任務輸出可以在HDFS的/path/to/output
目錄找到
bin/hdfs dfs -ls /path/to/output
Reducer
以上就可以完成基本的WordCount任務了,但是有沒有想過,如果文字檔案很多會帶來兩個問題,一個是Mapper和Reducer節點之間的網路通訊量很大,另外一個就是Reducer接收到的列表也會很大,增加了記憶體開銷和處理壓力。
針對以上問題,MapReduce提供了Combiner機制,也就是對Mapper的輸出做精簡操作。在WordCount這個例子中,因為常用的單詞就那麼多,一篇文章中肯定會出現很多重複的單詞,那麼在傳送到Reducer之前先把重複的單詞鍵值對合並一下,這樣Reducer端的壓力就能大大減小了。
作為一個可選項,為了保證Reducer的輸入是一樣的,所以Combiner的輸入輸出需要跟Reducer保持一致,如果仔細觀察就能發現,Combiner跟Reducer做的事情基本是完全一樣的,所以不需要重新寫一個Combiner,只需要複用Reducer,然後指定它為Combiner就可以了。
只需要在原有的程式碼上增加一行即可啟用Combiner。可以對比一下啟用了Combiner之後,網路傳輸和處理速度是否變快了。
job.setCombinerClass(WCReducer.class);
那麼這麼好的東西是不是每個任務都啟用呢?答案是不一定,儘管可能降低網路通訊開銷,但是由於Combiner需要啟動額外的容器和資源,也會消耗一定的時間,如果參與的節點很多而每個節點上其實沒有太多能精簡的東西,那麼Combiner所帶來的好處就很小了,反而拖慢了整體的執行速度。
所以是否啟用Combiner需要取決於具體的任務和網路環境(百兆網跟萬兆網肯定是不一樣的)等等。
完整的程式碼可以在wordcount | GitHub 找到
總結
通過分治的思想,Hadoop成功的將海量資料處理的複雜問題抽象成了Map和Reduce兩種基本操作。
為了做到簡單可靠,MapReduce只提供了Mapper和Reducer這兩種元件(Combiner更像是一種優化設計),儘管這simple but powerful
,但是過於簡單的程式設計介面增加了使用者的編碼難度,一些常用的高階用法需要自己實現,另一個是很多工可能很難用MapReduce的模式實現。