1. 程式人生 > >python之執行緒程序模組

python之執行緒程序模組

1.threading

Thread類呼叫方式

  • 建立Thread的例項,傳給它一個函式
  • 建立Thread的例項,傳給它一個可呼叫的類例項
  • 派生Thread的子類,並建立子類的例項

模組函式

  • active_count():返回當前alive狀態的Thread物件的個數
  • current_thread():返回當前的Thread物件,對應於呼叫者控制的執行緒。如果呼叫者控制的執行緒不是通過threading模組建立的,則返回一個只有有限功能的虛假執行緒物件。
  • get_ident():返回當前執行緒的’執行緒識別符號’。它是一個非零的整數。
  • enumerate():返回當前活著的Thread物件的列表。該列表包括守護執行緒、由current_thread()建立的虛假執行緒物件和主執行緒。它不包括已終止的執行緒和尚未開始的執行緒。
  • main_thread():返回主 Thread 物件。在正常情況下,主執行緒是從 Python 直譯器中啟動的執行緒。
  • settrace(func):為所有從threading模組啟動的執行緒設定一個跟蹤函式。在每個執行緒的run()方法呼叫之前,func將傳遞給sys.settrace()。
  • setprofile(func):為所有從threading模組啟動的執行緒設定一個profile函式。這個profile函式將在每個執行緒的run()方法被呼叫之前傳遞給sys.setprofile()。
  • stack_size([size]):返回建立新的執行緒時該執行緒使用的棧的大小。

模組常量

  • TIMEOUT_MAX:timeout引數表示阻塞函式 (Lock.acquire(), RLock.acquire(), Condition.wait(), 等)所允許等待的最長時限。指定超過此值的超時將引發OverflowError。

模組的類
class threading.local
表示thread-local資料的一個類。

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • start():開始執行緒的活動
  • run():執行緒執行的內容
  • join(timeout=None):等待直至執行緒終止
  • name:執行緒姓名
  • ident:執行緒ID
  • is_alive():返回執行緒是否還活著
  • daemon:返回執行緒是否是守護執行緒

class threading.Lock

  • acquire(blocking=True, timeout=-1):獲取一把鎖,阻塞的或者非阻塞的。
  • release():釋放一把鎖

class threading.RLock

  • acquire(blocking=True, timeout=-1):獲取一把鎖,阻塞的或者非阻塞的。
  • release():釋放一把鎖

class threading.Condition(lock=None)

  • acquire(*args)
  • release()
  • wait(timeout=None)
  • wait_for(predicate, timeout=None)
  • notify(n=1)
  • notify_all()

class threading.Semaphore(value=1)

  • acquire(blocking=True, timeout=None)
  • release()

class threading.BoundedSemaphore(value=1)

class threading.Event
事件物件是執行緒間最簡單的通訊機制之一:執行緒可以啟用在一個事件物件上等待的其他執行緒

  • is_set()
  • set()
  • clear()
  • wait(timeout=None)

class threading.Timer(interval, function, args=None, kwargs=None)
建立一個timer,在interval秒過去之後,它將以引數args和關鍵字引數kwargs執行function 。

  • cancel()

class threading.Barrier(parties, action=None, timeout=None)

  • wait(timeout=None)
  • reset()
  • abort()
  • parties
  • n_waiting
  • broken

exception threading.BrokenBarrierError
RuntimeError的子類,當Barrier物件被重置或被破壞會被丟擲。

本模組提供的所有具有acquire()和release()方法的物件,可以用作with語句的上下文管理器。

2.multiprocessing

multiprocessing是一個包,它支援使用類似於threading模組的API來生成程序。multiprocessing包提供本地和遠端併發,通過使用子程序而不是執行緒有效地轉移全域性直譯器鎖。因此,multiprocessing模組允許程式設計師充分利用給定機器上的多個處理器。它在Unix和Windows上都可以執行。

上下文和啟動方法
根據平臺,multiprocessing支援三種方式來啟動程序。這些啟動方法是:spawn、fork、forkserver

在程序之間交換物件
multiprocessing支援程序之間的兩種型別的通訊通道:Queue、Pipe

程序之間的同步
multiprocessing包含了全部和threading相同的同步原語 。對於一個例項可以使用鎖來保證同一時間只有一個程序在使用標準輸出。

程序之間的狀態共享
當進行併發程式設計時,通常最好避免使用盡可能共享的狀態。在使用多個程序時尤其如此。
但是,如果真的需要使用一些共享資料,則multiprocessing提供了幾種方法:

  • 共享記憶體(Value、Array)
  • 伺服器程序(Manager)

使用工作程序的程序池
Pool類表示工作程序的程序池。它具有允許將任務以幾種不同的方式分配到工作程序的方法。

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • run()
  • start()
  • join([timeout])
  • name
  • is_alive()
  • daemon
  • pid
  • exitcode
  • authkey
  • sentinel
  • terminate()

異常類
exception multiprocessing.ProcessError
所有multiprocessing異常的基類
exception multiprocessing.BufferTooShort
當提供的緩衝區物件對於訊息讀取而言太小時,由Connection.recv_bytes_into()引發的異常
exception multiprocessing.AuthenticationError
在出現身份驗證錯誤時觸發
exception multiprocessing.TimeoutError
在超時超時時由方法引發

Pipes and Queues
當使用多個程序時,通常使用訊息傳遞來用於程序之間的通訊,並避免必須使用任何同步原語,如鎖。

對於傳遞訊息,可以使用Pipe()(用於兩個程序之間的連線)或Queues(允許多個生產者和消費者)。

multiprocessing.Pipe([duplex])
返回表示管道末端的Connection物件的(conn1, conn2)

class multiprocessing.Queue([maxsize])
返回程序共享的佇列,底層使用管道和鎖來實現

  • qsize()
  • empty()
  • full()
  • put(obj[, block[, timeout]])
  • put_nowait(obj)
  • get([block[, timeout]])
  • get_nowait()
  • close()
  • join_thread()
  • cancel_join_thread()

class multiprocessing.SimpleQueue
它是簡化的Queue型別,非常接近鎖定的Pipe

  • empty()
  • get()
  • put(item)

class multiprocessing.JoinableQueue([maxsize])
JoinableQueue,Queue子類是另外具有task_done()和join()方法的佇列

  • task_done()
  • join()

其它內容
multiprocessing.active_children():返回當前程序的所有活的子程序的列表

multiprocessing.cpu_count():返回系統中的CPU數。May引發NotImplementedError(另外os.cpu_count())

multiprocessing.current_process():返回與當前程序相對應的Process物件。

multiprocessing.freeze_support():當使用multiprocessing的程式已凍結以產生Windows可執行檔案時,新增支援,需要在之後直接呼叫此函式 _ name _ == ‘_ main _’ t0 >主模組的線。

multiprocessing.get_all_start_methods():返回支援的開始方法的列表,其中第一個是預設值

multiprocessing.get_context(method=None):返回與multiprocessing模組具有相同屬性的上下文物件

multiprocessing.get_start_method(allow_none=False):返回用於啟動程序的start方法的名稱。

multiprocessing.set_executable():設定在啟動子程序時要使用的Python直譯器的路徑

multiprocessing.set_start_method(method):設定應用於啟動子程序的方法。方法可以是’fork’,’spawn’或’forkserver’。注意,這應該最多呼叫一次,並且應該在內保護if _ name _ == ‘_ main_ ‘子句。

連線物件
class multiprocessing.Connection
連線物件允許傳送和接收可拾取物件或字串。它們可以被認為是面向訊息的連線套接字。通常使用Pipe()建立連線物件

  • send(obj)
  • recv()
  • fileno()
  • close()
  • poll([timeout])
  • send_bytes(buffer[, offset[, size]])
  • recv_bytes([maxlength])
  • recv_bytes_into(buffer[, offset])

同步原語
通常,在多程序程式中同步原語不像在多執行緒程式中那樣是必要的。請注意,還可以使用管理器物件建立同步原語 。

class multiprocessing.Barrier(parties[, action[, timeout]])
屏障物件:threading.Barrier的克隆。

class multiprocessing.BoundedSemaphore([value])
有界訊號物件:threading.BoundedSemaphore的緊密模擬。

class multiprocessing.Condition([lock])
條件變數:threading.Condition的別名。如果指定lock,那麼它應該是來自multiprocessing的Lock或RLock物件。

class multiprocessing.Event
克隆threading.Event

class multiprocessing.Lock
非遞迴鎖定物件:threading.Lock的緊密模擬

  • acquire(block=True, timeout=None)
  • release()

class multiprocessing.RLock
遞迴鎖定物件:threading.RLock的緊密模擬。

  • acquire(block=True, timeout=None)
  • release()

class multiprocessing.Semaphore([value])
訊號量物件:threading.Semaphore的緊密模擬

共享物件
multiprocessing.Value(typecode_or_type, *args, lock=True)
返回從共享記憶體分配的ctypes物件。預設情況下,返回值實際上是物件的同步包裝器。物件本身可以通過Value的值屬性訪問。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
返回從共享記憶體分配的ctypes陣列。預設情況下,返回值實際上是陣列的同步包裝器。

管理器
管理器提供一種建立可在不同程序之間共享的資料的方法,包括在不同機器上執行的程序之間通過網路共享。管理員物件控制管理共享物件的伺服器程序。其他程序可以通過使用代理訪問共享物件。

multiprocessing.Manager()
返回開始的SyncManager物件,可用於在程序之間共享物件。返回的管理器物件對應於生成的子程序,並且具有將建立共享物件並返回相應代理的方法。

管理器程序將在垃圾收集或其父程序退出時立即關閉。管理器類在multiprocessing.managers模組中定義:
class multiprocessing.managers.BaseManager([address[, authkey]])

  • start([initializer[, initargs]])
  • get_server()
  • connect()
  • shutdown()
  • register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
  • address

class multiprocessing.managers.SyncManager
class multiprocessing.managers.Namespace

自定義管理器
要建立自己的管理器,需要建立BaseManager的子類,並使用register() classmethod向管理器類註冊新型別或可呼叫項
例如:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

程序池
可以用Pool類建立一個程序池來執行提交給它的任務
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

  • apply(func[, args[, kwds]])
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
  • map(func, iterable[, chunksize])
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
  • imap(func, iterable[, chunksize])
  • imap_unordered(func, iterable[, chunksize])
  • starmap(func, iterable[, chunksize])
  • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
  • close()
  • terminate()

Listeners and Clients
通常,使用佇列或使用Pipe()返回的Connection物件來完成程序之間的訊息傳遞。

但是,multiprocessing.connection模組允許一些額外的靈活性。它基本上提供了一個高階的面向訊息的API來處理套接字或Windows命名管道。它還支援摘要認證使用hmac模組,並且同時輪詢多個連線。

Logging
一些支援日誌記錄可用。但請注意,logging包不使用程序共享鎖,因此可能(取決於處理程式型別)來自不同程序的訊息混淆。

Programming guidelines
使用multiprocessing時,應遵循一定的準則和慣用語。
以下適用於所有啟動方法:

  • 避免共享狀態
  • 可取性
  • 執行緒安全代理
  • 加入殭屍程序
  • 更好地繼承比pickle / unpickle
  • 避免終止程序
  • 加入使用佇列的程序
  • 將資源顯式傳遞給子程序
  • 小心用“檔案類物件”替換sys.stdin
  • -