1. 程式人生 > >MapReduce的執行原理 MapReduce的原理及執行過程 Combiner

MapReduce的執行原理 MapReduce的原理及執行過程 Combiner

MapReduce的原理及執行過程

 

MapReduce簡介

  1. MapReduce是一種分散式計算模型,是Google提出的,主要用於搜尋領域,解決海量資料的計算問題。
  2. MR有兩個階段組成:Map和Reduce,使用者只需實現map()和reduce()兩個函式,即可實現分散式計算。

MapReduce執行流程

 

 MapReduce原理

 

 

 MapReduce的執行步驟:

1、Map任務處理

  1.1 讀取HDFS中的檔案。每一行解析成一個<k,v>。每一個鍵值對呼叫一次map函式。                <0,hello you>   <10,hello me>

                    

  1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換為新的<k,v>輸出。          <hello,1> <you,1> <hello,1> <me,1>

  1.3 對1.2輸出的<k,v>進行分割槽。預設分為一個區。詳見《Partitioner

  1.4 對不同分割槽中的資料進行排序(按照k)、分組。分組指的是相同key的value放到一個集合中。 排序後:<hello,1> <hello,1> <me,1> <you,1>

  分組後:<hello,{1,1}><me,{1}><you,{1}>

  1.5 (可選)對分組後的資料進行歸約。詳見《Combiner

2、Reduce任務處理

  2.1 多個map任務的輸出,按照不同的分割槽,通過網路copy到不同的reduce節點上。(shuffle)詳見《shuffle過程分析

  2.2 對多個map的輸出進行合併、排序。覆蓋reduce函式,接收的是分組後的資料,實現自己的業務邏輯, <hello,2> <me,1> <you,1>

    處理後,產生新的<k,v>輸出。

  2.3 對reduce輸出的<k,v>寫到HDFS中。

 

Java程式碼實現

注:要匯入org.apache.hadoop.fs.FileUtil.java。

1、先建立一個hello檔案,上傳到HDFS中

 

2、然後再編寫程式碼,實現檔案中的單詞個數統計(程式碼中被註釋掉的程式碼,是可以省略的,不省略也行)

 

複製程式碼
  1 package mapreduce;
  2 
  3 import java.net.URI;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.FileSystem;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 16 
 17 public class WordCountApp {
 18     static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
 19     static final String OUT_PATH = "hdfs://chaoren:9000/out";
 20 
 21     public static void main(String[] args) throws Exception {
 22         Configuration conf = new Configuration();
 23         FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 24         Path outPath = new Path(OUT_PATH);
 25         if (fileSystem.exists(outPath)) {
 26             fileSystem.delete(outPath, true);
 27         }
 28 
 29         Job job = new Job(conf, WordCountApp.class.getSimpleName());
 30 
 31         // 1.1指定讀取的檔案位於哪裡
 32         FileInputFormat.setInputPaths(job, INPUT_PATH);
 33         // 指定如何對輸入的檔案進行格式化,把輸入檔案每一行解析成鍵值對
 34         //job.setInputFormatClass(TextInputFormat.class);
 35 
 36         // 1.2指定自定義的map類
 37         job.setMapperClass(MyMapper.class);
 38         // map輸出的<k,v>型別。如果<k3,v3>的型別與<k2,v2>型別一致,則可以省略
 39         //job.setOutputKeyClass(Text.class);
 40         //job.setOutputValueClass(LongWritable.class);
 41 
 42         // 1.3分割槽
 43         //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
 44         // 有一個reduce任務執行
 45         //job.setNumReduceTasks(1);
 46 
 47         // 1.4排序、分組
 48 
 49         // 1.5歸約
 50 
 51         // 2.2指定自定義reduce類
 52         job.setReducerClass(MyReducer.class);
 53         // 指定reduce的輸出型別
 54         job.setOutputKeyClass(Text.class);
 55         job.setOutputValueClass(LongWritable.class);
 56 
 57         // 2.3指定寫出到哪裡
 58         FileOutputFormat.setOutputPath(job, outPath);
 59         // 指定輸出檔案的格式化類
 60         //job.setOutputFormatClass(TextOutputFormat.class);
 61 
 62         // 把job提交給jobtracker執行
 63         job.waitForCompletion(true);
 64     }
 65 
 66     /**
 67      * 
 68      * KEYIN     即K1     表示行的偏移量 
 69      * VALUEIN     即V1     表示行文字內容 
 70      * KEYOUT     即K2     表示行中出現的單詞 
 71      * VALUEOUT 即V2        表示行中出現的單詞的次數,固定值1
 72      * 
 73      */
 74     static class MyMapper extends
 75             Mapper<LongWritable, Text, Text, LongWritable> {
 76         protected void map(LongWritable k1, Text v1, Context context)
 77                 throws java.io.IOException, InterruptedException {
 78             String[] splited = v1.toString().split("\t");
 79             for (String word : splited) {
 80                 context.write(new Text(word), new LongWritable(1));
 81             }
 82         };
 83     }
 84 
 85     /**
 86      * KEYIN     即K2     表示行中出現的單詞 
 87      * VALUEIN     即V2     表示出現的單詞的次數 
 88      * KEYOUT     即K3     表示行中出現的不同單詞
 89      * VALUEOUT 即V3     表示行中出現的不同單詞的總次數
 90      */
 91     static class MyReducer extends
 92             Reducer<Text, LongWritable, Text, LongWritable> {
 93         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
 94                 Context ctx) throws java.io.IOException,
 95                 InterruptedException {
 96             long times = 0L;
 97             for (LongWritable count : v2s) {
 98                 times += count.get();
 99             }
100             ctx.write(k2, new LongWritable(times));
101         };
102     }
103 }
複製程式碼

 

3、執行成功後,可以在Linux中檢視操作的結果

 

本文章來自於:https://www.cnblogs.com/ahu-lichang/p/6645074.html 如有疑問可與作者聯絡。

MapReduce簡介

  1. MapReduce是一種分散式計算模型,是Google提出的,主要用於搜尋領域,解決海量資料的計算問題。
  2. MR有兩個階段組成:Map和Reduce,使用者只需實現map()和reduce()兩個函式,即可實現分散式計算。

MapReduce執行流程

 

 MapReduce原理

 

 

 MapReduce的執行步驟:

1、Map任務處理

  1.1 讀取HDFS中的檔案。每一行解析成一個<k,v>。每一個鍵值對呼叫一次map函式。                <0,hello you>   <10,hello me>                    

  1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換為新的<k,v>輸出。          <hello,1> <you,1> <hello,1> <me,1>

  1.3 對1.2輸出的<k,v>進行分割槽。預設分為一個區。詳見《Partitioner

  1.4 對不同分割槽中的資料進行排序(按照k)、分組。分組指的是相同key的value放到一個集合中。 排序後:<hello,1> <hello,1> <me,1> <you,1>  分組後:<hello,{1,1}><me,{1}><you,{1}>

  1.5 (可選)對分組後的資料進行歸約。詳見《Combiner

2、Reduce任務處理

  2.1 多個map任務的輸出,按照不同的分割槽,通過網路copy到不同的reduce節點上。(shuffle)詳見《shuffle過程分析

  2.2 對多個map的輸出進行合併、排序。覆蓋reduce函式,接收的是分組後的資料,實現自己的業務邏輯, <hello,2> <me,1> <you,1>

    處理後,產生新的<k,v>輸出。

  2.3 對reduce輸出的<k,v>寫到HDFS中。

 

Java程式碼實現

注:要匯入org.apache.hadoop.fs.FileUtil.java。

1、先建立一個hello檔案,上傳到HDFS中

 

2、然後再編寫程式碼,實現檔案中的單詞個數統計(程式碼中被註釋掉的程式碼,是可以省略的,不省略也行)

 

複製程式碼
  1 package mapreduce;
  2 
  3 import java.net.URI;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.FileSystem;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 16 
 17 public class WordCountApp {
 18     static final String INPUT_PATH = "hdfs://chaoren:9000/hello";
 19     static final String OUT_PATH = "hdfs://chaoren:9000/out";
 20 
 21     public static void main(String[] args) throws Exception {
 22         Configuration conf = new Configuration();
 23         FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 24         Path outPath = new Path(OUT_PATH);
 25         if (fileSystem.exists(outPath)) {
 26             fileSystem.delete(outPath, true);
 27         }
 28 
 29         Job job = new Job(conf, WordCountApp.class.getSimpleName());
 30 
 31         // 1.1指定讀取的檔案位於哪裡
 32         FileInputFormat.setInputPaths(job, INPUT_PATH);
 33         // 指定如何對輸入的檔案進行格式化,把輸入檔案每一行解析成鍵值對
 34         //job.setInputFormatClass(TextInputFormat.class);
 35 
 36         // 1.2指定自定義的map類
 37         job.setMapperClass(MyMapper.class);
 38         // map輸出的<k,v>型別。如果<k3,v3>的型別與<k2,v2>型別一致,則可以省略
 39         //job.setOutputKeyClass(Text.class);
 40         //job.setOutputValueClass(LongWritable.class);
 41 
 42         // 1.3分割槽
 43         //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class);
 44         // 有一個reduce任務執行
 45         //job.setNumReduceTasks(1);
 46 
 47         // 1.4排序、分組
 48 
 49         // 1.5歸約
 50 
 51         // 2.2指定自定義reduce類
 52         job.setReducerClass(MyReducer.class);
 53         // 指定reduce的輸出型別
 54         job.setOutputKeyClass(Text.class);
 55         job.setOutputValueClass(LongWritable.class);
 56 
 57         // 2.3指定寫出到哪裡
 58         FileOutputFormat.setOutputPath(job, outPath);
 59         // 指定輸出檔案的格式化類
 60         //job.setOutputFormatClass(TextOutputFormat.class);
 61 
 62         // 把job提交給jobtracker執行
 63         job.waitForCompletion(true);
 64     }
 65 
 66     /**
 67      * 
 68      * KEYIN     即K1     表示行的偏移量 
 69      * VALUEIN     即V1     表示行文字內容 
 70      * KEYOUT     即K2     表示行中出現的單詞 
 71      * VALUEOUT 即V2        表示行中出現的單詞的次數,固定值1
 72      * 
 73      */
 74     static class MyMapper extends
 75             Mapper<LongWritable, Text, Text, LongWritable> {
 76         protected void map(LongWritable k1, Text v1, Context context)
 77                 throws java.io.IOException, InterruptedException {
 78             String[] splited = v1.toString().split("\t");
 79             for (String word : splited) {
 80                 context.write(new Text(word), new LongWritable(1));
 81             }
 82         };
 83     }
 84 
 85     /**
 86      * KEYIN     即K2     表示行中出現的單詞 
 87      * VALUEIN     即V2     表示出現的單詞的次數 
 88      * KEYOUT     即K3     表示行中出現的不同單詞
 89      * VALUEOUT 即V3     表示行中出現的不同單詞的總次數
 90      */
 91     static class MyReducer extends
 92             Reducer<Text, LongWritable, Text, LongWritable> {
 93         protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s,
 94                 Context ctx) throws java.io.IOException,
 95                 InterruptedException {
 96             long times = 0L;
 97             for (LongWritable count : v2s) {
 98                 times += count.get();
 99             }
100             ctx.write(k2, new LongWritable(times));
101         };
102     }
103 }
複製程式碼

 

3、執行成功後,可以在Linux中檢視操作的結果