堆的python實現及其應用 資料結構--堆的實現之深入分析
堆的概念
優先佇列(priority queue)是一種特殊的佇列,取出元素的順序是按照元素的優先權(關鍵字)大小,而不是進入佇列的順序,堆就是一種優先佇列的實現。堆一般是由陣列實現的,邏輯上堆可以被看做一個完全二叉樹(除底層元素外是完全充滿的,且底層元素是從左到右排列的)。
堆分為最大堆和最小堆,最大堆是指每個根結點的值大於左右孩子的節點值,最小堆則是根結點的值小於左右孩子的值。
下面就開始用python實現一個小根堆。
最小堆的實現
堆的架構:
堆的核心操作就是插入和刪除堆頂元素,除了這些還有一些不常用的其他方法,具體架構如下:
class BinaryHeap(object):"""一個二叉堆, 小頂堆 利用列表實現""" def __init__(self, max_size=math.inf): pass def __len__(self): """求長度""" pass def insert(self, *data): """向堆中插入元素""" pass def _siftup(self): """最後插入的元素上浮""" pass def _siftdown(self, idx):"""序號為i的元素下沉""" pass def get_min(self): pass def delete_min(self): """刪除堆頂元素""" pass def create_heap(self, data): """直接建立一個小頂堆, 接收一個可迭代物件引數,效果同insert, 效率比insert一個個插入高,時間複雜度為n""" pass def clear(self): """清空堆""" pass defupdate_key(self, idx, key): """更新指定位置的元素, idx>=1""" pass def delete_key(self, idx): """刪除指定位置的元素, idx>=1""" pass
1. 建立空堆物件和求堆的元素數量
下面初始化時預設是一個大小不限的堆,還為列表的0位置建立了一個取值無限小的哨兵,一個是保證插入的值一定比它小,在插入時減少了一次判斷,另一個就是讓元素下標從1開始。第二個方法實現雙下len方法可以統一通過len()檢視元素個數。
def __init__(self, max_size=math.inf): self._heap = [-math.inf] # 初始值設定一個無限大的哨兵 self.max_size = max_size def __len__(self): """求長度""" return len(self._heap) - 1
2.元素的插入
元素的插入,本質就是一個元素上浮的過程。先新增到列表的末尾,然後向上調整插入到合適的位置以維護最小堆的特性。由完全二叉樹的性質,根結點序號為1的堆性質是父結點的序號是子節點的1/2,依照如此,程式碼就很容易寫了。下面設定的是可以插入多個元素或是接收一個可迭代物件為引數進行插入。
def insert(self, *data): """向堆中插入元素""" if isinstance(data[0], Iterable): if len(data) > 1: print("插入失敗...第一個引數可迭代物件時引數只能有一個") return data = data[0] if not len(self) + len(data) < self.max_size: print("堆已滿, 插入失敗") return for x in data: self._heap.append(x) self._siftup() print("插入成功") def _siftup(self): """最後插入的元素上浮""" pos = len(self) # 插入的位置 x = self._heap[-1] # 獲取最後一個位置元素 while x < self._heap[pos >> 1]: # 此處可以體現出哨兵的作用了, 當下標為0時, x一定小於inf self._heap[pos] = self._heap[pos >> 1] pos >>= 1 self._heap[pos] = x
3. 元素的刪除
堆頂元素的刪除本質就是一個元素下沉的過程。如上圖要刪除0,先找到最後一個元素,並代替堆頂元素,然後調整其位置。具體過程是左右孩子比大小,然後用堆頂元素和這個小的值比,如果堆頂元素大於小的孩子則用小的孩子代替當前堆頂元素,否則當前位置就是合適的位置。迴圈以上操作到最後一個元素,也就維護了最小堆的特性。
def delete_min(self): """刪除堆頂元素""" if not len(self): print("堆為空") return _min = self._heap[1] last = self._heap.pop() if len(self): # 為空了就不需要向下了 self._heap[1] = last self._siftdown(1) return _min def _siftdown(self, idx): """序號為i的元素下沉""" temp = self._heap[idx] length = len(self) while 1: child_idx = idx << 1 if child_idx > length: break if child_idx != length and self._heap[child_idx] > self._heap[child_idx + 1]: child_idx += 1 if temp > self._heap[child_idx]: self._heap[idx] = self._heap[child_idx] else: break idx = child_idx self._heap[idx] = temp
4. 堆的快速建立
堆的建立可以用上面的insert方法一個一個建立,如此花費的時間是o(nlogn), 但是也可以直接建立,然後調整堆,最後的時間是o(n),圖示如下所示
由上圖瞭解可以直接對一個亂序的列表進行調整。當一顆完全二叉樹的所有子樹都是一個最小堆時,那麼這顆樹也就是最小堆了。因此從最後一個非葉結點開始調整,葉結點本身就是最小堆,不用調整,可以直接跳過。根據完全二叉樹的性質,最後一個非葉結點序號是最後一個元素的序號除以2。在上面的圖中,從結點8開始調整,每一次調整就是一次元素的下沉操作,一直到堆頂結束。程式碼實現很簡單,如下所示。
def create_heap(self, data): """直接建立一個小頂堆, 接收一個可迭代物件引數,效果同insert, 效率比insert一個個插入高,時間複雜度為n""" self._heap.extend(data) for idx in range(len(self) // 2, 0, -1): self._siftdown(idx)
5. 堆的其他操作
5.1 獲取堆頂元素
def get_min(self): if not len(self): print("堆為空") return self._heap[1]
5.2 堆的清空
def clear(self): """清空堆""" self._heap = [-math.inf] # 初始值設定一個無限小的哨兵
5.3 更新指定位置的元素
def update_key(self, idx, key): """更新指定位置的元素, idx>=1""" if idx > len(self) or idx < 1: print("索引超出堆的數量或小於1") return self._heap[idx] = key self._siftdown(idx)
5.4 刪除指定位置的元素
def delete_key(self, idx): """刪除指定位置的元素, idx>=1""" if idx > len(self) or idx < 1: print("索引超出堆的數量或小於1") return x = self._heap.pop() # 取出最後一個元素代替, 保持完全二叉樹, 然後調整到合適位置 if len(self): self._heap[idx] = x self._siftdown(idx)
6.堆的應用
6.1 堆排序
堆的應用之一就是堆排序,先把待排序的資料放入堆中,然後一個個刪除,整體效率為o(nlogn)
# 堆的應用之一, 堆排序 def heap_sort(data, reverse=False): """接受一個可迭代物件進行排序, 預設從小到大排序, 返回一個列表""" heap = BinaryHeap() # 新建一個堆 heap.create_heap(data) lst = [] for i in range(len(heap)): lst.append(heap.delete_min()) if reverse: return lst[::-1] return lst if __name__ == '__main__':print(heap_sort([1, 4, 56, 2, 5, 9, 1, 0, 0, 4], reverse=True)) print(heap_sort('helloworld'))
輸出如下
[56, 9, 5, 4, 4, 2, 1, 1, 0, 0] ['d', 'e', 'h', 'l', 'l', 'l', 'o', 'o', 'r', 'w']
6.2 獲取n個數中前k大的數
解決這個問題方法很多,可以利用各種排序演算法排序,然後取前k個,下面一個方法運用了最小堆的方法,把問題轉化成了從求前k大的數變成了求第k大的數什麼的問題。思想就是用前k個數建立一個k大小的最小堆,然後用剩下的數依次去和堆頂比,比堆頂小就捨棄,否則則用剛才的數代替堆頂,然後重新維護更新最小堆。
# 堆的應用之二, 查詢n個數中前K大的數, 效率為o(nlogk) def get_big_nums(data, k): """獲取前k個最大的數""" heap = BinaryHeap(k) heap.create_heap(data[:k]) # 用前k個數建立一個小頂堆, 堆頂即是第k大的數 for d in data[k:]: if heap.get_min() < d: heap.update_key(1, d) lst = [] # 獲取堆裡的元素 for i in range(k): lst.append(heap.delete_min()) return lst if __name__ == '__main__': print(get_big_nums([1, 2, 5, 2, 4, 10, 7, 1, 3, 5, 9], 3)) print("*" * 30) print(get_big_nums([0.1, 2, -1, 89, 67, 13, 55, 54.4, 67], 5))
輸出如下
[7, 9, 10] ****************************** [54.4, 55, 67, 67, 89]
下面寫一個在程式設計之美書裡介紹的實現求前k大的數的方法,與堆無關。整體思想和快速排序的思路一樣,把待求的資料通過一個key分割成兩組,一組大於等於key的列表da,一組小於key的列表db。
此時有幾種情況:
1. k<=0時, 資料已經找到了,直接返回[]
2. len(da)<=k時返回,然後在小的一組db尋找k-len(da)個剩下的數
3. 分割後len(da) > k,這時就繼續分割這個列表
如此遞迴最後返回的就是前k大的數,但是這種方法返回的數並未排序。
def partition(data): key = data[0] da = [] # 儲存大於等於key的值 db = [] # 儲存小於key的值 for d in data[1:]: da.append(d) if d >= key else db.append(d) da.append(key) if len(da) < len(db) else db.append(key) # 此處是把關鍵key放入長度小的列表中,使得分組均勻 return da, db def get_k_big_data(data, k): """利用快速排序的思想尋找前k大的數, 但是獲得的數並未排序""" if k <= 0: return [] elif len(data) <= k: return data else: da, db = partition(data) if len(da) > k: return get_k_big_data(da, k) else: return get_k_big_data(da, k) + (get_k_big_data(db, k - len(da))) if __name__ == '__main__': print(get_k_big_data([5, 2, 5, 2, 4, 10, 7, 1, 3, 5, 9], 5)) print("*" * 30) print(get_k_big_data([1, 3, 5, 3, 2, 6, 0, 9], 3))
控制檯輸出:
[5, 10, 7, 5, 9] ****************************** [6, 9, 5]
最後附上完整程式碼
1 import math 2 import pygraphviz as pgv 3 from collections import Iterable 4 5 6 class BinaryHeap(object): 7 """一個二叉堆, 小頂堆 利用列表實現""" 8 9 def __init__(self, max_size=math.inf): 10 self._heap = [-math.inf] # 初始值設定一個無限大的哨兵 11 self.max_size = max_size 12 13 def __len__(self): 14 """求長度""" 15 return len(self._heap) - 1 16 17 def insert(self, *data): 18 """向堆中插入元素""" 19 if isinstance(data[0], Iterable): 20 if len(data) > 1: 21 print("插入失敗...第一個引數可迭代物件時引數只能有一個") 22 return 23 data = data[0] 24 if not len(self) + len(data) < self.max_size: 25 print("堆已滿, 插入失敗") 26 return 27 for x in data: 28 self._heap.append(x) 29 self._siftup() 30 print("插入成功") 31 32 def _siftup(self): 33 """最後插入的元素上浮""" 34 pos = len(self) # 插入的位置 35 x = self._heap[-1] # 獲取最後一個位置元素 36 while x < self._heap[pos >> 1]: # 此處可以體現出哨兵的作用了, 當下標為0時, x一定小於inf 37 self._heap[pos] = self._heap[pos >> 1] 38 pos >>= 1 39 self._heap[pos] = x 40 41 def _siftdown(self, idx): 42 """序號為i的元素下沉""" 43 temp = self._heap[idx] 44 length = len(self) 45 while 1: 46 child_idx = idx << 1 47 if child_idx > length: 48 break 49 if child_idx != length and self._heap[child_idx] > self._heap[child_idx + 1]: 50 child_idx += 1 51 if temp > self._heap[child_idx]: 52 self._heap[idx] = self._heap[child_idx] 53 else: 54 break 55 idx = child_idx 56 self._heap[idx] = temp 57 58 def show_heap(self): 59 """除錯用,打印出陣列資料""" 60 print(self._heap[1:]) 61 62 def draw(self, filename='./heap.png'): 63 """除錯用,生成直觀二叉樹的圖片檔案""" 64 g = pgv.AGraph(strict=False, directed=True) 65 g.node_attr['shape'] = 'circle' 66 idx = 1 67 length = len(self) 68 idx_length = pow(2, int(math.log(length, 2))) - 1 69 while idx <= idx_length: 70 if idx << 1 <= length: 71 g.add_edge(self._heap[idx], self._heap[idx << 1]) 72 if (idx << 1) + 1 <= length: 73 g.add_edge(self._heap[idx], self._heap[(idx << 1) + 1]) 74 else: 75 g.add_node(self._heap[idx]) 76 idx += 1 77 g.layout('dot') 78 g.draw(filename) 79 80 def get_min(self): 81 if not len(self): 82 print("堆為空") 83 return self._heap[1] 84 85 def delete_min(self): 86 """刪除堆頂元素""" 87 if not len(self): 88 print("堆為空") 89 return 90 _min = self._heap[1] 91 last = self._heap.pop() 92 if len(self): # 為空了就不需要向下了 93 self._heap[1] = last 94 self._siftdown(1) 95 return _min 96 97 def create_heap(self, data): 98 """直接建立一個小頂堆, 接收一個可迭代物件引數,效果同insert, 效率比insert一個個插入高,時間複雜度為n""" 99 self._heap.extend(data) 100 for idx in range(len(self) // 2, 0, -1): 101 self._siftdown(idx) 102 103 def clear(self): 104 """清空堆""" 105 self._heap = [-math.inf] # 初始值設定一個無限大的哨兵 106 107 def update_key(self, idx, key): 108 """更新指定位置的元素, idx>=1""" 109 if idx > len(self) or idx < 1: 110 print("索引超出堆的數量或小於1") 111 return 112 self._heap[idx] = key 113 self._siftdown(idx) 114 115 def delete_key(self, idx): 116 """刪除指定位置的元素, idx>=1""" 117 if idx > len(self) or idx < 1: 118 print("索引超出堆的數量或小於1") 119 return 120 x = self._heap.pop() # 取出最後一個元素代替, 保持完全二叉樹, 然後調整到合適位置 121 if not len(self): 122 self._heap[idx] = x 123 self._siftdown(idx) 124完整程式碼
參考:
- 資料結構--堆的實現之深入分析
- 啊哈演算法第7章第3節