1. 程式人生 > >一句話介紹python執行緒、程序和協程

一句話介紹python執行緒、程序和協程

一、程序:

Python的os模組封裝了常見的系統呼叫,其中就包括fork。而fork是linux常用的產生子程序的方法,簡言之是一個呼叫,兩個返回

在python中,以下的兩個模組用於程序的使用。詳細就不展開。

multiprocessing:跨平臺版本的多程序模組。

Pool:程序池

QueuePipes:程序通訊

 

二、執行緒:

嚴格意義上,python的多執行緒屬於偽多執行緒,因為受限於GIL,python的多執行緒每次只能執行一個,按流水線方式執行所有任務。

threading:高階建立執行緒模組

threading.Lock(): lock.acquire()獲取   lock.release()釋放

 

三、ThreadLocal

定義全域性變數,每個thread對他都有讀寫操作,但是該全域性變數的屬性值是每個thread的區域性變數,不同thread中的區域性變數不能互相修改。

 

計算密集型 vs. IO密集型

受限於GIL,python的多執行緒屬於偽執行緒,即是每個cpu一次只能執行一個執行緒。

計算密集型:多程序

IO密集型:多執行緒,比如爬蟲,時間多花費在io操作上

 

四、分散式程序

Python的multiprocessing模組不但支援多程序,其中managers子模組還支援把多程序分佈到多臺機器上。一個服務程序可以作為排程者,將任務分佈到其他多個程序中,依靠網路通訊。

服務程序負責啟動Queue,把Queue註冊到網路上,然後往Queue裡面寫入任務:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 傳送任務的佇列:
task_queue = Queue.Queue()
# 接收結果的佇列:
result_queue = Queue.Queue()

# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件:
QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 繫結埠5000, 設定驗證碼'abc': manager = QueueManager(address=('', 5000), authkey='abc') # 啟動Queue: manager.start() # 獲得通過網路訪問的Queue物件: task = manager.get_task_queue() result = manager.get_result_queue() # 放幾個任務進去: for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) # 從result佇列讀取結果: print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' % r) # 關閉: manager.shutdown()

本機上啟動或另一臺機子上啟動:

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 建立類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網路上獲取Queue,所以註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 連線到伺服器,也就是執行taskmanager.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 埠和驗證碼注意保持與taskmanager.py設定的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 從網路連線:
m.connect()
# 獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task佇列取任務,並把結果寫入result佇列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')

工作如圖:

 

注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述資料量要儘量小。比如傳送一個處理日誌檔案的任務,就不要傳送幾百兆的日誌檔案本身,而是傳送日誌檔案存放的完整路徑,由Worker程序再去共享的磁碟上讀取檔案。

 

 

四、協程

協程看上去也是子程式,但執行過程中,在子程式內部可中斷,然後轉而執行別的子程式,在適當的時候再返回來接著執行。

第一最大的優勢就是協程極高的執行效率。

第二大優勢就是不需要多執行緒的鎖機制,因為只有一個執行緒,也不存在同時寫變數衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。

import time

def consumer():
    r = ''
    while True:
        n = yield r  ##
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        time.sleep(1)
        r = '200 OK'

def produce(c):
    c.next()  ##執行一次生成
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)  ##傳給consumer,轉進consumer的yield裡面
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

if __name__=='__main__':
    c = consumer()  ##生成器
    produce(c)
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

 

注意到consumer函式是一個generator(生成器),把一個consumer傳入produce後:

  1. 首先呼叫c.next()啟動生成器;

  2. 然後,一旦生產了東西,通過c.send(n)切換到consumer執行;

  3. consumer通過yield拿到訊息,處理,又通過yield把結果傳回;

  4. produce拿到consumer處理的結果,繼續生產下一條訊息;

  5. produce決定不生產了,通過c.close()關閉consumer,整個過程結束。

整個流程無鎖,由一個執行緒執行,produce和consumer協作完成任務,所以稱為“協程”,而非執行緒的搶佔式多工。