MapReduce的執行原理 MapReduce的原理及執行過程 Combiner
MapReduce的原理及執行過程
MapReduce簡介
- MapReduce是一種分散式計算模型,是Google提出的,主要用於搜尋領域,解決海量資料的計算問題。
- MR有兩個階段組成:Map和Reduce,使用者只需實現map()和reduce()兩個函式,即可實現分散式計算。
MapReduce執行流程
MapReduce原理
MapReduce的執行步驟:
1、Map任務處理
1.1 讀取HDFS中的檔案。每一行解析成一個<k,v>。每一個鍵值對呼叫一次map函式。 <0,hello you> <10,hello me>
1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換為新的<k,v>輸出。 <hello,1> <you,1> <hello,1> <me,1>
1.3 對1.2輸出的<k,v>進行分割槽。預設分為一個區。詳見《Partitioner》
1.4 對不同分割槽中的資料進行排序(按照k)、分組。分組指的是相同key的value放到一個集合中。 排序後:<hello,1> <hello,1> <me,1> <you,1>
1.5 (可選)對分組後的資料進行歸約。詳見《Combiner》
2、Reduce任務處理
2.1 多個map任務的輸出,按照不同的分割槽,通過網路copy到不同的reduce節點上。(shuffle)詳見《shuffle過程分析》
2.2 對多個map的輸出進行合併、排序。覆蓋reduce函式,接收的是分組後的資料,實現自己的業務邏輯, <hello,2> <me,1> <you,1>
處理後,產生新的<k,v>輸出。
2.3 對reduce輸出的<k,v>寫到HDFS中。
Java程式碼實現
注:要匯入org.apache.hadoop.fs.FileUtil.java。
1、先建立一個hello檔案,上傳到HDFS中
2、然後再編寫程式碼,實現檔案中的單詞個數統計(程式碼中被註釋掉的程式碼,是可以省略的,不省略也行)
1 package mapreduce; 2 3 import java.net.URI; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.FileSystem; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 17 public class WordCountApp { 18 static final String INPUT_PATH = "hdfs://chaoren:9000/hello"; 19 static final String OUT_PATH = "hdfs://chaoren:9000/out"; 20 21 public static void main(String[] args) throws Exception { 22 Configuration conf = new Configuration(); 23 FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 24 Path outPath = new Path(OUT_PATH); 25 if (fileSystem.exists(outPath)) { 26 fileSystem.delete(outPath, true); 27 } 28 29 Job job = new Job(conf, WordCountApp.class.getSimpleName()); 30 31 // 1.1指定讀取的檔案位於哪裡 32 FileInputFormat.setInputPaths(job, INPUT_PATH); 33 // 指定如何對輸入的檔案進行格式化,把輸入檔案每一行解析成鍵值對 34 //job.setInputFormatClass(TextInputFormat.class); 35 36 // 1.2指定自定義的map類 37 job.setMapperClass(MyMapper.class); 38 // map輸出的<k,v>型別。如果<k3,v3>的型別與<k2,v2>型別一致,則可以省略 39 //job.setOutputKeyClass(Text.class); 40 //job.setOutputValueClass(LongWritable.class); 41 42 // 1.3分割槽 43 //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class); 44 // 有一個reduce任務執行 45 //job.setNumReduceTasks(1); 46 47 // 1.4排序、分組 48 49 // 1.5歸約 50 51 // 2.2指定自定義reduce類 52 job.setReducerClass(MyReducer.class); 53 // 指定reduce的輸出型別 54 job.setOutputKeyClass(Text.class); 55 job.setOutputValueClass(LongWritable.class); 56 57 // 2.3指定寫出到哪裡 58 FileOutputFormat.setOutputPath(job, outPath); 59 // 指定輸出檔案的格式化類 60 //job.setOutputFormatClass(TextOutputFormat.class); 61 62 // 把job提交給jobtracker執行 63 job.waitForCompletion(true); 64 } 65 66 /** 67 * 68 * KEYIN 即K1 表示行的偏移量 69 * VALUEIN 即V1 表示行文字內容 70 * KEYOUT 即K2 表示行中出現的單詞 71 * VALUEOUT 即V2 表示行中出現的單詞的次數,固定值1 72 * 73 */ 74 static class MyMapper extends 75 Mapper<LongWritable, Text, Text, LongWritable> { 76 protected void map(LongWritable k1, Text v1, Context context) 77 throws java.io.IOException, InterruptedException { 78 String[] splited = v1.toString().split("\t"); 79 for (String word : splited) { 80 context.write(new Text(word), new LongWritable(1)); 81 } 82 }; 83 } 84 85 /** 86 * KEYIN 即K2 表示行中出現的單詞 87 * VALUEIN 即V2 表示出現的單詞的次數 88 * KEYOUT 即K3 表示行中出現的不同單詞 89 * VALUEOUT 即V3 表示行中出現的不同單詞的總次數 90 */ 91 static class MyReducer extends 92 Reducer<Text, LongWritable, Text, LongWritable> { 93 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, 94 Context ctx) throws java.io.IOException, 95 InterruptedException { 96 long times = 0L; 97 for (LongWritable count : v2s) { 98 times += count.get(); 99 } 100 ctx.write(k2, new LongWritable(times)); 101 }; 102 } 103 }
3、執行成功後,可以在Linux中檢視操作的結果
本文章來自於:https://www.cnblogs.com/ahu-lichang/p/6645074.html 如有疑問可與作者聯絡。
MapReduce簡介
- MapReduce是一種分散式計算模型,是Google提出的,主要用於搜尋領域,解決海量資料的計算問題。
- MR有兩個階段組成:Map和Reduce,使用者只需實現map()和reduce()兩個函式,即可實現分散式計算。
MapReduce執行流程
MapReduce原理
MapReduce的執行步驟:
1、Map任務處理
1.1 讀取HDFS中的檔案。每一行解析成一個<k,v>。每一個鍵值對呼叫一次map函式。 <0,hello you> <10,hello me>
1.2 覆蓋map(),接收1.1產生的<k,v>,進行處理,轉換為新的<k,v>輸出。 <hello,1> <you,1> <hello,1> <me,1>
1.3 對1.2輸出的<k,v>進行分割槽。預設分為一個區。詳見《Partitioner》
1.4 對不同分割槽中的資料進行排序(按照k)、分組。分組指的是相同key的value放到一個集合中。 排序後:<hello,1> <hello,1> <me,1> <you,1> 分組後:<hello,{1,1}><me,{1}><you,{1}>
1.5 (可選)對分組後的資料進行歸約。詳見《Combiner》
2、Reduce任務處理
2.1 多個map任務的輸出,按照不同的分割槽,通過網路copy到不同的reduce節點上。(shuffle)詳見《shuffle過程分析》
2.2 對多個map的輸出進行合併、排序。覆蓋reduce函式,接收的是分組後的資料,實現自己的業務邏輯, <hello,2> <me,1> <you,1>
處理後,產生新的<k,v>輸出。
2.3 對reduce輸出的<k,v>寫到HDFS中。
Java程式碼實現
注:要匯入org.apache.hadoop.fs.FileUtil.java。
1、先建立一個hello檔案,上傳到HDFS中
2、然後再編寫程式碼,實現檔案中的單詞個數統計(程式碼中被註釋掉的程式碼,是可以省略的,不省略也行)
1 package mapreduce; 2 3 import java.net.URI; 4 import org.apache.hadoop.conf.Configuration; 5 import org.apache.hadoop.fs.FileSystem; 6 import org.apache.hadoop.fs.Path; 7 import org.apache.hadoop.io.LongWritable; 8 import org.apache.hadoop.io.Text; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.Mapper; 11 import org.apache.hadoop.mapreduce.Reducer; 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 17 public class WordCountApp { 18 static final String INPUT_PATH = "hdfs://chaoren:9000/hello"; 19 static final String OUT_PATH = "hdfs://chaoren:9000/out"; 20 21 public static void main(String[] args) throws Exception { 22 Configuration conf = new Configuration(); 23 FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 24 Path outPath = new Path(OUT_PATH); 25 if (fileSystem.exists(outPath)) { 26 fileSystem.delete(outPath, true); 27 } 28 29 Job job = new Job(conf, WordCountApp.class.getSimpleName()); 30 31 // 1.1指定讀取的檔案位於哪裡 32 FileInputFormat.setInputPaths(job, INPUT_PATH); 33 // 指定如何對輸入的檔案進行格式化,把輸入檔案每一行解析成鍵值對 34 //job.setInputFormatClass(TextInputFormat.class); 35 36 // 1.2指定自定義的map類 37 job.setMapperClass(MyMapper.class); 38 // map輸出的<k,v>型別。如果<k3,v3>的型別與<k2,v2>型別一致,則可以省略 39 //job.setOutputKeyClass(Text.class); 40 //job.setOutputValueClass(LongWritable.class); 41 42 // 1.3分割槽 43 //job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class); 44 // 有一個reduce任務執行 45 //job.setNumReduceTasks(1); 46 47 // 1.4排序、分組 48 49 // 1.5歸約 50 51 // 2.2指定自定義reduce類 52 job.setReducerClass(MyReducer.class); 53 // 指定reduce的輸出型別 54 job.setOutputKeyClass(Text.class); 55 job.setOutputValueClass(LongWritable.class); 56 57 // 2.3指定寫出到哪裡 58 FileOutputFormat.setOutputPath(job, outPath); 59 // 指定輸出檔案的格式化類 60 //job.setOutputFormatClass(TextOutputFormat.class); 61 62 // 把job提交給jobtracker執行 63 job.waitForCompletion(true); 64 } 65 66 /** 67 * 68 * KEYIN 即K1 表示行的偏移量 69 * VALUEIN 即V1 表示行文字內容 70 * KEYOUT 即K2 表示行中出現的單詞 71 * VALUEOUT 即V2 表示行中出現的單詞的次數,固定值1 72 * 73 */ 74 static class MyMapper extends 75 Mapper<LongWritable, Text, Text, LongWritable> { 76 protected void map(LongWritable k1, Text v1, Context context) 77 throws java.io.IOException, InterruptedException { 78 String[] splited = v1.toString().split("\t"); 79 for (String word : splited) { 80 context.write(new Text(word), new LongWritable(1)); 81 } 82 }; 83 } 84 85 /** 86 * KEYIN 即K2 表示行中出現的單詞 87 * VALUEIN 即V2 表示出現的單詞的次數 88 * KEYOUT 即K3 表示行中出現的不同單詞 89 * VALUEOUT 即V3 表示行中出現的不同單詞的總次數 90 */ 91 static class MyReducer extends 92 Reducer<Text, LongWritable, Text, LongWritable> { 93 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, 94 Context ctx) throws java.io.IOException, 95 InterruptedException { 96 long times = 0L; 97 for (LongWritable count : v2s) { 98 times += count.get(); 99 } 100 ctx.write(k2, new LongWritable(times)); 101 }; 102 } 103 }
3、執行成功後,可以在Linux中檢視操作的結果