1. 程式人生 > >網絡編程——進程同步

網絡編程——進程同步

通信 發送 行修改 信號量 targe 判斷 error main lock

鎖——multiprocess.Lock:

  加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行的修改,會犧牲了速度卻保證了數據安全。

  雖然可以用文件共享數據實現進程間通信,但問題是:

  1,效率低。2,需要自己加鎖處理。

multiprocess模塊為我們提供的基於消息的IPC通信機制:隊列和管道。

  隊列和管道都是將數據存放於內存中:

  隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,我們應該避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可擴展性。

import
os import time import random from multiprocessing import Lock from multiprocessing import Process def work(n,lock): lock.acquire() print(%s:%s is running % (n,os.getpid())) time.sleep(random.random()) print(%s:%s is done % (n,os.getpid())) lock.release() if __name__ == __main__
: lock = Lock() for i in range(10): p = Process(target=work,args=(i,lock)) p.start() # 同步控制 # 只要用到了鎖,鎖之內的代碼就會變成同步的了 # 鎖:控制一段代碼,同一時間 只能被一個進程執行
技術分享圖片
import json
import time
import random
from multiprocessing import Lock
from multiprocessing import Process

def check_ticket(i):
    with open(
ticket) as f: ticket_count = json.load(f) # 通過json獲取文件中的信息 print(person%s查詢當前余票:% i, ticket_count[count]) def buy_ticket(i,lock): check_ticket(i) # 先進行查票操作 lock.acquire() # 得到鑰匙,進入程序 with open(ticket) as f: ticket_count = json.load(f) # 這一步是為了再次判斷是否還有余票 time.sleep(random.random()) if ticket_count[count]>0: print(person%s購票成功% i) ticket_count -= 1 # 字典的賦值 else: print(余票不足,person%s購票失敗% i) time.sleep(random.random()) with open(ticket,w)as f: json.dump(ticket_count,f) # 通過json.dump將字典轉化成字符串形式,然後寫入文件。 lock.release() # 歸還鑰匙 if __name__ == __main__: lock = Lock() for i in range(10): Process(target=buy_ticket,args = (i,lock)).start()
模擬搶票程序

信號量——multiprocess.Semaphore(了解)

技術分享圖片
互斥鎖同時只允許一個線程更愛數據,而信號量Semaphore是同時允許一定數量的線程更改數據。
信號量同步基於內部計數器,每調用一次acquire(),計數器減1,每調用一次release(),計數器加1,當計數器為0時,acquire()調用被阻塞,這是迪科斯徹信號量概念p()和v()的python實現,信號量同步機制適用於訪問像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到枷鎖的概念。
信號量介紹 技術分享圖片
import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

def ktv(i,sema):
    sema.acquire()
    print(person%s 進來唱歌了% i)
    time.sleep(random.randint(1,5))
    print(person%s 從ktv出去了% i)
    sema.release()
if __name__ == __main__:
    sema = Semaphore(3)
    for i in range(5):
        Process(target=ktv,args=(i,sema)).start()

# Semaphore  就是鎖+計數器
# acquire()     計數器-1

# release()     計數器+1

# 當計數器為0,acquire()就會阻塞
實例

事件——multiprocess.Event (了解)

技術分享圖片
python線程的時間用於主線程控制其他線程的執行,事件主要提供了三個方法:set(),wait(),clear().


事件處理的機制,全局定義了一個flag,如果flag值為False,那麽當程序執行,event.wait()方法時就會阻塞,如果flag值為True,那麽event.wait 方法時便不再阻塞。

clear:將flag設置為False.
set:將flag設置為True.
Event介紹 技術分享圖片
import time
import json
import random
from multiprocessing import Event
from multiprocessing import Process

def car(i,e):
    if not e.is_set():
        print(car%s正在等待% i)
    e.wait()
    print(car%s正在通過% i)

def traffic_light(e):
    print(\033[1;31m紅燈亮了\033[0m)
    time.sleep(2)
    while True:
        if not e.is_set():
            print(\033[1;32m綠燈亮了\033[0m)
            e.set()
        elif e.is_set():
            print(\033[1;31m紅燈亮了\033[0m)
            e.clear()
        time.sleep(2)

if __name__ == __main__:
    e = Event()
    Process(target=traffic_light,args=(e,)).start()
    for i in range(10):
        time.sleep(random.randrange(1,5,3))
        Process(target=car,args=(i,e)).start()
紅綠燈實例

進程之間的通信——隊列和管道:        (multiprocess.Queue,multiprocess.Pipe)

  進程間的通信:IPC(Inter-Process Communication)

隊列:創建共享的進程隊列,Queue時多進程安全的隊列,可以使用Queue實現多進程之間的數據傳遞。

Queue([maxsize])

創建共享的進程隊列。

參數:maxsize是隊列中允許的最大項數,如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。
技術分享圖片
Queue的實例q具有以下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。如果q為空,此方法將阻塞,直到隊列中有項目可用為止。block用於控制阻塞行為,默認為True. 如果設置為False,將引發Queue.Empty異常(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。如果在制定的時間間隔內沒有項目變為可用,將引發Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。如果隊列已滿,此方法將阻塞至有空間可用為止。block控制阻塞行為,默認為True。如果設置為False,將引發Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引發Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,因為在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引發NotImplementedError異常。


q.empty() 
如果調用此方法時 q為空,返回True。如果其他進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
如果q已滿,返回為True. 由於線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。
Queue的方法介紹 技術分享圖片
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動連接後臺線程。這可以防止join_thread()方法阻塞。

q.join_thread() 
連接隊列的後臺線程。此方法用於在調用q.close()方法後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread()方法可以禁止這種行為。
Queue的其他方法 技術分享圖片
from multiprocessing import Queue
q = Queue(3)    # 限定隊列中只能存在3項數據。

q.put(1)
q.put(2)
q.put(3)
# q.put(4)
# 如果隊列已經滿了,程序就會停在這裏,等待之前的數據被取走,再將數據放入q隊列中
# 如果數據沒有被取走,那麽程序將會一直停在這裏。
try:
    q.put_nowait(3) # 使用put_nowait()方法,如果隊列滿了不會阻塞而是會報錯。
except:
    print(隊列已經滿了)     # 報錯則會打印

print(q.full())     # 判斷隊列是否滿了,返回bool值

print(q.get())
print(q.full())
print(q.get())
print(q.get())
# print(q.get())        同放入一樣,如果隊列已經空了,繼續取值,就會出現阻塞。
try:
    print(q.get_nowait())   # 用get_nowait()方法取值,如果隊列為空不會阻塞則會報錯
except:
    print(隊列已經空了)
    
print(q.empty())    # 判斷隊列是否為空,返回bool值
Queue例子 技術分享圖片
import time
from multiprocessing import Process,Queue

def f(q):
    q.put([time.asctime(),from Eva,hello]) # 調用主函數中p進程傳遞過來的進程參數put向隊列中添加數據。

if __name__ == __main__:
    q = Queue()     # 創建一個Queue對象
    p = Process(target=f,args=(q,))     # 創建一個進程
    p.start()
    print(q.get())  # [‘Fri May 11 17:28:43 2018‘, ‘from Eva‘, ‘hello‘]
    p.join()
子進程發送數據給父進程 技術分享圖片
import os
import time
import multiprocessing

# 向queue中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + (put): + str(time.asctime())
    queue.put(info)

# 向queue中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print(%s%s\033[32m%s\033[0m%(str(os.getpid()),(get):,info))

if __name__ == __main__:
    multiprocessing.freeze_support()
    record1 = []
    record2 = []
    queue = multiprocessing.Queue(3)

    # 輸入進程
    for i in range(10):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args=(queue,))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    for p in record2:
        p.join()
批量生產數據放入隊列在批量獲得結果

網絡編程——進程同步