1. 程式人生 > >10-多執行緒、多程序和執行緒池程式設計

10-多執行緒、多程序和執行緒池程式設計

一、多執行緒、多程序和執行緒池程式設計

1.1、Python中的GIL鎖

  CPython中,global interpreter lock(簡稱GIL)是一個互斥體,用於保護對Python物件的訪問,從而防止多個執行緒一次執行Python位元組碼(也就是說,GIL鎖每次只能允許一個執行緒工作,無法多個執行緒同時在CPU上工作)。鎖定是必要的,主要是因為CPython的記憶體管理不是執行緒安全的。(但是,由於存在GIL,因此其他功能已經變得越來越依賴於它所執行的保證。)CPython擴充套件必須支援GIL,以避免破壞執行緒。GIL之所以引起爭議,是因為它在某些情況下阻止多執行緒CPython程式充分利用多處理器系統。請注意,潛在的阻塞或長時間執行的操作(例如I / O,影象處理和NumPy數字運算)發生在GIL 之外。因此,只有在GIL內部花費大量時間來解釋CPython位元組碼的多執行緒程式中,GIL才成為瓶頸。但是即使不是瓶頸,GIL也會降低效能。總結這些:系統呼叫開銷很大,尤其是在多核硬體上。兩個執行緒呼叫一個函式所花的時間可能是單個執行緒兩次呼叫該函式所花時間的兩倍。GIL可以導致I / O繫結執行緒被排程在CPU繫結執行緒之前。並且它阻止了訊號的傳遞。

import threading

num = 0

def add():
    global num
    for i in range(1000000):
        num += 1
def desc():
    global num
    for i in range(1000000):
        num -= 1

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()#執行執行緒1
thread2.start()

thread1.join() #讓執行緒1執行完畢才進行下一步
thread2.join()#讓執行緒2執行完畢才進行下一步
print(num) #-197054   列印的結果應該是0,但是每次列印的結果都不一樣,這說明執行緒沒有按照要求一個一個執行完畢才進行下一個
#GIL會根據執行的位元組碼行數以及時間片釋放GIL,GIL在遇到I/O的操作時候主動釋放GIL
# (也就是說,當執行緒1遇到I/O操作的時候,會釋放GIL切換到執行緒2執行)

1.2、多執行緒程式設計

  執行緒:是指程序內的一個執行單元,同時也是作業系統即CPU執行任務的最小單位。因為Cpython的直譯器上有一把全域性鎖即上面提到的GIL鎖,一個程序中同一時間只允許一個執行緒執行,遇到I/O阻塞的時候,快速切換到另一個執行緒,執行緒也可以理解就是每當遇到I/O操作的時候,就會切換,節約的時間就是I/O操作的時間。

1.2.1:通過Thread類例項化:

threading.currentThread(): 返回當前的執行緒變數。
threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。

Thread類提供了以下方法:

run(): 用以表示執行緒活動的方法。
start():啟動執行緒活動。
join([time]): 等待至執行緒中止。這阻塞呼叫執行緒直至執行緒的join() 方法被呼叫中止-正常退出或者丟擲未處理的異常-或者是可選的超時發生。
isAlive(): 返回執行緒是否活動的。
getName(): 返回執行緒名。
setName(): 設定執行緒名。

Thread類的例項化:

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")

if  __name__ == "__main__":

    thread1 = threading.Thread(target=get_detail_html,args=("www.baidu.com",))
    thread2 = threading.Thread(target=get_detail_url,args=("www.baidu.com",))
    start_time = time.time()
    thread1.start()
    thread2.start()
    print ("last time: {}".format(time.time()-start_time)) #0.00099945068359375

"""

get detail html started
get detail url startedlast time: 0.00099945068359375


get detail html end
get detail url end

"""

  為什麼上面的時間接近0秒,因為現在這個程式總共有三個執行緒,那三個執行緒呢?執行緒1、2以及主執行緒,按照main下面執行,就會發現執行緒1、2執行之後繼續執行下面的print語句。

  有沒有就是當主執行緒執行完成之後就終止所有執行緒的呢?建立守護執行緒(setDaemon)這樣主程式結束就會kill子執行緒。

if  __name__ == "__main__":

    thread1 = threading.Thread(target=get_detail_html,args=("www.baidu.com",))
    thread2 = threading.Thread(target=get_detail_url,args=("www.baidu.com",))
    thread1.setDaemon(True)  # 是否守護執行緒
    thread2.setDaemon(True)
    start_time = time.time()
    thread1.start()
    thread2.start()
    
    print ("last time: {}".format(time.time()-start_time)) 
"""
setDaemon()
引數一個布林值,指示此執行緒是否是守護執行緒(真)(假)。必須在start()呼叫之前設定此引數,
否則RuntimeError引發該引數。它的初始值是從建立執行緒繼承的;主執行緒不是守護程式執行緒,
因此在主執行緒中建立的所有執行緒預設為daemonic = False。 當沒有活動的非守護執行緒時,整個Python程式將退出。 """

當所有執行緒執行結束後,主執行緒才結束:

if  __name__ == "__main__":

    thread1 = threading.Thread(target=get_detail_html,args=("www.baidu.com",))
    thread2 = threading.Thread(target=get_detail_url,args=("www.baidu.com",))
    start_time = time.time()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

    print ("last time: {}".format(time.time()-start_time))

1.2.2:通過繼承Thread來實現多執行緒:(繼承之後重寫run方法,邏輯在run中進行)

import time
import threading


class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")

if  __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print ("last time: {}".format(time.time()-start_time))

1.3、執行緒間通訊-共享變數和Queue

  執行緒間的通訊方式第一種就是共享變數,共享變數就像上面第一個例子那樣共享一個全域性變數,但這種共享變數有缺點,這是執行緒不安全的狀態。結果與預期值不一樣。那麼有沒有一種執行緒安全的方式呢?當然有,那就是queue--同步佇列類,Python 的 Queue 模組中提供了同步的、執行緒安全的佇列類,包括FIFO(先入先出)佇列Queue,LIFO(後入先出)佇列LifoQueue,和優先順序佇列 PriorityQueue。該queue模組實現了多生產者,多消費者佇列(生產者消費者模型)。當必須在多個執行緒之間安全地交換資訊時,它線上程程式設計中特別有用。這些佇列都實現了鎖原語,能夠在多執行緒中直接使用,可以使用佇列來實現執行緒間的同步。

   在這裡就只設計到queue.Queue(maxsize = 0 )maxsize是一個整數,用於設定可以放入佇列中的專案數的上限。一旦達到此大小,插入將被阻塞,直到消耗佇列專案為止。如果maxsize小於或等於零,則佇列大小為無限。其他另外兩個佇列(後入先出)LifoQueue、(優先順序佇列)PriorityQueue根據需求來選擇佇列。

Queue.qsize() #返回佇列的大小
Queue.empty() #如果佇列為空,返回True,反之False
Queue.full() #如果佇列滿了,返回True,反之False
Queue.full #與 maxsize 大小對應
Queue.get([block[, timeout]]) #獲取佇列,timeout等待時間
Queue.get_nowait() #相當Queue.get(False)
Queue.put(item) #寫入佇列,timeout等待時間
Queue.put_nowait(item) #相當Queue.put(item, False)
Queue.task_done() #在完成一項工作之後,Queue.task_done()函式向任務已經完成的佇列傳送一個訊號
Queue.join() #實際上意味著等到佇列阻塞執行完畢為空,再執行別的操作
#通過queue的方式進行執行緒間同步
from queue import Queue
import time
import threading


def get_detail_html(queue):
    while True:
    #文章詳情頁

        url = queue.get()
        print("get detail html started")
        time.sleep(2)
        print("get url:{url}".format(url=url))


def get_detail_url(queue):
    # 文章列表頁
    print("get detail url started")
    time.sleep(1)
    for i in range(20):
        queue.put("http://cnblogs.com/lishuntao/{id}".format(id=i))
        queue.task_done()#需要配合queue.join()使用
    print("get detail url end")


if  __name__ == "__main__":
    queue = Queue(maxsize=1000)# maxsize是一個整數,用於設定可以放入佇列中的專案數的上限。一旦達到此大小,插入將被阻塞,直到消耗佇列專案為止。

    thread_detail_url = threading.Thread(target=get_detail_url,args=(queue,))#將例項化的Queue作為引數傳入
    thread_detail_url.start()
    for i in range(5):  #五個執行緒共用資料
        html_thread = threading.Thread(target=get_detail_html,args=(queue,))
        html_thread.start()
    start_time = time.time()
    queue.join()#阻塞等待佇列中任務全部處理完畢,需要配合queue.task_done使用
    print("last time: {}".format(time.time()-start_time))

1.4、執行緒同步-Lock、RLock

  什麼叫執行緒同步?執行緒同步就是一個執行緒執行完成之後在進入下一個執行緒。上面第一個例子為啥num值不等於零?按理來說,先加100萬次,再減去100萬次,最後的結果是0。不等於0的原因是,將程式碼編譯成位元組碼,前面load值以及運算值都沒有出現問題,因為GIL鎖會在I/O操作釋放切換到其他執行緒,或者在特定的執行位元組碼行數的時候進行切換,然而上一個函式位元組碼的num值,很有可能兩個執行緒會被共用值,賦值給desc函式的num值,因此會出此這樣的情況,每次值都會不一樣。那怎麼解決這種情況呢?為了保證資料的正確性,需要對多個執行緒進行同步。使用 Thread 物件的 Lock 和 Rlock 可以實現簡單的執行緒同步,這兩個物件都有 acquire 方法和 release 方法,對於那些需要每次只允許一個執行緒操作的資料,可以將其操作放到 acquire 和 release 方法之間。但是用鎖也會有缺點:1、用鎖會影響效能。2、用鎖會造成死鎖(死鎖迴圈以及兩次acquire鎖,沒有釋放鎖)

from threading import Lock
import threading

total = 0
lock = Lock()

def add():
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        total += 1
        lock.release()

def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total) #0
from threading import RLock
import threading

total = 0
lock = RLock() #在同一個執行緒裡面,Rlock可以連續的呼叫acquire多次。一定要注意acquire的次數要和release的次數相等
def add():
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        lock.acquire()
        total += 1
        lock.release()
        lock.release()

def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)

#1. 用鎖會影響效能
#2. 鎖會引起死鎖
#死鎖的情況 1、A(a,b)  2、就是兩次acquire ,第一次acquire沒有將鎖釋放,第二次就不能獲取到鎖
"""
A(a、b)         當A先獲取a,然後B獲取到b,A就不能獲取到b,B不能獲取到A
acquire (a)     就這樣進入到死迴圈即死鎖的一種。
acquire (b)

B(a、b)
acquire (b)
acquire (a)
"""

1.5、執行緒同步-Condition使用

  Condition(條件變數)裡面有enter和exit兩個魔法方法,因此可以利用with語句實現。with語句相當於就是一次獲取鎖以及釋放鎖的過程。條件變數總是與某種鎖定相關聯。可以傳入,也可以預設建立一個。(當多個條件變數必須共享相同的鎖時,傳遞一個輸入很有用。)條件變數具有acquire()release() 方法,它們呼叫關聯鎖的相應方法。它還具有一個wait()方法以及notify()和 notifyAll()方法。只有在呼叫執行緒獲得了鎖之後(with self.condition),才必須呼叫這三個物件。

wait() #方法釋放鎖,然後塊直到其被喚醒一個notify()或notifyAll的()呼叫在另一個執行緒相同的條件變數。喚醒後,它將重新獲取鎖並返回。也可以指定超時。

notify() #方法喚醒等待條件變數的執行緒中的一個,如果有的話正在等待。所述notifyAll的() 方法喚醒等待條件變數的所有執行緒。

注意:notify()notifyAll()方法不會釋放鎖;

  一個執行緒將獲得獨佔資源的鎖去訪問共享資源,通過生產者/消費者可以更好的描述這種方式,生產者新增一個隨機數字到公共列表,而消費者將這個數字在公共列表中清除。看一下生產者類,生產者獲得一個鎖,新增一個數字,然後通知消費者執行緒有一些東西可以來清除,最後釋放鎖定。在各項動作不段切換的時候,將會觸發不定期的隨機暫停。

import threading
#通過condition完成協同讀詩


class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="小愛")
        self.cond = cond

    def run(self):
        self.cond.acquire()#與with self.cond:語句一樣的效果__enter__,:(self.cond.acquire()) __exit__:(self.cond.release())
        self.cond.wait()
        print("{} : 在 ".format(self.name))
        self.cond.notify()

        self.cond.wait()
        print("{} : 好啊 ".format(self.name))
        self.cond.notify()

        self.cond.wait()
        print("{} : 君住長江尾 ".format(self.name))
        self.cond.notify()

        self.cond.wait()
        print("{} : 共飲長江水 ".format(self.name))
        self.cond.notify()

        self.cond.wait()
        print("{} : 此恨何時已 ".format(self.name))
        self.cond.notify()

        self.cond.wait()
        print("{} : 定不負相思意 ".format(self.name))
        self.cond.notify()
        self.cond.release()


class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天貓精靈")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小愛同學 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我們來對古詩吧 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我住長江頭 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 日日思君不見君 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 此水幾時休 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 只願君心似我心 ".format(self.name))
            self.cond.notify()
            self.cond.wait()


if __name__ == "__main__":
    condition = threading.Condition()
    xiaoai = XiaoAi(condition)
    tianmao = TianMao(condition)
    #在呼叫with condition之後才能呼叫wait或者notify方法
    #condition有兩層鎖, 一把底層鎖會線上程呼叫了wait方法的時候釋放,
    # 上面的鎖會在每次呼叫wait的時候分配一把並放入到condition的等待佇列中,等到notify方法的喚醒
   #啟動順序很重要
   xiaoai.start()
   tianmao.start()

1.6、執行緒同步-Semaphore使用

  訊號量管理一個內部計數器,該計數器由每個acquire()呼叫遞減, 並由每個release() 呼叫遞增。計數器永遠不能低於零。當acquire() 發現它為零時,它將阻塞,等待其他執行緒呼叫release()。即可以利用它來控制爬蟲每次的請求次數,一次請求過多,會被禁止,為了反反爬就可以設定執行緒請求的併發數。

#Semaphore 是用於控制進入數量的鎖
#檔案, 讀、寫, 寫一般只是用於一個執行緒寫,讀可以允許有多個

import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()#釋放一個訊號量,使內部計數器增加一。當它在進入時為零
        # 並且另一個執行緒正在等待它再次變得大於零時,喚醒該執行緒。

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire() #不帶引數呼叫時:如果內部計數器在輸入時大於零,則將其遞減1並立即返回。
            # 如果在進入時為零,則阻塞,等待其他執行緒呼叫 release使之大於零。(要互鎖需要的程式碼塊)
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()

if __name__ == "__main__":
    sem = threading.Semaphore(3) #設定每次執行數是3
    url_producer = UrlProducer(sem)
    url_producer.start()

1.7、ThreadPoolExecutor執行緒池

  為什麼要使用ThreadPoolExecutor? ThreadPoolExecutor提供了一個簡單的抽象,圍繞多個執行緒並使用這些執行緒以併發方式執行任務。在正確的上下文中使用執行緒時,向您的應用程式新增執行緒可以幫助極大地提高應用程式的速度。通過使用多個執行緒,我們可以加快面對基於I/O操作型的應用程式,網路爬蟲就是一個很好的例子。Web爬取網頁程式通常會執行很多繁重的基於I / O的任務,例如獲取和解析網站,如果我們要以同步方式獲取每個頁面,您會發現程式的主要瓶頸就是從網際網路上獲取這些頁面 。通過使用諸如ThreadPoolExecutor之類的東西,我們可以通過同時執行多個讀取並在返回每個頁面時對其進行處理來有效地緩解此瓶頸。

建立ThreadPoolExecutor例項:

import time
from concurrent.futures import ThreadPoolExecutor

#未來物件,task的返回容器
#主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值
#當一個執行緒完成的時候我們主執行緒能立即知道
#futures可以讓多執行緒和多程序編碼介面一致

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


pool = ThreadPoolExecutor(max_workers=2)  #max_workers 最大同時併發數,預設是作業系統的核的數量
#通過submit函式提交執行的函式到執行緒池中, submit 是立即返回
task1 = pool.submit(get_html, (3))
task2 = pool.submit(get_html, (2))

#要獲取已經成功的task的返回
#done方法用於判定某個任務是否完成,完成返回True,沒有完成返回False
print(task1.done())   #False
print(task2.cancel()) #False
time.sleep(4)
print(task1.done())  #True
#result方法可以獲取task的執行結果(即函式的返回結果)
print(task1.result())

上下文管理器例項化:

import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(3)
    print("Processing {}".format(n))

def main():
    print("Starting ThreadPoolExecutor")
    #上下文管理器例項化ThreadPoolExecutor(執行緒池)物件
    with ThreadPoolExecutor(max_workers=3) as executor:
        for i in range(4):
            future = executor.submit(task,(i))
    print("All tasks complete")

if __name__ == '__main__':
    main()
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
#要獲取已經成功的task的返回
urls = [3,2,4]
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED) #當第一個執行緒完成的時候,繼續執行主執行緒
print("main")

更多方法呼叫詳情在官網https://www.python.org/dev/peps/pep-3148/

1.8、多執行緒和多程序對比

  多執行緒與多程序的對比,當遇到I/O操作的時候(例如爬蟲,讀檔案等),多執行緒的速度優於多程序。當遇到計算密集型操作的時候(耗費CPU的操作,例如計算),多程序優於多執行緒。

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor

#1、計算密集型(運用多程序)
def fib(n):
    if n<=2:
        return 1
    return fib(n-1)+fib(n-2)

if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(fib, (num)) for num in range(25,40)]
        start_time = time.time()
        for future in as_completed(all_task):#返回一個迭代器,跟map()不同,這個迭代器的迭代順序依照all_task返回(執行緒結束)的順序。
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

#2. 對於io操作來說,多執行緒優於多程序
def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ThreadPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2]*30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time()-start_time))

    """
    executor.map(func, list)
    第一個引數為只接受一個引數的函式,後一個為可迭代物件。
    這個map方法會把對函式的呼叫對映到到多個執行緒中。並返回一個future的迭代器。
    """

1.9、multiprocessing多程序程式設計

import multiprocessing
import time


def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    progress = multiprocessing.Process(target=get_html, args=(2,))
    print(progress.pid) #None
    progress.start()
    print(progress.pid) #10940
    progress.join()
    print("main progress end")

多程序程式設計:

import multiprocessing

#多程序程式設計
import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    #使用程序池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    result = pool.apply_async(get_html, args=(3,)) #不用等待當前程序執行完畢,隨時根據系統排程來進行程序切換。

    #等待所有任務完成
    pool.close() #告訴主程序,你等著所有子程序執行完畢後在執行剩餘部分。
    pool.join()  #close必須在join前呼叫

    print(result.get()) #3

imap/imap_unordered:

if __name__ == "__main__":
    #使用執行緒池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    for result in pool.imap(get_html, [1,5,3]): #按照對映的順序輸出
        print("{} sleep success".format(result))

    # for result in pool.imap_unordered(get_html, [1,5,3]): #誰先執行完成就執行誰
    #     print("{} sleep success".format(result))

2.0、程序間通訊-Queue、Pipe,Manager(共享記憶體)

  多程序之間的通訊,不能運用多執行緒提供的queue.Queue。為了解決這個,多程序自己提供了一個multiprocessing.Queue。程序的用法和執行緒用法類似。多程序之間是不能共享全域性變數的,然而多執行緒是可以共享全域性變數的。多程序的資料是完全隔離的,當在linux/unix中fork資料的時候,在程序中的變數完全複製一份,複製到子程序中,這樣兩邊的資料是互不影響的。還有multiprocessing中的queue不能用於pool程序池的。那麼誰能用於程序池中呢?multiprocessing.Manager.Queue提供了可以用於程序池間的通訊。

mutiprocessing.Queue(程序間的通訊Queue):

import time
from multiprocessing import Process, Queue


def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

multiprocessing.Manager.Queue(程序池間通訊Manager):

import time
from multiprocessing import Pool, Manager


#multiprocessing中的queue不能用於pool程序池
#pool中的程序間通訊需要使用manager中的queue

def producer(queue):
    queue.put("a")
    time.sleep(2)

def consumer(queue):
    time.sleep(2)
    data = queue.get()
    print(data)

if __name__ == "__main__":
    queue = Manager().Queue(10)
    pool = Pool(2)
    
    pool.apply_async(producer, args=(queue,))
    pool.apply_async(consumer, args=(queue,))
    #等待完成
    pool.close()
    pool.join()

Pipe(只能適用於兩個程序之間的通訊):pipe效能高於queue

from multiprocessing import Process, Pipe


#通過pipe實現程序間通訊
#pipe的效能高於queue

def producer(pipe):
    pipe.send("lishuntao") #pipe傳送資料

def consumer(pipe):
    print(pipe.recv())  #pipe接收資料

if __name__ == "__main__":
    recevie_pipe, send_pipe = Pipe() #例項化的時候是兩個引數
#pipe只能適用於兩個程序 my_producer = Process(target=producer, args=(send_pipe, )) my_consumer = Process(target=consumer, args=(recevie_pipe,)) my_producer.start() my_consumer.start() my_producer.join() my_consumer.join()

程序間通訊其他方式(程序間共享記憶體):

from multiprocessing import Process, Manager


def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == "__main__":
    progress_dict = Manager().dict()

    first_progress = Process(target=add_data, args=(progress_dict, "lishuntao", 22))
    second_progress = Process(target=add_data, args=(progress_dict, "lishun", 18))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict) #{'lishun': 18, 'lishuntao': 22}