1. 程式人生 > >Python中heapq與優先佇列【詳細】

Python中heapq與優先佇列【詳細】

本文始發於個人公眾號:TechFlow, 原創不易,求個關注


今天的文章來介紹Python當中一個蠻有用的庫——heapq。

heapq的全寫是heap queue,是堆佇列的意思。這裡的堆和佇列都是資料結構,在後序的文章當中我們會詳細介紹,今天只介紹heapq的用法,如果不瞭解heap和queue原理的同學可以忽略,我們並不會深入太多,會在之後的文章裡詳細闡述。

在介紹用法之前,我們需要先知道優先佇列的定義。佇列大家應該都不陌生,也是非常基礎簡單的資料結構。我們可以想象成佇列裡的所有元素排成一排,新的元素只能從隊尾加入佇列,元素要出佇列只能通過隊首,不能中途從隊列當中退出。而優先佇列呢,是給隊列當中的元素每一個都設定了優先順序,使得隊伍當中的元素會自動按照優先順序排序,優先順序高的排在前面。

也就是說Python當中的heapq就是一個維護優先佇列的library,我們通過呼叫它可以輕鬆實現優先佇列的功能。


最大或最小的K個元素


我們來看一個實際的問題,假設我們當下有N個雜亂無章的元素,但是我們只關心其中最大的K個或者是最小的K個元素。我們想從整個陣列當中將這部分抽取出來,應該怎麼辦呢?

這個問題在實際當中非常常見,隨便就可以舉出例子來。比如使用者輸入了搜尋詞,我們根據使用者的搜尋詞找到了大量的內容。我們想要根據演算法篩選出使用者最有可能點選的文字來,機器學習的模型可以給每一個文字一個預測的分數。之後,我們就需要選出分數最大的K個結果。這種類似的場景還有很多,利用heapq庫裡的nlargest和nsmallest介面可以非常方便地做到這點。

我們一起來看一個例子:

import heapq

nums = [14, 20, 5, 28, 1, 21, 16, 22, 17, 28]
heapq.nlargest(3, nums)
# [28, 28, 22]
heapq.nsmallest(3, nums)
# [1, 5, 14]

heapq的nlargest和nsmallest接受兩個引數,第一個引數是K,也就是返回的元素的數量,第二個引數是傳入的陣列,heapq返回的正是傳入的陣列當中的前K大或者是前K小。

這裡有一個問題,如果我們陣列當中的元素是一個物件呢?應該怎麼辦?

其實也很簡單,有了解過Python自定義關鍵詞排序的同學應該知道,和排序一樣,我們可以通過匿名函式實現。


匿名函式


我們都知道,在Python當中通過def可以定義一個函式。通過def定義的函式都有函式名,所以稱為有名函式。除了有名函式之外,Python還支援匿名函式。顧名思義,就是沒有函式名的函式。也就是說它其他方面都和普通函式一樣,只不過沒有名字而已。

初學者可能會納悶,函式沒有名字應該怎麼呼叫呢?

會有這個疑惑很正常,這是因為習慣了面向過程的程式設計,對面向物件理解不夠深入導致的。在許多高階語言當中,一切皆物件,一個類,一個函式,一個int都是物件。既然函式也是物件,那麼函式自然也可以用來傳遞,不僅可以用來傳遞,還可以用來返回。這是函數語言程式設計的概念了,我們這裡不多做深入。

當然,普通函式也一樣可以傳遞,起到的效果一樣。只不過在程式設計當中,有些函式我們只會使用一次,沒必要再單獨定義一個函式,使用匿名函式會非常方便。

舉個例子,比方說我有一個這樣的函式:

def operate(x, func):
  return func(x)

這個operate函式它接受兩個引數,第一個引數是變數x,第二個引數是一個函式。它會在函式內部呼叫func,返回func呼叫的結果。我現在要做這樣一件事情,我希望根據x這個整數對4取餘的餘數來判斷應該用什麼樣的func。如果對4的餘數為0,我希望求一次方,如果餘數是2,我希望求平方,以此類推。如果按照正常的方法,我們需要實現4個方法,然後依次傳遞。

這當然是可以的,不過非常麻煩,如果使用匿名函式,就可以大大簡化程式碼量:

def get_result(x):
  if x % 4 == 0:
    return operate(x, lambda x: x)
  elif x % 4 == 1:
    return operate(x, lambda x: x ** 2)
  elif x % 4 == 2:
    return operate(x, lambda x: x ** 3)
  else:
    return operate(x, lambda x: x ** 4)

在上面的程式碼當中,我們通過lambda關鍵字定義了匿名函式,避免了定義四種函式用來傳遞的情況。當然,這個問題還有更簡單的寫法,可以只用一個函式解決。

我們來看lambda定義匿名函式的語法,首先是lambda關鍵字,表示我們當下定義的是一個匿名函式。之後跟的是這個匿名函式的引數,我們只用到一個變數x,所以只需要寫一個x。如果我們需要用到多個引數,通過逗號分隔,當然也可以不用引數。寫完引數之後,我們用冒號分開,冒號後面寫的是返回的結果。

我們也可以把匿名函式賦值給一個變數,之後我們就可以和呼叫普通函式一樣來呼叫了:

square = lambda x: x ** 2

print(square(3))
print(operate(3, square))


自定義排序


回到之前的內容,如果我們想要heapq排序的是一個物件。那麼heapq並不知道應該依據物件當中的哪個引數來作為排序的衡量標準,所以這個時候,需要我們自己定義一個獲取關鍵字的函式,傳遞給heapq,這樣才可以完成排序。

比如說,我們現在有一批電腦,我們希望heapq能夠根據電腦的價格排序:

laptops = [
    {'name': 'ThinkPad', 'amount': 100, 'price': 91.1},
    {'name': 'Mac', 'amount': 50, 'price': 543.22},
    {'name': 'Surface', 'amount': 200, 'price': 21.09},
    {'name': 'Alienware', 'amount': 35, 'price': 31.75},
    {'name': 'Lenovo', 'amount': 45, 'price': 16.35},
    {'name': 'Huawei', 'amount': 75, 'price': 115.65}
]

cheap = heapq.nsmallest(3, portfolio, key=lambda s: s['price'])
expensive = heapq.nlargest(3, portfolio, key=lambda s: s['price'])

在呼叫nlargest和nsmallest的時候,我們額外傳遞了一個引數key,我們傳入的是一個匿名函式,它返回的結果是這個物件的price,也就是說我們希望heapq根據物件的price來進行排序。


優先佇列


heapq除了可以返回最大最小的K個數之外,還實現了優先佇列的介面。我們可以直接呼叫heapq.heapify方法,輸入一個數組,返回的結果是根據這個陣列生成的堆(等價於優先佇列)。

當然我們也可以從零開始,直接通過呼叫heapq的push和pop來維護這個堆。接下來,我們就通過heapq來自己動手實現一個優先佇列,程式碼非常的簡單,我想大家應該可以瞬間學會。

首先是實現優先佇列的部分:

import heapq

class PriorityQueue:
  
  def __init__(self):
    self._queue = []
    self._index =0
    
  def push(self, item, priority):
    # 傳入兩個引數,一個是存放元素的陣列,另一個是要儲存的元素,這裡是一個元組。
    # 由於heap內部預設有小到大排,所以對priority取負數
    heapq.heappush(self._queue, (-priority, self._index, item))
    self._index += 1
  
  def pop(self):
    return heapq.heappop(self._queue)[-1]

其次我們來實際看一下運用的情況:

q = PriorityQueue()

q.push('lenovo', 1)
q.push('Mac', 5)
q.push('ThinkPad', 2)
q.push('Surface', 3)

q.pop()
# Mac
q.pop()
# Surface

到這裡,關於heapq的應用方面就算是介紹完了,但是還沒有真正的結束。

我們需要分析一下heapq當中操作的複雜度,關於堆的部分我們暫時跳過,我們先來看nlargest和nsmallest。我在github當中找到了這個庫的原始碼,在方法的註釋上,作者寫下了這個方法的複雜度,和排序之後取前K個開銷五五開:

def nlargest(n, iterable, key=None):
    """Find the n largest elements in a dataset.

    Equivalent to:  sorted(iterable, key=key, reverse=True)[:n]
    """

我們都知道排序的複雜度的期望是\(O(nlogn)\),如果你瞭解堆的話,會知道堆一次插入元素的複雜度是\(logn\)。如果我們限定堆的長度是K,我們插入n次之後也只能保留K個元素。每次插入的複雜度是\(logK\),一共插入n次,所以整體的複雜度是\(nlogK\)。

如果K小一些,可能開銷會比排序稍小,但是程度有限。那麼有沒有什麼辦法可以不用排序並且儘可能快地篩選出前K大或者是前K小的元素呢?

我這裡先賣個關子,我們之後的文章當中再來講解。

今天的文章就到這裡,如果覺得有所收穫,請順手點個關注吧,你的舉手之勞對我很重要。

參考資料

Python CookBook Version3

維基百科