1. 程式人生 > >並發通信、生產者消費者模型

並發通信、生產者消費者模型

技術分享 strong 來講 ict ces nbsp 多進程之間通信 lease 聲明

多進程之間通信的限制

看一個例子:

import multiprocessing as mp
data=666
def func():
    global data
    data=222
p=mp.Process(target=func)
p.start()
p.join()
print(data)

>>>666

可以看到,聲明為global的data也沒有發生變化,輸出結果仍然是666,這正是多進程之間通信的限制,各個進程之間是相互獨立的,互不幹擾的內存空間。因此如果想要空想數據就必須開辟一段共享的內存空間。就要用到Manger對象。

技術分享圖片

Manger對象

我們常用的Manger對象空間有list(),dict(),Queue()三種,下面舉一個List()的簡單例子。

from multiprocessing import Process,Manager
mgr=Manager()            #創建服務器進程並返回通信的管理器
list_proxy=mgr.list()    #通過管理器在列表中開辟空間,並返回一個代理
print(list_proxy)
def func(list_ex):
    list_ex.append(a)
#把代理傳給子進程子進程就可以通過代理來訪問共享的內存空間了。 p
=Process(target=func,args=(list_proxy,)) p.start() p.join() print(list_proxy)
>>>[] [a]

線程間的共享與同步鎖

進程間如果不通過Manger對象是無法進行內存共享的,那麽對於線程呢?對於Python來講每一次只能執行一個線程,由於GIL鎖的存在。我們來看例子。

import threading
data=666
def func():
    global data
    data=222

t=threading.Thread(target=func)
t.start()
t.join()
print(data)


>>>222

我們看到結果輸出了222,也就是說全局對象更改了data的值,由此可見線程之間的內存是共享的。正是因為共享的便會出現資源競爭的問題,我們來看例子:

import threading
data=0
n=10000000     #這個n必須足夠大才能看出效果
def add(n):
    global data
    for i in range(n):
        data+=1

def sub(n):
    global data
    for i in range(n):
        data-=1

a=threading.Thread(target=add,args=(n,))
s=threading.Thread(target=sub,args=(n,))
a.start()
s.start()
a.join()
s.join()
print(data)


>>>-1561473

可以看到本來應該為0的值,在基數足夠大的時候就出現了問題,這就是由於線程之間的內存共享導致的,所以為了解決這一個問題就出現了同步鎖的概念,說白了就是加上鎖,然後控制資源的訪問權限這樣就會避免資源競爭的出現。看代碼。

import threading

lock=threading.Lock()    #生成一把鎖
data=0
n=10000000
def add(n):
    global data
    for i in range(n):
        lock.acquire()
        data+=1
        lock.release()

def sub(n):
    global data
    for i in range(n):
        lock.acquire()
        data-=1
        lock.release()

a=threading.Thread(target=add,args=(n,))
s=threading.Thread(target=sub,args=(n,))
a.start()
s.start()
a.join()
s.join()
print(data)


>>>0

這樣通過鎖來訪問就正確的得出結果了,但是要記住一點加鎖之後要記得釋放,或者通過with語法這樣會自動幫你釋放。

with lock:
    data-=1

線程與進程安全的隊列

隊列是一種常用的數據結構,原則是先進先出(FIFO)。

線程安全的隊列

主要方法包括:

  • 入隊:put(item)
  • 出隊:get()
  • 測試空:empty() #近似
  • 測試滿:full() #近似
  • 隊列長度:qsize() #近似
  • 任務結束:task_done()
  • 等待完成:join()

進程安全隊列

進程的隊列要用到之前提到的Manger對象,mgr.Queue()

主要方法包括:

  • 入隊:put(item)
  • 出隊:get()
  • 測試空:empty() #近似
  • 測試滿:full() #近似
  • 隊列長度:qsize() #近似

例子我們放到下面的生產者消費者模型中講解。

生產者消費者模型

何所謂生產者消費者模型?

技術分享圖片

就是說我們把進程之間的通信分開考慮,生產者只要往隊列裏面丟東西,消費者只要從隊列裏取東西,而二者不用考慮對方。

多線程實現

#生產者消費者模型
import queue
import threading
import random
import time

class Producer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            #生成了一個數據
            data = random.randint(0, 99)
            self.queue.put(data)   #把數據丟進隊列中
            print(生產者: 生產了:, data)
            time.sleep(1)

class Concumer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            item = self.queue.get() #從隊列中拿一個數據
            print(消費者: 從隊列中拿到:, item)


q = queue.Queue(5)  #創建一個隊列
producer = Producer(q)  #創建一個生產者
concumer = Concumer(q)  #創建一個消費者

producer.start()
concumer.start()


>>>生產者: 生產了: 46
消費者: 從隊列中拿到: 46
生產者: 生產了: 9
消費者: 從隊列中拿到: 9
生產者: 生產了: 39
消費者: 從隊列中拿到: 39
生產者: 生產了: 89
消費者: 從隊列中拿到: 89

多進程實現

import multiprocessing
import random
import time

class Producer(multiprocessing.Process):
    def __init__(self,queue):
        super().__init__()
        self.queue=queue
    def run(self):
        while True:
            data=random.randint(0,100)
            self.queue.put(data)
            print("生產者生產了數據{}".format(data))
            time.sleep(1)

class Consumer(multiprocessing.Process):
    def __init__(self,queue):
        super().__init__()
        self.queue=queue
    def run(self):
        while True:
            item=self.queue.get()
            print("消費者消費{}".format(item))
if __name__ == __main__:
    manger = multiprocessing.Manager()
    queue_m = manger.Queue()
    producer=Producer(queue_m)
    consumer=Consumer(queue_m)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()

>>>生產者生產了數據20
消費者消費20
生產者生產了數據62
消費者消費62
生產者生產了數據26
消費者消費26
生產者生產了數據36
消費者消費36
生產者生產了數據56
消費者消費56

並發通信、生產者消費者模型