1. 程式人生 > >併發程式設計之多執行緒

併發程式設計之多執行緒

一、什麼是執行緒

在傳統作業系統中,每個程序有一個地址空間,而且預設就有一個控制執行緒

執行緒顧名思義,就是一條流水線工作的過程,一條流水線必須屬於一個車間,一個車間的工作過程是一個程序

車間負責把資源整合到一起,是一個資源單位,而一個車間內至少有一個流水線  流水線的工作需要電源,電源就相當於cpu

所以,程序只是用來把資源集中到一起(程序只是一個資源單位,或者說資源集合),而執行緒才是cpu上的執行單位。

多執行緒(即多個控制執行緒)的概念是,在一個程序中存在多個控制執行緒,多個控制執行緒共享該程序的地址空間,相當於一個車間內有多條流水線,都共用一個車間的資源。

開啟執行緒的兩種方式

from
threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主執行緒')
方式一
from threading import Thread
import time

class Sayhi(Thread):
    def __init__(self,name):
        super().
__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主執行緒')
方式二 

二、執行緒程序的區別(挺重要)

執行緒 和程序的區別
程序是一個資源單位一個程序可以包含多個執行緒多個執行緒之間資料可以共享執行緒開銷比程序小
在多執行緒中CPU的切換速度會非常快 但資源消耗沒有程序高

英文鍛鍊:

1、Threads share the address space of the process that created it; processes have their own address space.
2、Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
3、Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
4、New threads are easily created; new processes require duplication of the parent process.
5、Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
6、Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
  多執行緒指的是,在一個程序中開啟多個執行緒,簡單的講:如果多個任務共用一塊地址空間,那麼必須在一個程序內開啟多個執行緒。詳細的講分為4點:

  1. 多執行緒共享一個程序的地址空間

      2. 執行緒比程序更輕量級,執行緒比程序更容易建立可撤銷,在許多作業系統中,建立一個執行緒比建立一個程序要快10-100倍,在有大量執行緒需要動態和快速修改時,這一特性很有用

      3. 若多個執行緒都是cpu密集型的,那麼並不能獲得性能上的增強,但是如果存在大量的計算和大量的I/O處理,擁有多個執行緒允許這些活動彼此重疊執行,從而會加快程式執行的速度。

      4. 在多cpu系統中,為了最大限度的利用多核,可以開啟多個執行緒,比開程序開銷要小的多。(這一條並不適用於python)
為何要用多執行緒 

三、tread物件的其他屬性或方法

Thread例項物件的方法
  # isAlive(): 返回執行緒是否活動的。
  # getName(): 返回執行緒名。
  # setName(): 設定執行緒名。

threading模組提供的一些方法:
  # threading.currentThread(): 返回當前的執行緒變數。
  # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
介紹
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主程序下開啟執行緒
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主執行緒
    print(threading.enumerate()) #連同主執行緒在內有兩個執行的執行緒
    print(threading.active_count())
    print('主執行緒/主程序')
驗證
MainThread
<_MainThread(MainThread, started 140735268892672)>
[<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
主執行緒/主程序
Thread-1
執行結果
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.start()
    t.join()
    print('主執行緒')
    print(t.is_alive())
主執行緒等待子執行緒結束
egon say hello
主執行緒
False
執行結果

四、守護執行緒

無論是程序還是執行緒,都遵循:守護xxx會等待主xxx執行完畢後被銷燬

需要強調的是:執行完畢並非終止執行

1、對主程序來說,執行完畢指的是主程序程式碼執行完畢

2、對主執行緒來說,執行完畢指的是主執行緒所在的程序內所有非守護執行緒統統執行完畢,主執行緒才算執行完畢
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設定
    t.start()

    print('主執行緒')
    print(t.is_alive())
驗證
主執行緒
True
執行結果

五、GIL全域性直譯器鎖(特別之處 記住)

什麼是GIL?   全域性直譯器鎖 僅存在與Cpython
為什麼需要它?   在同一時間只有一個執行緒在使用直譯器
如果程式中只有一個執行緒還需要嗎?  直譯器會自己啟動垃圾回收機制,也會造成直譯器的競爭問題

例如 GC發現變數x引用計數為0 正準備清掃 CPU突然切換到了另一個執行緒a
a拿著x進行使用 在使用的過程中 又切換到了GC GC接著把X指向的空間進行釋放
這樣一來a中的x就無法使用了 GIL將分配記憶體回收記憶體相關的操作加了鎖
GIL無法避免自定義的執行緒中的資料競爭問題

GIL帶來的問題?

加鎖雖然保證了資料的安全 但是降低了效能 在多CPU的機器上 無法利用多核提升效率
其他執行緒要想執行 必須等到之前的執行緒釋放了GIL 這就意味著 同一時間只有一個執行緒在執行

為什麼不用其他直譯器?

因為cpython是c語言實現 可以無縫對接c現有的所有庫 就是很多現成的功能

GIL 和 自定義互斥鎖的異同點
相同點:都是互斥鎖 爭搶執行權是無序的 執行被鎖定的程式碼時有序的
不同點:GIL鎖的是直譯器的資料 自定義互斥鎖所得是使用者自定義的資料

GIL的加鎖與解鎖 是自動執行的
自動釋放的時間點: io/程式碼執行完畢 同一執行緒執行時間過長3ms(py3中)
執行的位元組碼指令數量達到一定值(py2中)

from threading import Thread,Lock
import os,time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果肯定為0,由原來的併發執行變成序列,犧牲了執行效率保證了資料安全,不加鎖則結果可能為99
程式碼示範

六、死鎖現象與遞迴鎖

所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序

from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
死鎖
Thread-1 拿到A鎖
Thread-1 拿到B鎖
Thread-1 拿到B鎖
Thread-2 拿到A鎖 #出現死鎖,整個程式阻塞住
執行結果

解決方法,遞迴鎖,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞迴鎖可以連續acquire多次,而互斥鎖只能acquire一次

from threading import Thread,RLock
import time

mutexA=mutexB=RLock() #一個執行緒拿到鎖,counter加1,該執行緒內又碰到加鎖的情況,則counter繼續加1,這期間所有其他執行緒都只能等待,等待該執行緒釋放所有鎖,即counter遞減到0為止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()
    def func1(self):
        mutexA.acquire()
        print('\033[41m%s 拿到A鎖\033[0m' %self.name)

        mutexB.acquire()
        print('\033[42m%s 拿到B鎖\033[0m' %self.name)
        mutexB.release()

        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print('\033[43m%s 拿到B鎖\033[0m' %self.name)
        time.sleep(2)

        mutexA.acquire()
        print('\033[44m%s 拿到A鎖\033[0m' %self.name)
        mutexA.release()

        mutexB.release()

if __name__ == '__main__':
    for i in range(10):
        t=MyThread()
        t.start()
View Code

七、同步非同步 阻塞和非阻塞 非同步回撥

同步:提交任務需要等待任務執行完成才能繼續執行
非同步:提交任務不需要等待任務執行 可以立即繼續執行
指的都是提交任務的方式
阻塞:遇到IO 失去了CPU執行權 看上去也是在等 與同步會混淆
非阻塞:就緒,執行,程式碼正常執行
非同步回撥:
為什麼需要回調?
子程序幫助主程序完成任務 處理任務的結果應該交還給準程序
其他方式也可以將資料交還給主程序
1.shutdown 主程序會等到所有任務完成
2.result函式 會阻塞直到任務完成
都會阻塞 導致效率降低 所以使用回撥
注意:
回撥函式什麼時候被執行? 子程序任務完成時
誰在執行回撥函式? 主程序
執行緒的非同步回撥
使用方式都相同 唯一的不同是執行回撥函式 是子執行緒在執行 

八、執行緒佇列

class queue.Queue(maxsize=0) #佇列:先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())



'''
結果(先進先出):
first
second
third
'''

class queue.LifoQueue(maxsize=0) #堆疊:last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())



'''
結果(後進先出):
third
second
first
'''

class queue.PriorityQueue(maxsize=0) #優先順序佇列:儲存資料時可設定優先順序的佇列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())



'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''
import queue
# 普通佇列 先進先出
q = queue.Queue()
q.put("a")
q.put("b")


print(q.get())
print(q.get())

# 堆疊佇列  先進後出 後進先出  函式呼叫就是進棧  函式結束就出棧 遞迴造成棧溢位
q2 = queue.LifoQueue()
q2.put("a")
q2.put("b")
print(q2.get())


# 優先順序佇列
q3 = queue.PriorityQueue()  # 數值越小優先順序越高  優先順序相同時 比較大小 小的先取
q3.put((-100,"c"))
q3.put((1,"a"))
q3.put((100,"b"))
print(q3.get())
三種形式

九、執行緒池和程序池

什麼是池? 一種儲存資料的容器 要儲存的資料是執行緒或程序
為什麼使用? 為了方便管多執行緒或程序 (在有很多子程序或子執行緒的情況下)
有什麼特點:
1.管理程序的建立
2.管理程序的銷燬
3.負責任務的分配
4.控制最大併發數量
注意:用池來處理TCP 是不正確的 因為程序中程式碼執行完畢才算空閒

1、submit(fn, *args, **kwargs)
非同步提交任務

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for迴圈submit的操作

3、shutdown(wait=True) 
相當於程序池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait引數為何值,整個程式都會等到所有任務執行完畢
submit和map必須在shutdown之前

4、result(timeout=None)
取得結果

5、add_done_callback(fn)
回撥函式
基本使用方法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
進/執行緒池用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
map用法

 回撥函式:

可以為程序池或執行緒池內的每個程序或執行緒繫結一個函式,該函式在程序或執行緒的任務執行完畢後自動觸發,並接收任務的返回值當作引數,該函式稱為回撥函式

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<程序%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<程序%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future物件obj,需要用obj.result()拿到結果
view code

十、訊號量,事件、定時器

訊號量

from threading import Thread,Semaphore,current_thread,active_count

import time
# 用於控制 同時執行被鎖定程式碼的執行緒數量   也就是執行緒的併發數量
# 也是一種鎖
sm = Semaphore(1)

def task():
    sm.acquire()
    for i in range(10):
        print(current_thread())
        time.sleep(0.5)
    sm.release()

def task2():
     for i in range(10):
        print(current_thread())
        time.sleep(0.5)


for i in range(5):
    Thread(target=task).start()
    Thread(target=task2).start()
print(active_count())

事件

事件是什麼? 某件事情發生的訊號
用來幹什麼? 線上程間通訊 然而執行緒本來就能通訊  作用只有一個就是簡化程式碼

執行緒間通訊的例子
伺服器啟動需要五秒
客戶端啟動後去連線伺服器
去連線伺服器必須保證伺服器已經開啟成功了

是否啟動完成就是要通訊的內容

注意 Event執行緒通訊 僅僅用於簡單的條件判斷 說白了代替bool型別 和if判斷
set() 將狀態修改為True
wati() 等待狀態為True才繼續執行

import time
from threading import Thread
boot = False
def server_task():
    global boot
    print("正在啟動....")
    time.sleep(5)
    print("啟動....成功")
    boot = True

def client_task():
    while True:
        print("連線伺服器....")
        time.sleep(1)
        if boot:
            print("連線成功")
            break
        else:
            print("error 連線失敗 伺服器未啟動!!")

t1 = Thread(target=server_task)
t1.start()

t2 = Thread(target=client_task)
t2.start()

t1.join()
t2.join()
# import time
# from threading import Thread,Event
# event =Event()
#
# def server_task():
#     print("正在啟動....")
#     time.sleep(5)
#     print("啟動....成功")
#     event.set()
#
# def client_task():
#     event.wait() #一個阻塞的函式  會阻塞直到對event執行set函式為止
#     print("連線成功!")
#
# t1 = Thread(target=server_task)
# t1.start()
# t2 = Thread(target=client_task)
# t2.start()
使用事件實現

定時器

定時器,指定n秒後執行某個操作

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed