1. 程式人生 > >Python並行程式設計(十二):程序同步

Python並行程式設計(十二):程序同步

1、基本概念

      多個程序可以協同工作來完成一項任務,通常需要共享資料。所以在多程序之間保持資料的一致性就很重要,需要共享資料協同的程序必須以適當的策略來讀寫資料。同步原語和執行緒的庫類似。

      - Lock:一個Lock物件有兩個方法acquire和release來控制共享資料的讀寫許可權。

      - Event:一個程序發事件的訊號,另一個程序等待事件的訊號。Event物件有兩個方法set和clear來管理自己內部的變數。

      - Condition:此物件用來同步部分工作流程,在並行的程序中,有兩個基本的方法,wait()用來等待程序,notify_all用來通知所有等待此條件的程序。

      - Semaphore:用來共享資源,比如:支援固定資料的共享連線。

      - RLock:遞迴鎖物件,其用途和方法同Threading模組一樣。

      - Barrier:將程式分成幾個階段,適用於有些程序必須在某些特性程序之後執行,處於Barrier之後的程式碼不能同處於Barrier之前的程式碼並行。

2、測試用例

      使用barrier函式來同步兩個程序

import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from
datetime import datetime def test_with_barrier(synchronizer, serializer): name = multiprocessing.current_process().name synchronizer.wait() now = time() with serializer: print("process %s ----> %s" %(name, datetime.fromtimestamp(now))) def test_without_barrier(): name
= multiprocessing.current_process().name now = time() print("process %s ----> %s" %(name, datetime.fromtimestamp(now))) if __name__ == "__main__": # create a barrier and lock. synchronizer = Barrier(2) serializer = Lock() # create four processes Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start() Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start() Process(name='p3 - test_without_barrier', target=test_without_barrier).start() Process(name='p4 - test_without_barrier', target=test_without_barrier).start()

      執行結果如下:

      

      test_with_barrier函式呼叫了barrier的wait()方法,當兩個程序都呼叫wait()方法時,他們會一起繼續執行。

3、程序之間管理狀態

      Python的多程序模組提供了在所有的使用者間管理共享資訊的管理者(Manager),一個管理者物件控制著持有Python物件的服務程序,並允許其他程序操作共享物件。

      管理者特性:

      - 它控制著管理共享物件的服務程序

      - 它確保當某一程序修改了共享物件之後,所有的程序拿到的共享物件都得到了更新。

      程式碼示例:

import multiprocessing

def worker(dictionary, key, item):
    dictionary[key] = item
    print("key = %d value = %d" %(key, item))

if __name__ == "__main__":
    mgr = multiprocessing.Manager()
    dictionary = mgr.dict()
    jobs = [multiprocessing.Process(target=worker, args=(dictionary, i, i*2)) for i in range(10)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print("Results:",dictionary)

      執行結果:

      

      上述程式碼建立了一個管理者字典dictionary,在n個job之間共享,每個job都會更新字典的某一個index,所有的job完成之後,最後列印該字典,所有資料均存在。