Python執行緒同步
執行緒執行
join與setDaemon
子執行緒在主執行緒執行結束後,會繼續執行完,如果給子執行緒設定為守護執行緒(setDaemon=True),主執行緒執行結束子執行緒即結束;
如果join()執行緒,那麼主執行緒會等待子執行緒執行完再執行。
1 import threading 2 import time 3 4 5 def get_thread_a(): 6print("get thread A started") 7time.sleep(3) 8print("get thread A end") 9 10 11 def get_thread_b(): 12print("get thread B started") 13time.sleep(5) 14print("get thread B end") 15 16 17 if__name__ == "__main__": 18thread_a = threading.Thread(target=get_thread_a) 19thread_b = threading.Thread(target=get_thread_b) 20start_time = time.time() 21thread_b.setDaemon(True) 22thread_a.start() 23thread_b.start() 24thread_a.join() 25 26end_time = time.time() 27print("execution time: {}".format(end_time - start_time))
thread_a是join,首先子執行緒thread_a執行,thread_b是守護執行緒,當主執行緒執行完後,thread_b不會再執行 執行結果如下:
get thread A started get thread B started get thread A end execution time: 3.003199815750122
執行緒同步
當執行緒間共享全域性變數,多個執行緒對該變數執行不同的操作時,該變數最終的結果可能是不確定的(每次執行緒執行後的結果不同),如:ofollow,noindex">對count變數執行加減操作 ,count的值是不確定的,要想count的值是一個確定的需對執行緒執行的程式碼段加鎖。
python對執行緒加鎖主要有Lock和Rlock模組
Lock:
from threading import Lock lock = Lock() lock.acquire() lock.release()
Lock有acquire()和release()方法,這兩個方法必須是成對出現的,acquire()後面必須release()後才能再acquire(),否則會造成死鎖
Rlock:
鑑於Lock可能會造成死鎖的情況,RLock(可重入鎖)對Lock進行了改進,RLock可以在同一個執行緒裡面連續呼叫多次acquire(),但必須 再執行相同次數的release()
from threading import RLock lock = RLock() lock.acquire() lock.acquire() lock.release() lock.release()
condition(條件變數), 執行緒在執行時,當滿足了特定的條件後,才可以訪問相關的資料
import threading def get_thread_a(condition): with condition: condition.wait() print("A : Hello B,that's ok") condition.notify() condition.wait() print("A : I'm fine,and you?") condition.notify() condition.wait() print("A : Nice to meet you") condition.notify() condition.wait() print("A : That's all for today") condition.notify() def get_thread_b(condition): with condition: print("B : Hi A, Let's start the conversation") condition.notify() condition.wait() print("B : How are you") condition.notify() condition.wait() print("B : I'm fine too") condition.notify() condition.wait() print("B : Nice to meet you,too") condition.notify() condition.wait() print("B : Oh,goodbye") if __name__ == "__main__": condition = threading.Condition() thread_a = threading.Thread(target=get_thread_a, args=(condition,)) thread_b = threading.Thread(target=get_thread_b, args=(condition,)) thread_a.start() thread_b.start()
Condition內部有一把鎖,預設是RLock,在呼叫wait()和notify()之前必須先呼叫acquire()獲取這個鎖,才能繼續執行;當wait()和notify()執行完後,需呼叫release()釋放這個鎖,在執行 with condition時,會先執行acquire(),with結束時,執行了release();所以condition有兩層鎖,最底層鎖在呼叫wait()時會釋放,同時會加一把鎖到等待佇列,等待notify()喚醒釋放鎖
wait() :允許等待某個條件變數的通知,notify()可喚醒
notify(): 喚醒等待佇列wait()
執行結果:
B : Hi A, Let's start the conversation A : Hello B,that's ok B : How are you A : I'm fine,and you? B : I'm fine too A : Nice to meet you B : Nice to meet you,too A : That's all for today B : Oh,goodbye
Semaphore(訊號量)
用於控制執行緒的併發數,如爬蟲中請求次數過於頻繁會被禁止ip,每次控制爬取網頁的執行緒數量可在一定程度上防止ip被禁;檔案讀寫中,控制寫執行緒每次只有一個,讀執行緒可多個。
import time import threading def get_thread_a(semaphore,i): time.sleep(1) print("get thread : {}".format(i)) semaphore.release() def get_thread_b(semaphore): for i in range(10): semaphore.acquire() thread_a = threading.Thread(target=get_thread_a, args=(semaphore,i)) thread_a.start() if __name__ == "__main__": semaphore = threading.Semaphore(2) thread_b = threading.Thread(target=get_thread_b, args=(semaphore,)) thread_b.start()
上述示例了每隔1秒併發兩個執行緒執行的情況,當呼叫一次semaphore.acquire()時,Semaphore的數量就減1,直至Semaphore數量為0時被鎖上,當release()後Semaphore數量加1。Semaphore在本質上是呼叫的Condition,semaphore.acquire()在Semaphore的值為0的條件下會呼叫Condition.wait(), 否則將值減1,semaphore.release()會將Semaphore的值加1,並呼叫Condition.notify()
Semaphore原始碼
def acquire(self, blocking=True, timeout=None): if not blocking and timeout is not None: raise ValueError("can't specify timeout for non-blocking acquire") rc = False endtime = None with self._cond: while self._value == 0: if not blocking: break if timeout is not None: if endtime is None: endtime = _time() + timeout else: timeout = endtime - _time() if timeout <= 0: break self._cond.wait(timeout) else: self._value -= 1 rc = True return rc def release(self): with self._cond: self._value += 1 self._cond.notify()