1. 程式人生 > >併發程式設計(五)

併發程式設計(五)

前言

上篇部落格的內容是守護程序,對於作業系統來說可以在後臺執行一些程式.這篇的內容是互斥鎖,在上上篇部落格上說到程序記憶體空間互相隔離,所以可以通過共享檔案來操作同一個檔案,那麼這樣操作的話會發生什麼呢?

互斥鎖

多個程序需要共享資料時,先將其鎖定,此時資源狀態為'鎖定',其他程序不能更改;知道該程序釋放資源,將資源的狀態變成非'鎖定',其他的執行緒才能再次鎖定該資源.互斥鎖保證了每次只有一個程序進入寫入操作,從而保證了多程序情況下資料的正確性.

我們使用一個demo 來模擬多個程序操作同一個檔案:

import json
import time,random
from multiprocessing import Process

def show_tickets(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
        print('%s 檢視 剩餘票數: %s' % (name, data['count']))

def buy_ticket(name):
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)

        if dic['count'] > 0:
            dic['count'] -= 1

            time.sleep(random.randint(1,3))

            with open('ticket.json', 'wt', encoding='utf-8') as f:
                json.dump(dic, f)
                print('%s: 購票成功' % name)

def task(name):
    show_tickets(name)
    buy_ticket(name)

if __name__ == '__main__':
    for i in range(1,11):
        p = Process(target=task, args=(i,))
        p.start()

執行結果:

在 ticket.json 裡面只有一張票,結果卻造成多個使用者購買成功,這很顯然是不符合實際情況的.
那麼怎麼解決呢?如果多個程序對同一個檔案進行讀操作可以不進行限制,但是對同一個檔案進行寫操作就必要要進行限制,不可以同時多個人對同一個檔案進行寫操作.python 在多程序模組裡提供一個類, Lock 類,當程序獲取到鎖的時候其他的程序就必須要等待鎖釋放才可以進行爭搶,在這個例子裡面就可以加上一把鎖來保護資料安全.

from multiprocessing import Process, Lock
import json,time,random


def show_tickets(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
        print('%s 檢視 剩餘票數: %s' % (name, data['count']))

def buy_ticket(name):
    time.sleep(random.randint(1,3))
    with open('ticket.json', 'rt', encoding='utf-8') as f:
        dic = json.load(f)

        if dic['count'] > 0:
            dic['count'] -= 1

            time.sleep(random.randint(1,3))

            with open('ticket.json', 'wt', encoding='utf-8') as f:
                json.dump(dic, f)
                print('%s: 購票成功' % name)

def task(name,lock):
    show_tickets(name)
    lock.acquire()
    buy_ticket(name)
    lock.release()

if __name__ == '__main__':
    mutex = Lock()
    for i in range(1,11):
        p = Process(target=task, args=(i,mutex))
        p.start()

執行結果:

這樣加了鎖(互斥鎖)就可以解決同時操作同一個檔案造成的資料混亂問題了.

當使用多程序開發時,如果多個程序同時讀寫同一個資源,可能會造成資料的混亂,為了防止發生問題,使用鎖,或者使用 Process 的方法 join 將並行變為序列.

join 和鎖的區別

  1. join 人為控制程序的執行順序
  2. join 把整個程序全部序列,而鎖可以指定部分程式碼序列

一旦序列,效率就會降低,一旦並行,資料就可能會出錯.

程序間通訊

程序間通訊( internal-process communication),我們在開啟子程序是希望子程序幫助完成任務,很多情況下需要將資料返回給父程序,然而程序間記憶體是物理隔離的.

解決辦法:

  1. 將共享資料放到檔案中
  2. 管道 多程序模組中的一個類,需要有父子關係
  3. 共享一快記憶體區域 需要作業系統分配

管道通訊

Pipe類返回一個由管道連線的連線物件,預設情況下為雙工:

from multiprocessing import Process,Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
    
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

執行結果:

[42, None, 'hello']

例項化 Pipe 類會返回兩個連線物件表示管道的兩端.每個連線物件都有 send() 和 recv() 方法(及其他).請注意,如果兩個程序同時嘗試讀寫管道的同一端,則管道中的資料可能會損壞.當然,同時使用管道的不同端部的過程不存在損壞的風險.

共享記憶體通訊

Queue 通訊

Queue類會生成一個先進先出的容器,通過往佇列中存取資料而進行程序間通訊.

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()

執行結果:

[42, None, 'hello']

佇列其他特性

# 阻塞操作 必須掌握
q = Queue(3)
# # 存入資料
q.put("hello",block=False)
q.put(["1","2","3"],block=False)
q.put(1,block=False)
# 當容量滿的時候 再執行put 預設會阻塞直到執行力了get為止
# 如果修改block=False 直接報錯 因為沒地方放了
# q.put({},block=False)

# # # 取出資料
print(q.get(block=False))
print(q.get(block=False))
print(q.get(block=False))
# 對於get   當佇列中中沒有資料時預設是阻塞的  直達執行了put
# 如果修改block=False 直接報錯 因為沒資料可取了
print(q.get(block=False))



# 瞭解
q = Queue(3)
q.put("q",timeout=3)
q.put("q2",timeout=3)
q.put("q3",timeout=3)
# 如果滿了 願意等3秒  如果3秒後還存不進去 就炸
# q.put("q4",timeout=3)

print(q.get(timeout=3))
print(q.get(timeout=3))
print(q.get(timeout=3))
# 如果沒了 願意等3秒  如果3秒後還取不到資料 就炸
print(q.get(timeout=3))

Manager 通訊

demo

from multiprocessing import  Process,Manager
import time

def task(dic):
    print("子程序xxxxx")
    # li[0] = 1
    # print(li[0])
    dic["name"] = "xx"

if __name__ == '__main__':
    m = Manager()
    # li = m.list([100])
    dic = m.dict({})
    # 開啟子程序
    p = Process(target=task,args=(dic,))
    p.start()
    time.sleep(3)

可以建立一片共享記憶體區域用來存取資料.

生產者消費者模型

什麼是生產者消費者模型

在軟體開發過程中,經常碰到這樣的場景:

某些模組負責生產資料,這些資料由其他模組來負責處理(此處的模組可能是:函式,執行緒,程序等).生產資料的模組稱為生產者,而處理資料的模組稱為消費者.在生產者與消費者之間的緩衝區稱之為倉庫.生產者負責往倉庫運輸商品,而消費者負責從倉庫裡取出商品,這就構成了生產者消費者模型.

結構圖如下:

為了便於理解,我們舉一個寄信的例子。假設你要寄一封信,大致過程如下:

  1. 你把信寫好——相當於生產者生產資料;
  2. 你把信放入郵箱——相當於生產者把資料放入緩衝區;
  3. 郵遞員把信從郵箱取出,做相應處理——相當於消費者把資料取出緩衝區,處理資料.

生產者消費者模型的優點

  • 解耦

假設生產者和消費者分別是兩個執行緒.如果讓生產者直接呼叫消費者的某個方法,那麼生產者對於消費者就會產生依賴(耦合).如果未來消費者的程式碼發生改變,可能會影響到生產者的程式碼.而如果兩者都依賴於某個緩衝區,兩者之間不直接依賴,耦合也就相應降低了.

舉個例子,我們去郵局投遞信件,如果不使用郵箱(也就是緩衝區,你必須得把信直接交給郵遞員.有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他.這就產生了你和郵遞員之間的依賴(相當於生產者和消費者的強耦合).萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者程式碼).而郵箱相對來說比較固定,你依賴它的成本就比較低(相當於和緩衝區之間的弱耦合).

  • 併發

由於生產者與消費者是兩個獨立的併發體,它們之間是使用緩衝區通訊的,生產者只需要往緩衝區裡丟資料,就可以接著生產下一個資料了,而消費者只需要從緩衝區拿資料即可,這樣就不會因為彼此的處理速度而發生阻塞.

繼續上面的例子,如果沒有郵箱,就得在郵局等郵遞員,知道他回來,把信交給他,這期間我們什麼事都幹不了(生產者阻塞).或者郵遞員挨家挨戶問,誰要寄信(消費者阻塞).

  • 支援忙閒不均

當生產者製造資料快的時候,消費者來不及處理,為處理的資料可以暫時存在緩衝區中,慢慢處理,而不至於因為消費者的效能過慢造成資料丟失或影響生產者生產資料.

再拿寄信的例子,假設郵遞員一次只能帶走1000封信,萬一碰上情人節或者其他的緊急任務,需要寄出的信超過了1000封,這個時候郵箱作為緩衝區就派上用場了.郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時在拿走.

使用

from multiprocessing import Process, Queue
import time, random

def producer(name, food, q):
    for i in range(10):
        res = '%s %s' % (food, i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print('%s 生產了 %s' % (name, res))
        
def consumer(name, q):
    while True:
        res = q.get()
        time.sleep(random.randint(1,3))
        print('%s 消費了 %s' % (name, res))

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=('musibii', '