基於堆實現的優先順序佇列:PriorityQueue 解決 Top K 問題
2、應用:求 Top K 大/小 的元素
瞭解了優先佇列之後,我們再來看它的一個應用:
在面試的時候,問到演算法,Top k 的問題是經常被問到的,網上已有很多種方法可以解決,今天來看看如何使用 PriorityQueue 構造固定容量的優先佇列,模擬大頂堆,來解決 top K 小的問題。
/** * @Title: FixSizedPriorityQueue.java * @Package com.collonn.algorithm * @Description: TODO(用一句話描述該檔案做什麼) * @author zengfh * @date 2014年11月24日 上午10:44:48 * @version V1.0 */ package com.collonn.algorithm; import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; import java.util.Random; /** * @ClassName: FixSizedPriorityQueue * @Description: 固定容量的優先佇列,模擬大頂堆,用於解決求topN小的問題 * @author zengfh * @date 2014年11月24日 上午10:44:48 * */ public class FixSizedPriorityQueue <E extends Comparable<E>>{ private PriorityQueue<E> queue; private int maxSize;// 堆的最大容量 public FixSizedPriorityQueue(int maxSize) { if(maxSize <=0){ throw new IllegalArgumentException(); } this.maxSize = maxSize; this.queue =new PriorityQueue<E>(maxSize,new Comparator<E>() { // 生成最大堆使用o2-o1,生成最小堆使用o1-o2, 並修改 e.compareTo(peek) 比較規則 public int compare(E o1, E o2) { return(o2.compareTo(o1)); } }); } public void add(E e){ if(queue.size() < maxSize) {// 未達到最大容量,直接新增 queue.add(e); }else{// 佇列已滿 E peek = queue.peek(); if(e.compareTo(peek) <0) {// 將新元素與當前堆頂元素比較,保留較小的元素 queue.poll(); queue.add(e); } } } /** * @Title: main * @Description: TODO(這裡用一句話描述這個方法的作用) * @param @param args 設定檔案 * @return void 返回型別 * @throws */ public static void main(String[] args) { // TODO 自動生成的方法存根 final FixSizedPriorityQueue<Integer> pq =new FixSizedPriorityQueue<Integer>(10); Random random =new Random(); int rNum =0; System.out.println("100 個 0~999 之間的隨機數:-----------------------------"); for(int i =1; i <=100; i++) { rNum = random.nextInt(1000); System.out.print(rNum +", "); pq.add(rNum); } System.out.println(); System.out.println("PriorityQueue 本身的遍歷是無序的:------------------------"); Iterable<Integer> iter =new Iterable<Integer>() { @Override public Iterator<Integer> iterator() { // TODO 自動生成的方法存根 return pq.queue.iterator(); } }; for(Integer item : iter) { System.out.print(item +", "); } System.out.println(); System.out.println(pq.queue.toString()); System.out.println("PriorityQueue 排序後的遍歷:--------------------------"); // 直接用內建的 poll() 方法,每次取隊首元素(堆頂的最大值) while(!pq.queue.isEmpty()) { System.out.print(pq.queue.poll() +", "); } } }
3、PriorityQueue在 hadoop 中的應用:
最後來聊下 “基於堆實現的優先順序佇列(PriorityQueue)” 在hadoop 中的應用:
在 hadoop 中,排序是 MapReduce 的靈魂,MapTask 和 ReduceTask 均會對資料按 Key 排序,這個操作是 MR 框架的預設行為,不管你的業務邏輯上是否需要這一操作。
MapReduce 框架中,用到的排序主要有兩種:快速排序和基於堆實現的優先順序佇列。
Mapper 階段:
從 map 輸出到環形緩衝區的資料會被排序(這是 MR 框架中改良的快速排序),這個排序涉及 partition 和 key,當緩衝區容量佔用 80%,會spill 資料到磁碟,生成 IFile 檔案,Map 結束後,會將 IFile 檔案排序合併成一個大檔案(基於堆實現的優先順序佇列),以供不同的 reduce 來拉取相應的資料。
Reducer 階段:
從 Mapper 端取回的資料已是部分有序,Reduce Task 只需進行一次歸併排序即可保證資料整體有序。為了提高效率,Hadoop 將 sort 階段和 reduce 階段並行化,在 sort 階段,Reduce Task 為記憶體和磁碟中的檔案建立了小頂堆,儲存了指向該小頂堆根節點的迭代器,並不斷的移動迭代器,以將 key 相同的資料順次交給 reduce()函式處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程(建堆→取堆頂元素→重新建堆→取堆頂元素...),這樣,sort 和 reduce 可以並行進行。
瞭解了這個,你就明白為什麼之前有同學提到遍歷一遍 values 之後,值都不存在了,同時你也能更加理解之前提到的 二次排序。
在 hadoop 中,用到了這一資料結構的類主要有如下:(hadoop-0.20.203.0)
core/org/apache/hadoop/io/SequenceFile.java
hdfs/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
mapred/org/apache/hadoop/mapred/join/JoinRecordReader.java
mapred/org/apache/hadoop/mapred/join/MultiFilterRecordReader.java
mapred/org/apache/hadoop/mapred/join/OverrideRecordReader.java
mapred/org/apache/hadoop/mapred/Merger.java
tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
可以看到,這一資料結構,在 hadoop 中用的還是比較廣泛的。
需要說明的是,求 Top k,更簡單的方法可以直接用內建的 TreeMap 或者 TreeSet,這兩者是基於紅黑樹的一種資料結構,內部維持 key 的次序,但每次新增新元素,其排序的開銷要大於堆調整的開銷。例如要找最大的10個元素,那麼建立的是小根堆。小根堆的特性是根節點是最小元素。不需要對堆進行再排序,當堆的根節點被替換成新的元素時,需要進行堆化,以保持小根堆的特性。
4、REF:
hadoop技術內幕:task 執行過程分析:P199, P219