1. 程式人生 > >初識多線程__下

初識多線程__下

多線程開發 count 什麽是 ML 互斥 消費者模式 roc 再次 能夠

初識多線程__上

互斥鎖

當多個線程幾乎同時修改某一個共享數據的時候,需要進行同步控制

線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。

互斥鎖為資源引入一個狀態:鎖定/非鎖定。

某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。

threading模塊中定義了Lock類,可以方便的處理鎖定:

#創建鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([blocking])
#釋放
mutex.release()

其中,鎖定方法acquire可以有一個blocking參數。

  • 如果設定blocking為True,則當前線程會堵塞,直到獲取到這個鎖為止(如果沒有指定,那麽默認為True)
  • 如果設定blocking為False,則當前線程不會堵塞

使用互斥鎖實現上面的例子的代碼如下:

技術分享圖片
from threading import Thread, Lock
import time

g_num = 0

def test1():
    global g_num
    for i in range(1000000):
        #True表示堵塞 即如果這個鎖在上鎖之前已經被上鎖了,那麽這個線程會在這裏一直等待到解鎖為止 
#False表示非堵塞,即不管本次調用能夠成功上鎖,都不會卡在這,而是繼續執行下面的代碼 mutexFlag = mutex.acquire(True) if mutexFlag: g_num += 1 mutex.release() print("---test1---g_num=%d"%g_num) def test2(): global g_num for i in range(1000000): mutexFlag = mutex.acquire(True) #
True表示堵塞 if mutexFlag: g_num += 1 mutex.release() print("---test2---g_num=%d"%g_num) #創建一個互斥鎖 #這個所默認是未上鎖的狀態 mutex = Lock() p1 = Thread(target=test1) p1.start() p2 = Thread(target=test2) p2.start() print("---g_num=%d---"%g_num)
代碼示例 技術分享圖片
---g_num=61866---
---test1---g_num=1861180
---test2---g_num=2000000
運行結果:

可以看到,加入互斥鎖後,運行結果與預期相符。

上鎖解鎖過程

當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入“locked”狀態。

每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為“blocked”狀態,稱為“阻塞”,直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入“unlocked”狀態。

線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。

總結

鎖的好處:

  • 確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行

鎖的壞處:

  • 阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了
  • 由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖

多線程-非共享數據

對於全局變量,在多線程中要格外小心,否則容易造成數據錯亂的情況發生

1. 非全局變量是否要加鎖呢?

技術分享圖片
    #coding=utf-8
    import threading
    import time

    class MyThread(threading.Thread):
        # 重寫 構造方法
        def __init__(self,num,sleepTime):
            threading.Thread.__init__(self)
            self.num = num
            self.sleepTime = sleepTime

        def run(self):
            self.num += 1
            time.sleep(self.sleepTime)
            print(線程(%s),num=%d%(self.name, self.num))

    if __name__ == __main__:
        mutex = threading.Lock()
        t1 = MyThread(100,5)
        t1.start()
        t2 = MyThread(200,1)
        t2.start()
    import threading
    from time import sleep

    def test(sleepTime):
        num=1
        sleep(sleepTime)
        num+=1
        print(---(%s)--num=%d%(threading.current_thread(), num))


    t1 = threading.Thread(target = test,args=(5,))
    t2 = threading.Thread(target = test,args=(1,))

    t1.start()
    t2.start()
代碼示例

小總結

  • 在多線程開發中,全局變量是多個線程都共享的數據,而局部變量等是各自線程的,是非共享的

死鎖

1. 死鎖

在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源並且同時等待對方的資源,就會造成死鎖。

盡管死鎖很少發生,但一旦發生就會造成應用的停止響應。下面看一個死鎖的例子

技術分享圖片
#coding=utf-8
import threading
import time

class MyThread1(threading.Thread):
    def run(self):
        if mutexA.acquire():
            print(self.name+----do1---up----)
            time.sleep(1)

            if mutexB.acquire():
                print(self.name+----do1---down----)
                mutexB.release()
            mutexA.release()

class MyThread2(threading.Thread):
    def run(self):
        if mutexB.acquire():
            print(self.name+----do2---up----)
            time.sleep(1)
            if mutexA.acquire():
                print(self.name+----do2---down----)
                mutexA.release()
            mutexB.release()

mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == __main__:
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
代碼示例

此時已經進入到了死鎖狀態,可以使用ctrl-z退出

3. 避免死鎖

  • 程序設計時要盡量避免

  • 添加超時時間等

同步應用

多個線程有序執行

技術分享圖片
from threading import Thread,Lock
from time import sleep

class Task1(Thread):
  def run(self):
      while True:
          if lock1.acquire():
              print("------Task 1 -----")
              sleep(0.5)
              lock2.release()

class Task2(Thread):
  def run(self):
      while True:
          if lock2.acquire():
              print("------Task 2 -----")
              sleep(0.5)
              lock3.release()

class Task3(Thread):
  def run(self):
      while True:
          if lock3.acquire():
              print("------Task 3 -----")
              sleep(0.5)
              lock1.release()


#使用Lock創建出的鎖默認沒有“鎖上”

lock1 = Lock()

#創建另外一把鎖,並且“鎖上”

lock2 = Lock()
lock2.acquire()

#創建另外一把鎖,並且“鎖上”

lock3 = Lock()
lock3.acquire()

t1 = Task1()
t2 = Task2()
t3 = Task3()

t1.start()
t2.start()
t3.start()
代碼示例 技術分享圖片
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
...省略...
運行結果:

總結

  • 可以使用互斥鎖完成多個任務,有序的進程工作,這就是線程的同步

生產者與消費者模式

1. 隊列

先進先出

2. 棧

先進後出

Python的Queue模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實現了鎖原語(可以理解為原子操作,即要麽不做,要麽就做完),能夠在多線程中直接使用。可以使用隊列來實現線程間的同步。

用FIFO隊列實現上述生產者與消費者問題的代碼如下:

技術分享圖片
#encoding=utf-8

import threading
import time


#python2中

from Queue import Queue


#python3中


# from queue import Queue


class Producer(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count = count +1
                    msg = 生成產品+str(count)
                    queue.put(msg)
                    print(msg)
            time.sleep(0.5)

class Consumer(threading.Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    msg = self.name + 消費了 +queue.get()
                    print(msg)
            time.sleep(1)


if __name__ == __main__:
    queue = Queue()

    for i in range(500):
        queue.put(初始產品+str(i))
    for i in range(2):
        p = Producer()
        p.start()
    for i in range(5):
        c = Consumer()
        c.start()
代碼示例

3. Queue的說明

  1. 對於Queue,在多線程通信之間扮演重要的角色
  2. 添加數據到隊列中,使用put()方法
  3. 從隊列中取數據,使用get()方法
  4. 判斷隊列中是否還有數據,使用qsize()方法

4. 生產者消費者模式的說明

  • 為什麽要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那麽生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那麽消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

  • 什麽是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

這個阻塞隊列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,

ThreadLocal

在多線程環境下,每個線程都有自己的數據。一個線程使用自己的局部變量比使用全局變量好,因為局部變量只有線程自己能看見,不會影響其他線程,而全局變量的修改必須加鎖。

1. 使用函數傳參的方法

但是局部變量也有問題,就是在函數調用的時候,傳遞起來很麻煩:

技術分享圖片
def process_student(name):
    std = Student(name)
    # std是局部變量,但是每個函數都要用它,因此必須傳進去:
    do_task_1(std)
    do_task_2(std)

def do_task_1(std):
    do_subtask_1(std)
    do_subtask_2(std)

def do_task_2(std):
    do_subtask_2(std)
    do_subtask_2(std)
代碼示例

每個函數一層一層調用都這麽傳參數那還得了?用全局變量?也不行,因為每個線程處理不同的Student對象,不能共享。

2. 使用全局字典的方法

如果用一個全局dict存放所有的Student對象,然後以thread自身作為key獲得線程對應的Student對象如何?

技術分享圖片
global_dict = {}

def std_thread(name):
    std = Student(name)
    # 把std放到全局變量global_dict中:
    global_dict[threading.current_thread()] = std
    do_task_1()
    do_task_2()

def do_task_1():
    # 不傳入std,而是根據當前線程查找:
    std = global_dict[threading.current_thread()]
    ...

def do_task_2():
    # 任何函數都可以查找出當前線程的std變量:
    std = global_dict[threading.current_thread()]
    ...
代碼示例

這種方式理論上是可行的,它最大的優點是消除了std對象在每層函數中的傳遞問題,但是,每個函數獲取std的代碼有點low。

有沒有更簡單的方式?

3. 使用ThreadLocal的方法

ThreadLocal應運而生,不用查找dict,ThreadLocal幫你自動做這件事:

技術分享圖片
import threading


# 創建全局ThreadLocal對象:

local_school = threading.local()

def process_student():
    # 獲取當前線程關聯的student:
    std = local_school.student
    print(Hello, %s (in %s) % (std, threading.current_thread().name))

def process_thread(name):
    # 綁定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=(dongGe,), name=Thread-A)
t2 = threading.Thread(target= process_thread, args=(老王,), name=Thread-B)
t1.start()
t2.start()
t1.join()
t2.join()
代碼示例 技術分享圖片
Hello, dongGe (in Thread-A)
Hello, 老王 (in Thread-B)
執行結果

說明

全局變量local_school就是一個ThreadLocal對象,每個Thread對它都可以讀寫student屬性,但互不影響。你可以把local_school看成全局變量,但每個屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不幹擾,也不用管理鎖的問題,ThreadLocal內部會處理。

可以理解為全局變量local_school是一個dict,不但可以用local_school.student,還可以綁定其他變量,如local_school.teacher等等。

ThreadLocal最常用的地方就是為每個線程綁定一個數據庫連接,HTTP請求,用戶身份信息等,這樣一個線程的所有調用到的處理函數都可以非常方便地訪問這些資源。

4. 小結

一個ThreadLocal變量雖然是全局變量,但每個線程都只能讀寫自己線程的獨立副本,互不幹擾。ThreadLocal解決了參數在一個線程中各個函數之間互相傳遞的問題

異步

  • 同步調用就是你 喊 你朋友吃飯 ,你朋友在忙 ,你就一直在那等,等你朋友忙完了 ,你們一起去
  • 異步調用就是你 喊 你朋友吃飯 ,你朋友說知道了 ,待會忙完去找你 ,你就去做別的了。
技術分享圖片
from multiprocessing import Pool
import time
import os

def test():
    print("---進程池中的進程---pid=%d,ppid=%d--"%(os.getpid(),os.getppid()))
    for i in range(3):
        print("----%d---"%i)
        time.sleep(1)
    return "hahah"

def test2(args):
    print("---callback func--pid=%d"%os.getpid())
    print("---callback func--args=%s"%args)

pool = Pool(3)
pool.apply_async(func=test,callback=test2)

time.sleep(5)

print("----主進程-pid=%d----"%os.getpid())
代碼示例 技術分享圖片
---進程池中的進程---pid=9401,ppid=9400--
----0---
----1---
----2---
---callback func--pid=9400
---callback func--args=hahah
----主進程-pid=9400----
運行結果

初識多線程__下