1. 程式人生 > >Hadoop之——MapReduce實戰(二)

Hadoop之——MapReduce實戰(二)

MapReduce的老api寫法

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
/**
 *  author liuyazhuang
 */
 public class HelloWorldApp {
	public static void main(String[] args) throws Exception {
		final JobConf job = new JobConf(HelloWorldApp.class);
		FileInputFormat.setInputPaths(job, new Path("/input"));
		FileOutputFormat.setOutputPath(job, new Path("/output"));
		JobClient.runJob(job);
	}
}

MapReduce獲取命令列引數

/**
 *  author liuyazhuang
 */
public class CacheFileApp extends Configured implements Tool{
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new CacheFileApp(), args);  
	}
	@Override
	public int run(String[] args) throws Exception {
		//..............................
        	Path in = new Path(args[0]);  
		Path out = new Path(args[1]);  
          
		//..............................
	}
}

計數器

hadoop計數器:可以讓開發人員以全域性的視角來審查程式的執行情況以及各項指標,及時做出錯誤診斷並進行相應處理。

內建計數器(MapReduce相關、檔案系統相關和作業排程相關)

   ... 

也可以通過http://master:50030/jobdetails.jsp檢視

Helloyou, hello me的計數器資訊

Counters: 19
   File Output Format Counters 
     Bytes Written=19	//reduce輸出到hdfs的位元組數
   FileSystemCounters
     FILE_BYTES_READ=481
     HDFS_BYTES_READ=38
     FILE_BYTES_WRITTEN=81316
     HDFS_BYTES_WRITTEN=19
   File Input Format Counters 
     Bytes Read=19	//map從hdfs讀取的位元組數
   Map-Reduce Framework
     Map output materialized bytes=49
     Map input records=2  	//map讀入的記錄行數
     Reduce shuffle bytes=0
     Spilled Records=8
     Map output bytes=35
     Total committed heap usage (bytes)=266469376
     SPLIT_RAW_BYTES=105
     Combine input records=0
     Reduce input records=4	//reduce從map端接收的記錄行數
     Reduce input groups=3	//reduce函式接收的key數量,即歸併後的k2數量
     Combine output records=0
     Reduce output records=3	//reduce輸出的記錄行數
     Map output records=4	//map輸出的記錄行數

自定義計數器與實現

Context類呼叫方法getCounter()

計數器宣告

1.通過列舉宣告

context.getCounter(Enum enum)

2.動態宣告

context.getCounter(String groupName,StringcounterName) 

計數器操作

counter.setValue(long value);//設定初始值

counter.increment(longincr);//增加計數

Combiners程式設計

 每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合併,以減少傳輸到reducer的資料量。

   combiner最基本是實現本地key的歸併,combiner具有類似本地的reduce功能。

    如果不用combiner,那麼,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度。

注意:Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種Reduce的輸入key/value與輸出key/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。

Partitioner程式設計

  1. Partitioner是partitioner的基類,如果需要定製partitioner也需要繼承該類。

         2.  HashPartitioner是mapreduce的預設partitioner。計算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE)% numReduceTasks,得到當前的目的reducer。

         3.  (例子以jar形式執行)

排序和分組

       1在map和reduce階段進行排序時,比較的是k2。v2是不參與排序比較的。如果要想讓v2也進行排序,需要把k2和v2組裝成新的類,作為k2,才能參與比較。

       2分組時也是按照k2進行比較的
/**
	 * 為什麼有這個類?因為mapper端比較時只能比較k2,不能比較v2.如果想讓v2參與比較,必須參與到k2角色中。自定義該類,包含原來的k2和v2
	 * 在哪裡呼叫本來中的compareTo方法,在1.4中呼叫
	 */
	static class TwoInt implements WritableComparable<TwoInt>{
		long first;
		long second;
		
		public TwoInt(){}
		
		public TwoInt(long first, long second){
			this.first = first;
			this.second = second;
		}
		
		@Override
		public void readFields(DataInput arg0) throws IOException {
			this.first = arg0.readLong();
			this.second = arg0.readLong();
		}
		@Override
		public void write(DataOutput arg0) throws IOException {
			arg0.writeLong(first);
			arg0.writeLong(second);	
		}
		@Override
		public int hashCode() {
			return (this.first+"").hashCode()+(this.second+"").hashCode();
		}
		
		@Override
		public boolean equals(Object obj) {
			if(obj instanceof TwoInt){
				TwoInt ob = (TwoInt)obj;
				return (this.first==ob.first && this.second==ob.second)?true:false;
			}else{
				return false;
			}
		}
		
		@Override
		public int compareTo(TwoInt o) {
			if(this.first!=o.first){
				return (int)(this.first-o.first);
			}else{
				return (int)(this.second-o.second);
			}
		}
	}
/**
	 * 分組時比較採用的比較器。比較的是原來的k2
	 *
	 */
	static class GroupComparator implements RawComparator<TwoInt>{
		@Override
		public int compare(TwoInt o1, TwoInt o2) {
			return (int)(o1.first-o2.first);
		}
		/**
		 * arg0表示第一個位元組陣列
		 * arg1表示第一個位元組陣列的參與比較的開始位置
		 * arg3表示第二個位元組陣列
		 * arg4表示第二個位元組陣列的參與比較的開始位置
		 */
		@Override
		public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
				int arg4, int arg5) {
			return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8);
		}
	}

Shuffle

  1. 每個map有一個環形記憶體緩衝區,用於儲存任務的輸出。預設大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spill.percent),一個後臺執行緒把內容寫到(spill)磁碟的指定目錄(mapred.local.dir)下的新建的一個溢位寫檔案。
  2. 寫磁碟前,要partition,sort。如果有combiner,combine排序後資料。
  3. 等最後記錄寫完,合併全部溢位寫檔案為一個分割槽且排序的檔案。
  4. Reducer通過Http方式得到輸出檔案的分割槽。
  5. TaskTracker為分割槽檔案執行Reduce任務。複製階段把Map輸出複製到Reducer的記憶體或磁碟。一個Map任務完成,Reduce就開始複製輸出。排序階段合併map輸出。然後走Reduce階段

hadoop的壓縮codec

Codec為壓縮,解壓縮的演算法實現。在Hadoop中,codec由CompressionCode的實現來表示。下面是一些實現:

MapReduce的輸出進行壓縮

輸出的壓縮屬性

MapReduce常見演算法

  • 單詞計數
  • 資料去重
  • 排序
  • Top K
  • 選擇
  • 投影
  • 分組
  • 多表連線
  • 單表關聯