1.MapReduce概念
1)MapReduce是一種分散式計算模型,由Google提出,主要用於搜尋領域,解決海量資料的計算問題.
2)MapReduce是分散式執行的,由兩個階段組成:Map和Reduce,Map階段是一個獨立的程式,有很多個節點同時執行,每個節點處理一部分資料。Reduce階段是一個獨立的程式,有很多個節點同時執行,每個節點處理一部分資料【在這先把reduce理解為一個單獨的聚合程式即可】。
3)MapReduce框架都有預設實現,使用者只需要覆蓋map()和reduce()兩個函式,即可實現分散式計算,非常簡單。
4)兩個函式的形參和返回值都是<key、value>,使用的時候一定要注意構造<k,v>。
2.MapReduce核心思想
(1)分散式的運算程式往往需要分成至少2個階段。
(2)第一個階段的MapTask併發例項,完全並行執行,互不相干。
(3)第二個階段的ReduceTask併發例項互不相干,但是他們的資料依賴於上一個階段的所有MapTask併發例項的輸出。
(4)MapReduce程式設計模型只能包含一個Map階段和一個Reduce階段,如果使用者的業務邏輯非常複雜,那就只能多個MapReduce程式,序列執行。
總結:分析WordCount資料流走向深入理解MapReduce核心思想。
3. MapReduce 中的shuffle
4.Mapreduce程式碼
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCount {
//分割任務
// 第一對kv,是決定資料輸入的格式
// 第二隊kv 是決定資料輸出的格式
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/*map階段資料是一行一行過來的
每一行資料都需要執行程式碼*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
LongWritable longWritable = new LongWritable(1);
String s = value.toString();
context.write(new Text(s), longWritable);
}
}
//接收Map端資料
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
/* reduce 聚合程式 每一個k都會呼叫一次
* 預設是一個節點
* key:每一個單詞
* values:map端 當前k所對應的所有的v
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//設定統計的初始值為0
long sum = 0l;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key, new LongWritable(sum));
}
} /**
* 是當前mapreduce程式入口
* 用來構建mapreduce程式
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//建立一個job任務
Job job=Job.getInstance();
//指定job名稱
job.setJobName("第一個mr程式");
//構建mr
//指定當前main所在類名(識別具體的類)
job.setJarByClass(WordCount.class);
//指定map端類
// 指定map輸出的kv型別
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定reduce端類
//指定reduce端輸出的kv型別
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class); // 指定輸入路徑
Path in = new Path("/word");
FileInputFormat.addInputPath(job,in);
//輸出路徑指定
Path out = new Path("/output");
FileSystem fs = FileSystem.get(new Configuration());
//如果檔案存在
if(fs.exists(out)){
fs.delete(out,true);
}
//存在
FileOutputFormat.setOutputPath(job,out); //啟動
job.waitForCompletion(true);
System.out.println("MapReduce正在執行");
}
}