一句話介紹python執行緒、程序和協程
一、程序:
Python的os
模組封裝了常見的系統呼叫,其中就包括fork。而fork是linux常用的產生子程序的方法,簡言之是一個呼叫,兩個返回。
在python中,以下的兩個模組用於程序的使用。詳細就不展開。
multiprocessing:跨平臺版本的多程序模組。
Pool:程序池
Queue
、Pipes:程序通訊
二、執行緒:
嚴格意義上,python的多執行緒屬於偽多執行緒,因為受限於GIL,python的多執行緒每次只能執行一個,按流水線方式執行所有任務。
threading:高階建立執行緒模組
threading.Lock(): lock.acquire()獲取 lock.release()釋放
三、ThreadLocal
定義全域性變數,每個thread對他都有讀寫操作,但是該全域性變數的屬性值是每個thread的區域性變數,不同thread中的區域性變數不能互相修改。
計算密集型 vs. IO密集型
受限於GIL,python的多執行緒屬於偽執行緒,即是每個cpu一次只能執行一個執行緒。
計算密集型:多程序
IO密集型:多執行緒,比如爬蟲,時間多花費在io操作上
四、分散式程序
Python的multiprocessing
模組不但支援多程序,其中managers
子模組還支援把多程序分佈到多臺機器上。一個服務程序可以作為排程者,將任務分佈到其他多個程序中,依靠網路通訊。
服務程序負責啟動Queue
,把Queue
註冊到網路上,然後往Queue
裡面寫入任務:
# taskmanager.py import random, time, Queue from multiprocessing.managers import BaseManager # 傳送任務的佇列: task_queue = Queue.Queue() # 接收結果的佇列: result_queue = Queue.Queue() # 從BaseManager繼承的QueueManager: class QueueManager(BaseManager): pass # 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件:QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 繫結埠5000, 設定驗證碼'abc': manager = QueueManager(address=('', 5000), authkey='abc') # 啟動Queue: manager.start() # 獲得通過網路訪問的Queue物件: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result佇列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 關閉: manager.shutdown()
本機上啟動或另一臺機子上啟動:
# taskworker.py import time, sys, Queue from multiprocessing.managers import BaseManager # 建立類似的QueueManager: class QueueManager(BaseManager): pass # 由於這個QueueManager只從網路上獲取Queue,所以註冊時只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 連線到伺服器,也就是執行taskmanager.py的機器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 埠和驗證碼注意保持與taskmanager.py設定的完全一致: m = QueueManager(address=(server_addr, 5000), authkey='abc') # 從網路連線: m.connect() # 獲取Queue的物件: task = m.get_task_queue() result = m.get_result_queue() # 從task佇列取任務,並把結果寫入result佇列: for i in range(10): try: n = task.get(timeout=1) print('run task %d * %d...' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') # 處理結束: print('worker exit.')
工作如圖:
注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述資料量要儘量小。比如傳送一個處理日誌檔案的任務,就不要傳送幾百兆的日誌檔案本身,而是傳送日誌檔案存放的完整路徑,由Worker程序再去共享的磁碟上讀取檔案。
四、協程
協程看上去也是子程式,但執行過程中,在子程式內部可中斷,然後轉而執行別的子程式,在適當的時候再返回來接著執行。
第一最大的優勢就是協程極高的執行效率。
第二大優勢就是不需要多執行緒的鎖機制,因為只有一個執行緒,也不存在同時寫變數衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。
import time def consumer(): r = '' while True: n = yield r ## if not n: return print('[CONSUMER] Consuming %s...' % n) time.sleep(1) r = '200 OK' def produce(c): c.next() ##執行一次生成 n = 0 while n < 5: n = n + 1 print('[PRODUCER] Producing %s...' % n) r = c.send(n) ##傳給consumer,轉進consumer的yield裡面 print('[PRODUCER] Consumer return: %s' % r) c.close() if __name__=='__main__': c = consumer() ##生成器 produce(c)
[PRODUCER] Producing 1... [CONSUMER] Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 2... [CONSUMER] Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 3... [CONSUMER] Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 4... [CONSUMER] Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] Producing 5... [CONSUMER] Consuming 5... [PRODUCER] Consumer return: 200 OK
注意到consumer函式是一個generator(生成器),把一個consumer傳入produce後:
-
首先呼叫c.next()啟動生成器;
-
然後,一旦生產了東西,通過c.send(n)切換到consumer執行;
-
consumer通過yield拿到訊息,處理,又通過yield把結果傳回;
-
produce拿到consumer處理的結果,繼續生產下一條訊息;
-
produce決定不生產了,通過c.close()關閉consumer,整個過程結束。
整個流程無鎖,由一個執行緒執行,produce和consumer協作完成任務,所以稱為“協程”,而非執行緒的搶佔式多工。