1. 程式人生 > >好程式設計師大資料學習路線之mapreduce概述

好程式設計師大資料學習路線之mapreduce概述

  與HDFS解決問題的原理類似,HDFS是將大的檔案切分成若干小檔案,然後將它們分別儲存到叢集中各個主機中。

  同樣原理,mapreduce是將一個複雜的運算切分成若個子運算,然後將它們分別交給叢集中各個主機,由各個主機並行運算。

  1.1 mapreduce產生的背景

  海量資料在單機上處理因為硬體資源限制,無法勝任。

  而一旦將單機版程式擴充套件到叢集來分散式執行,將極大增加程式的複雜度和開發難度。

  引入mapreduce框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分散式計算中的複雜交由框架來處理。

  1.2 mapreduce程式設計模型

  一種分散式計算模型。

  MapReduce將這個平行計算過程抽象到兩個函式。

  Map(對映):對一些獨立元素組成的列表的每一個元素進行指定的操作,可以高度並行。

  Reduce(化簡 歸約):對一個列表的元素進行合併。

  一個簡單的MapReduce程式只需要指定map()、reduce()、input和output,剩下的事由框架完成。

Mapreduce的幾個關鍵名詞

  Job :使用者的每一個計算請求稱為一個作業。

  Task:每一個作業,都需要拆分開了,交由多個主機來完成,拆分出來的執行單位就是任務。

  Task又分為如下三種類型的任務:

  Map:負責map階段的整個資料處理流程

  Reduce:負責reduce階段的整個資料處理流程

  MRAppMaster:負責整個程式的過程排程及狀態協調

1.4 mapreduce程式執行流程

具體流程說明:

一個mr程式啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動後根據本次job的描述資訊,計算出需要的maptask例項數量,然後向叢集申請機器啟動相應數量的maptask程序

maptask程序啟動之後,根據給定的資料切片範圍進行資料處理,主體流程為:

– 利用客戶指定的inputformat來獲取RecordReader讀取資料,形成輸入KV對。

– 將輸入KV(k是檔案的行號,v是檔案一行的資料)對傳遞給客戶定義的map()方法,做邏輯運算,並將map()方法輸出的KV對收集到快取。

– 將快取中的KV對按照K分割槽排序後不斷溢寫到磁碟檔案

MRAppMaster監控到所有maptask程序任務完成之後,會根據客戶指定的引數啟動相應數量的reducetask程序,並告知reducetask程序要處理的資料範圍(資料分割槽)

Reducetask程序啟動之後,根據MRAppMaster告知的待處理資料所在位置,從若干臺maptask執行所在機器上獲取到若干個maptask輸出結果檔案,並在本地進行重新歸併排序,然後按照相同key的KV為一個組,呼叫客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,然後呼叫客戶指定的outputformat將結果資料輸出到外部儲存

1.5 編寫MapReduce程式

  • 基於MapReduce 計算模型編寫分散式並行程式非常簡單,程式設計師的主要編碼工作就是實現Map 和Reduce函式。
  • 其它的並行程式設計中的種種複雜問題,如分散式儲存,工作排程,負載平衡,容錯處理,網路通訊等,均由YARN框架負責處理。
  • MapReduce中,map和reduce函式遵循如下常規格式:

 map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

  • Mapper的介面:

 protected void map(KEY key, VALUE value, Context context) 
    throws IOException, InterruptedException {  
}

  • Reduce的介面:

 protected void reduce(KEY key, Iterable<VALUE> values,
 Context context) throws IOException, InterruptedException { 
}

  • Mapreduce程式程式碼基本結構

 maprecue例項開發

2.1 程式設計步驟

使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(提交執行mr程式的客戶端)

Mapper的輸入資料是KV對的形式(KV的型別可自定義)

Mapper的輸出資料是KV對的形式(KV的型別可自定義)

Mapper中的業務邏輯寫在map()方法中

map()方法(maptask程序)對每一個<K,V>呼叫一次

Reducer的輸入資料型別對應Mapper的輸出資料型別,也是KV

Reducer的業務邏輯寫在reduce()方法中

Reducetask程序對每一組相同k的<k,v>組呼叫一次reduce()方法

使用者自定義的Mapper和Reducer都要繼承各自的父類

整個程式需要一個Drvier來進行提交,提交的是一個描述了各種必要資訊的job物件

2.2 經典的wordcount程式編寫

需求:有一批檔案(規模為TB級或者PB級),如何統計這些檔案中所有單詞出現次數

 如有三個檔案,檔名是qfcourse.txt、qfstu.txt 和 qf_teacher

 qf_course.txt內容:

 php java linux
bigdata VR
C C++ java web
linux shell

 qf_stu.txt內容:

 tom jim lucy
lily sally
andy
tom jim sally

 qf_teacher內容:

 jerry Lucy tom
jim

方案

– 分別統計每個檔案中單詞出現次數 - map()

– 累加不同檔案中同一個單詞出現次數 - reduce()

實現程式碼

– 建立一個簡單的maven專案

– 新增hadoop client依賴的jar,pom.xml主要內容如下:

 <dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>

 

– 編寫程式碼

– 自定義一個mapper類

 import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  /**
   * Maper裡面的泛型的四個型別從左到右依次是:
   * 
   * LongWritable KEYIN: 預設情況下,是mr框架所讀到的一行文字的起始偏移量,Long,  類似於行號但是在hadoop中有自己的更精簡的序列化介面,所以不直接用Long,而用LongWritable 
   * Text VALUEIN:預設情況下,是mr框架所讀到的一行文字的內容,String,同上,用Text
   *
   * Text KEYOUT:是使用者自定義邏輯處理完成之後輸出資料中的key,在此處是單詞,String,同上,用Text
   * IntWritable VALUEOUT:是使用者自定義邏輯處理完成之後輸出資料中的value,在此處是單詞次數,Integer,同上,用IntWritable
   */
  public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

   /**
    * map階段的業務邏輯就寫在自定義的map()方法中
    * maptask會對每一行輸入資料呼叫一次我們自定義的map()方法
    */
   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  
   //將maptask傳給我們的一行的文字內容先轉換成String
   String line = value.toString();
   //根據空格將這一行切分成單詞
   String[] words = line.split(" ");
  
   /**
    *將單詞輸出為<單詞,1> 
    *<lily,1> <lucy,1>  <c,1> <c++,1> <tom,1> 
    */
   for(String word:words){
   //將單詞作為key,將次數1作為value,以便於後續的資料分發,可以根據單詞分發,以便於相同單詞會到相同的reduce task
   context.write(new Text(word), new IntWritable(1));
   }
   }
  }

 

– 自定義一個reduce類

  import java.io.IOException;

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  /**
   * Reducer裡面的泛型的四個型別從左到右依次是:
   *  Text KEYIN: 對應mapper輸出的KEYOUT
   *  IntWritable VALUEIN: 對應mapper輸出的VALUEOUT
   * 
   *  KEYOUT, 是單詞
   *  VALUEOUT 是自定義reduce邏輯處理結果的輸出資料型別,是總次數
   */
  public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

   /**
    * <tom,1>
    * <tom,1>
    * <linux,1>
    * <banana,1>
    * <banana,1>
    * <banana,1>
    * 入參key,是一組相同單詞kv對的key
    * values是若干相同key的value集合
    * 如 <tom,[1,1]>   <linux,[1]>   <banana,[1,1,1]>
    */
   @Override
   protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

   int count=0;  //累加單詞的出現的次數
  
   for(IntWritable value:values){
   count += value.get();
   }
   context.write(key, new IntWritable(count));
   }
  }

 

– 編寫一個Driver類

   import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  /**
   * 相當於一個yarn叢集的客戶端
   * 需要在此封裝我們的mr程式的相關執行引數,指定jar包
   * 最後提交給yarn
   */
  public class WordcountDriver {
   /**
    * 該類是執行在hadoop客戶端的,main一執行,yarn客戶端就啟動起來了,與yarn伺服器端通訊
    * yarn伺服器端負責啟動mapreduce程式並使用WordcountMapper和WordcountReducer類
    */
   public static void main(String[] args) throws Exception {
   //此程式碼需要兩個輸入引數  第一個引數支援要處理的原始檔;第二個引數是處理結果的輸出路徑
   if (args == null || args.length == 0) {
   args = new String[2];
             //路徑都是 hdfs系統的檔案路徑
   args[0] = "hdfs://192.168.18.64:9000/wordcount/input/";
   args[1] = "hdfs://192.168.18.64:9000/wordcount/output";
   }
   /**
    * 什麼也不設定時,如果在安裝了hadoop的機器上執行時,自動讀取
    * /home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml
    * 檔案放入Configuration中
    */
   Configuration conf = new Configuration();
   Job job = Job.getInstance(conf);
  
   //指定本程式的jar包所在的本地路徑
   job.setJarByClass(WordcountDriver.class);
  
   //指定本業務job要使用的mapper業務類
   job.setMapperClass(WordcountMapper.class);
   //指定mapper輸出資料的kv型別
   job.setMapOutputKeyClass(Text.class);
   job.setMapOutputValueClass(IntWritable.class);
        
         //指定本業務job要使用的Reducer業務類
         job.setReducerClass(WordcountReducer.class);
   //指定最終輸出的資料的kv型別
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(IntWritable.class);
  
   //指定job的輸入原始檔案所在目錄
   FileInputFormat.setInputPaths(job, new Path(args[0]));
   //指定job的輸出結果所在目錄
   FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
   //將job中配置的相關引數,以及job所用的java類所在的jar包,提交給yarn去執行
   /*job.submit();*/
   boolean res = job.waitForCompletion(true);
   System.exit(res?0:1);
   }
  }

wordcount處理過程

將檔案拆分成splits,由於測試用的檔案較小,所以每個檔案為一個split,並將檔案按行分割形成<key,value>對,下圖所示。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所佔的字元數(Windows/Linux環境不同)。

將分割好的<key,value>對交給使用者定義的map方法進行處理,生成新的<key,value>對,下圖所示。

得到map方法輸出的<key,value>對後,Mapper會將它們按照key值進行排序,並執行Combine過程,將key至相同value值累加,得到Mapper的最終輸出結果。下圖所示。

Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的<key,value>對,並作為WordCount的輸出結果,下圖所示。