1. 程式人生 > >基於堆實現的優先順序佇列:PriorityQueue 解決 Top K 問題

基於堆實現的優先順序佇列:PriorityQueue 解決 Top K 問題

2、應用:求 Top K 大/小 的元素

瞭解了優先佇列之後,我們再來看它的一個應用:

在面試的時候,問到演算法,Top k 的問題是經常被問到的,網上已有很多種方法可以解決,今天來看看如何使用 PriorityQueue 構造固定容量的優先佇列,模擬大頂堆,來解決 top K 小的問題。

/**   
 * @Title: FixSizedPriorityQueue.java 
 * @Package com.collonn.algorithm 
 * @Description: TODO(用一句話描述該檔案做什麼) 
 * @author zengfh  
 * @date 2014年11月24日 上午10:44:48 
 * @version V1.0   
 */
package com.collonn.algorithm;

import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.Random;

/** 
 * @ClassName: FixSizedPriorityQueue 
 * @Description: 固定容量的優先佇列,模擬大頂堆,用於解決求topN小的問題 
 * @author zengfh 
 * @date 2014年11月24日 上午10:44:48 
 *  
 */
public class FixSizedPriorityQueue <E extends Comparable<E>>{
	private PriorityQueue<E> queue;
	private int maxSize;// 堆的最大容量
	
	public FixSizedPriorityQueue(int maxSize) {
		if(maxSize <=0){
			throw new IllegalArgumentException();
		}
		this.maxSize = maxSize;
		this.queue =new PriorityQueue<E>(maxSize,new Comparator<E>() {
			// 生成最大堆使用o2-o1,生成最小堆使用o1-o2, 並修改 e.compareTo(peek) 比較規則
			public int compare(E o1, E o2) {				
				return(o2.compareTo(o1));
			}
		});
	}
	
	public void add(E e){
		if(queue.size() < maxSize) {// 未達到最大容量,直接新增
			queue.add(e);
		}else{// 佇列已滿
			E peek = queue.peek();
			if(e.compareTo(peek) <0) {// 將新元素與當前堆頂元素比較,保留較小的元素
				queue.poll();
				queue.add(e);
			}
		}
	}
	
	/** 
	 * @Title: main 
	 * @Description: TODO(這裡用一句話描述這個方法的作用) 
	 * @param @param args    設定檔案 
	 * @return void    返回型別 
	 * @throws 
	 */
	public static void main(String[] args) {
		// TODO 自動生成的方法存根
		final FixSizedPriorityQueue<Integer> pq =new FixSizedPriorityQueue<Integer>(10);
		Random random =new Random();
		int rNum =0;
		System.out.println("100 個 0~999 之間的隨機數:-----------------------------");
		for(int i =1; i <=100; i++) {
			rNum = random.nextInt(1000);
			System.out.print(rNum +", ");
			pq.add(rNum);
		}
		System.out.println();
		System.out.println("PriorityQueue 本身的遍歷是無序的:------------------------");
		Iterable<Integer> iter =new Iterable<Integer>() {
			@Override
			public Iterator<Integer> iterator() {
				// TODO 自動生成的方法存根
				return pq.queue.iterator();
			}
			
		};
		for(Integer item : iter) {
			System.out.print(item +", ");
		}
		System.out.println();
		System.out.println(pq.queue.toString());
		System.out.println("PriorityQueue 排序後的遍歷:--------------------------");
		// 直接用內建的 poll() 方法,每次取隊首元素(堆頂的最大值)
		while(!pq.queue.isEmpty()) {
			System.out.print(pq.queue.poll() +", ");
		}
	}

}

3、PriorityQueue在 hadoop 中的應用:

最後來聊下 “基於堆實現的優先順序佇列(PriorityQueue)” 在hadoop 中的應用:

在 hadoop 中,排序是 MapReduce 的靈魂,MapTask 和 ReduceTask 均會對資料按 Key 排序,這個操作是 MR 框架的預設行為,不管你的業務邏輯上是否需要這一操作。

MapReduce 框架中,用到的排序主要有兩種:快速排序基於堆實現的優先順序佇列

Mapper 階段:

從 map 輸出到環形緩衝區的資料會被排序(這是 MR 框架中改良的快速排序),這個排序涉及 partition 和 key,當緩衝區容量佔用 80%,會spill 資料到磁碟,生成 IFile 檔案,Map 結束後,會將 IFile 檔案排序合併成一個大檔案(基於堆實現的優先順序佇列),以供不同的 reduce 來拉取相應的資料。

Reducer 階段:

從 Mapper 端取回的資料已是部分有序,Reduce Task 只需進行一次歸併排序即可保證資料整體有序。為了提高效率,Hadoop 將 sort 階段和 reduce 階段並行化,在 sort 階段,Reduce Task 為記憶體和磁碟中的檔案建立了小頂堆,儲存了指向該小頂堆根節點的迭代器,並不斷的移動迭代器,以將 key 相同的資料順次交給 reduce()函式處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程(建堆→取堆頂元素→重新建堆→取堆頂元素...),這樣,sort 和 reduce 可以並行進行。

瞭解了這個,你就明白為什麼之前有同學提到遍歷一遍 values 之後,值都不存在了,同時你也能更加理解之前提到的 二次排序。

在 hadoop 中,用到了這一資料結構的類主要有如下:(hadoop-0.20.203.0)

core/org/apache/hadoop/io/SequenceFile.java
hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java
mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
mapred/org/apache/hadoop/mapred/join/OverrideRecordReader.java
mapred/org/apache/hadoop/mapred/Merger.java
tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java

可以看到,這一資料結構,在 hadoop 中用的還是比較廣泛的。

需要說明的是,求 Top k,更簡單的方法可以直接用內建的 TreeMap 或者 TreeSet,這兩者是基於紅黑樹的一種資料結構,內部維持 key 的次序,但每次新增新元素,其排序的開銷要大於堆調整的開銷。例如要找最大的10個元素,那麼建立的是小根堆。小根堆的特性是根節點是最小元素。不需要對堆進行再排序,當堆的根節點被替換成新的元素時,需要進行堆化,以保持小根堆的特性。

4、REF:

hadoop技術內幕:task 執行過程分析:P199, P219