1. 程式人生 > >MapReduce初級案例(3):使用MapReduce實現平均成績

MapReduce初級案例(3):使用MapReduce實現平均成績

當我們看到這個例子的時候,我們是否想過:
mapreduce是否可以完成我們傳統開發中經常遇到的一些任務。例如排序、平均數、批量word轉換等。它和我們傳統開發有什麼不同。
那麼我們可以帶著下面問題來閱讀:
1.mapreduce是如何求平均值的?
2.map在求平均值的作用是什麼?

3.reduce在求平均值的作用是什麼?

一、簡介:
"平均成績"主要目的還是在重溫經典"WordCount"例子,可以說是在基礎上的微變化版,該例項主要就是實現一個計算學生平均成績的例子。

二、例項描述

對輸入檔案中資料進行就算學生平均成績。輸入檔案中的每行內容均為一個學生的姓名和他相應的成績,如果有多門學科,則每門學科為一個檔案。要求在輸出中每行有兩個間隔的資料,其中,第一個代表學生的姓名,第二個代表其平均成績。

    樣本輸入:

    1)math:
  1. 張三    88
  2. 李四    99
  3. 王五    66
  4. 趙六    77
複製程式碼 2)chinese :
  1. 張三    78
  2. 李四    89
  3. 王五    96
  4. 趙六    67
複製程式碼 3)english:
  1. 張三    80
  2. 李四    82
  3. 王五    84
  4. 趙六    86
複製程式碼 樣本輸出:
  1. 張三    82
  2. 李四    90
  3. 王五    82
  4. 趙六    76
複製程式碼
三、設計思路


計算學生平均成績是一個仿"WordCount"例子,用來重溫一下開發MapReduce程式的流程。程式包括兩部分的內容:Map部分和Reduce部分,分別實現了map和reduce的功能。

Map處理的是一個純文字檔案,檔案中存放的資料時每一行表示一個學生的姓名和他相應一科成績。Mapper處理的資料是由InputFormat分解過的資料集,其中InputFormat的作用是將資料集切割成小資料集InputSplit,每一個InputSlit將由一個Mapper負責處理。此外,InputFormat中還提供了一個RecordReader的實現,並將一個InputSplit解析成<key,value>對提供給了map函式。InputFormat的預設值是TextInputFormat,它針對文字檔案,按行將文字切割成InputSlit,並用LineRecordReader將InputSplit解析成<key,value>對,key是行在文字中的位置,value是檔案中的一行。

Map的結果會通過partion分發到Reducer,Reducer做完Reduce操作後,將通過以格式OutputFormat輸出。

Mapper最終處理的結果對<key,value>,會送到Reducer中進行合併,合併的時候,有相同key的鍵/值對則送到同一個Reducer上。Reducer是所有使用者定製Reducer類地基礎,它的輸入是key和這個key對應的所有value的一個迭代器,同時還有Reducer的上下文。Reduce的結果由Reducer.Context的write方法輸出到檔案中。


四、程式程式碼


程式程式碼如下所示:
  1. package com.hebut.mr;
  2. import java.io.IOException;
  3. import java.util.Iterator;
  4. import java.util.StringTokenizer;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.LongWritable;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  17. import org.apache.hadoop.util.GenericOptionsParser;
  18. public class Score {
  19.     public static class Map extends
  20.             Mapper<LongWritable, Text, Text, IntWritable> {
  21.         // 實現map函式
  22.         public void map(LongWritable key, Text value, Context context)
  23.                 throws IOException, InterruptedException {
  24.             // 將輸入的純文字檔案的資料轉化成String
  25.             String line = value.toString();
  26.             // 將輸入的資料首先按行進行分割
  27.             StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
  28.             // 分別對每一行進行處理
  29.             while (tokenizerArticle.hasMoreElements()) {
  30.                 // 每行按空格劃分
  31.                 StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
  32.                 String strName = tokenizerLine.nextToken();// 學生姓名部分
  33.                 String strScore = tokenizerLine.nextToken();// 成績部分
  34.                 Text name = new Text(strName);
  35.                 int scoreInt = Integer.parseInt(strScore);
  36.                 // 輸出姓名和成績
  37.                 context.write(name, new IntWritable(scoreInt));
  38.             }
  39.         }
  40.     }
  41.     public static class Reduce extends
  42.             Reducer<Text, IntWritable, Text, IntWritable> {
  43.         // 實現reduce函式
  44.         public void reduce(Text key, Iterable<IntWritable> values,
  45.                 Context context) throws IOException, InterruptedException {
  46.             int sum = 0;
  47.             int count = 0;
  48.             Iterator<IntWritable> iterator = values.iterator();
  49.             while (iterator.hasNext()) {
  50.                 sum += iterator.next().get();// 計算總分
  51.                 count++;// 統計總的科目數
  52.             }
  53.             int average = (int) sum / count;// 計算平均成績
  54.             context.write(key, new IntWritable(average));
  55.         }
  56.     }
  57.     public static void main(String[] args) throws Exception {
  58.         Configuration conf = new Configuration();
  59.         // 這句話很關鍵
  60.         conf.set("mapred.job.tracker", "192.168.1.2:9001");
  61.         String[] ioArgs = new String[] { "score_in", "score_out" };
  62.         String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
  63.         if (otherArgs.length != 2) {
  64.             System.err.println("Usage: Score Average <in> <out>");
  65.             System.exit(2);
  66.         }
  67.         Job job = new Job(conf, "Score Average");
  68.         job.setJarByClass(Score.class);
  69.         // 設定Map、Combine和Reduce處理類
  70.         job.setMapperClass(Map.class);
  71.         job.setCombinerClass(Reduce.class);
  72.         job.setReducerClass(Reduce.class);
  73.         // 設定輸出型別
  74.         job.setOutputKeyClass(Text.class);
  75.         job.setOutputValueClass(IntWritable.class);
  76.         // 將輸入的資料集分割成小資料塊splites,提供一個RecordReder的實現
  77.         job.setInputFormatClass(TextInputFormat.class);
  78.         // 提供一個RecordWriter的實現,負責資料輸出
  79.         job.setOutputFormatClass(TextOutputFormat.class);
  80.         // 設定輸入和輸出目錄
  81.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  82.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  83.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  84.     }
  85. }
複製程式碼 四、程式碼結果

1)準備測試資料     通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下建立輸入檔案"score_in"資料夾(備註:"score_out"不需要建立。)如圖3.4-1所示,已經成功建立。

1.png (43.71 KB, 下載次數: 5)

下載附件  儲存到相簿

2014-3-3 22:24 上傳

            

2.png (55.61 KB, 下載次數: 5)

下載附件  儲存到相簿

2014-3-3 22:24 上傳

圖3.4-1 建立"score_in"                                                       圖3.4.2 上傳三門分數
    然後在本地建立三個txt檔案,通過Eclipse上傳到"/user/hadoop/score_in"資料夾中,三個txt檔案的內容如"例項描述"那三個檔案一樣。如圖3.4-2所示,成功上傳之後。     備註:文字檔案的編碼為"UTF-8",預設為"ANSI",可以另存為時選擇,不然中文會出現亂碼。     從SecureCRT遠處檢視"Master.Hadoop"的也能證實我們上傳的三個檔案。

3.png (64.21 KB, 下載次數: 5)

下載附件  儲存到相簿

2014-3-3 22:25 上傳


檢視三個檔案的內容如圖3.4-3所示:

4.png (60.06 KB, 下載次數: 5)

下載附件  儲存到相簿

2014-3-3 22:25 上傳

圖3.4.3 三門成績的內容 2)檢視執行結果     這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"資料夾進行重新整理,這時會發現多出一個"score_out"資料夾,且裡面有3個檔案,然後開啟雙其"part-r-00000"檔案,會在Eclipse中間把內容顯示出來。如圖3.4-4所示。

5.png (73.91 KB, 下載次數: 5)

下載附件  儲存到相簿

2014-3-3 22:25 上傳

圖3.4-4 執行結果