1. 程式人生 > >網路程式設計和併發之多執行緒程式設計

網路程式設計和併發之多執行緒程式設計

多執行緒threading

執行緒與程序的區別可以歸納為以下4點:   1)地址空間和其它資源(如開啟檔案):程序間相互獨立,同一程序的各執行緒間共享。某程序內的執行緒在其它程序不可見。   2)通訊: 程序間通訊 IPC,執行緒間可以直接讀寫程序資料段(如全域性變數)來進行通訊——需要 程序同步和互斥手段的輔助,以保證資料的一致性。   3)排程和切換:執行緒上下文切換比程序上下文切換要快得多。   4)在多執行緒作業系統中,程序不是一個可執行的實體。    *通過漫畫了解執行緒進城

 

1.threading模組

multiprocess模組的完全模仿了threading模組的介面,二者在使用層面,有很大的相似性

1.1模組的建立

 1 from threading import Thread
 2 import time
 3 def sayhi(name):
 4     time.sleep(2)
 5     print('%s say hello' %name)
 6 
 7 if __name__ == '__main__':
 8     t=Thread(target=sayhi,args=('egon',))
 9     t.start()
10     print('主執行緒')
建立執行緒的方式1
 1 from threading import
Thread 2 import time 3 class Sayhi(Thread): 4 def __init__(self,name): 5 super().__init__() 6 self.name=name 7 def run(self): 8 time.sleep(2) 9 print('%s say hello' % self.name) 10 11 12 if __name__ == '__main__': 13 t = Sayhi('egon') 14 t.start()
15 print('主執行緒')
建立執行緒的方式2

1.2多執行緒與多程序比較

 1 from threading import Thread
 2 from multiprocessing import Process
 3 import os
 4 
 5 def work():
 6     print('hello',os.getpid())
 7 
 8 if __name__ == '__main__':
 9     #part1:在主程序下開啟多個執行緒,每個執行緒都跟主程序的pid一樣
10     t1=Thread(target=work)
11     t2=Thread(target=work)
12     t1.start()
13     t2.start()
14     print('主執行緒/主程序pid',os.getpid())
15 
16     #part2:開多個程序,每個程序都有不同的pid
17     p1=Process(target=work)
18     p2=Process(target=work)
19     p1.start()
20     p2.start()
21     print('主執行緒/主程序pid',os.getpid())
pid的比較
 1 from threading import Thread
 2 from multiprocessing import Process
 3 import os
 4 
 5 def work():
 6     print('hello')
 7 
 8 if __name__ == '__main__':
 9     #在主程序下開啟執行緒
10     t=Thread(target=work)
11     t.start()
12     print('主執行緒/主程序')
13     '''
14     列印結果:
15     hello
16     主執行緒/主程序
17     '''
18 
19     #在主程序下開啟子程序
20     t=Process(target=work)
21     t.start()
22     print('主執行緒/主程序')
23     '''
24     列印結果:
25     主執行緒/主程序
26     hello
27     '''
開啟效率的較量
 1 from  threading import Thread
 2 from multiprocessing import Process
 3 import os
 4 def work():
 5     global n
 6     n=0
 7 
 8 if __name__ == '__main__':
 9     # n=100
10     # p=Process(target=work)
11     # p.start()
12     # p.join()
13     # print('主',n) #毫無疑問子程序p已經將自己的全域性的n改成了0,但改的僅僅是它自己的,檢視父程序的n仍然為100
14 
15 
16     n=1
17     t=Thread(target=work)
18     t.start()
19     t.join()
20     print('',n) #檢視結果為0,因為同一程序內的執行緒之間共享程序內的資料
21 同一程序內的執行緒共享該程序的資料?
記憶體資料的共享問題

1.3Thread類的其他方法

 

Thread例項物件的方法
  # isAlive(): 返回執行緒是否活動的。
  # getName(): 返回執行緒名。
  # setName(): 設定執行緒名。

threading模組提供的一些方法:
  # threading.currentThread(): 返回當前的執行緒變數。
  # threading.enumerate(): 返回一個包含正在執行的執行緒的list。正在執行指執行緒啟動後、結束前,不包括啟動前和終止後的執行緒。
  # threading.activeCount(): 返回正在執行的執行緒數量,與len(threading.enumerate())有相同的結果。
from threading import Thread
import threading
from multiprocessing import Process
import os

def work():
    import time
    time.sleep(3)
    print(threading.current_thread().getName())


if __name__ == '__main__':
    #在主程序下開啟執行緒
    t=Thread(target=work)
    t.start()

    print(threading.current_thread().getName())
    print(threading.current_thread()) #主執行緒
    print(threading.enumerate()) #連同主執行緒在內有兩個執行的執行緒
    print(threading.active_count())
    print('主執行緒/主程序')

    '''
    列印結果:
    MainThread
    <_MainThread(MainThread, started 140735268892672)>
    [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    主執行緒/主程序
    Thread-1
    '''
程式碼示例
 1 from threading import Thread
 2 import time
 3 def sayhi(name):
 4     time.sleep(2)
 5     print('%s say hello' %name)
 6 
 7 if __name__ == '__main__':
 8     t=Thread(target=sayhi,args=('egon',))
 9     t.start()
10     t.join()
11     print('主執行緒')
12     print(t.is_alive())
13     '''
14     egon say hello
15     主執行緒
16     False
17     '''
join方法

1.4守護執行緒

無論是程序還是執行緒,都遵循:守護xx會等待主xx執行完畢後被銷燬。需要強調的是:執行完畢並非終止執行

#1.對主程序來說,執行完畢指的是主程序程式碼執行完畢
#2.對主執行緒來說,執行完畢指的是主執行緒所在的程序內所有非守護執行緒統統執行完畢,主執行緒才算執行完畢
#1 主程序在其程式碼結束後就已經算執行完畢了(守護程序在此時就被回收),然後主程序會一直等非守護的子程序都執行完畢後回收子程序的資源(否則會產生殭屍程序),才會結束,
#2 主執行緒在其他非守護執行緒執行完畢後才算執行完畢(守護執行緒在此時就被回收)。因為主執行緒的結束意味著程序的結束,程序整體的資源都將被回收,而程序必須保證非守護執行緒都執行完畢後才能結束。
詳細解釋
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)

if __name__ == '__main__':
    t=Thread(target=sayhi,args=('egon',))
    t.setDaemon(True) #必須在t.start()之前設定
    t.start()

    print('主執行緒')
    print(t.is_alive())
    '''
    主執行緒
    True
    '''
守護執行緒例1
 1 from threading import Thread
 2 import time
 3 def foo():
 4     print(123)
 5     time.sleep(1)
 6     print("end123")
 7 
 8 def bar():
 9     print(456)
10     time.sleep(3)
11     print("end456")
12 
13 
14 t1=Thread(target=foo)
15 t2=Thread(target=bar)
16 
17 t1.daemon=True
18 t1.start()
19 t2.start()
20 print("main-------")
守護執行緒例2

 

2.鎖

2.1GIL鎖

Python程式碼的執行由Python虛擬機器(也叫直譯器主迴圈)來控制。Python在設計之初就考慮到要在主迴圈中,同時只有一個執行緒在執行。雖然 Python 直譯器中可以“執行”多個執行緒,但在任意時刻只有一個執行緒在直譯器中執行。
  對Python虛擬機器的訪問由全域性直譯器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個執行緒在執行。

  在多執行緒環境中,Python 虛擬機器按以下方式執行:

  a、設定 GIL;

  b、切換到一個執行緒去執行;

  c、執行指定數量的位元組碼指令或者執行緒主動讓出控制(可以呼叫 time.sleep(0));

  d、把執行緒設定為睡眠狀態;

  e、解鎖 GIL;

  d、再次重複以上所有步驟。
  在呼叫外部程式碼(如 C/C++擴充套件函式)的時候,GIL將會被鎖定,直到這個函式結束為止(由於在這期間沒有Python的位元組碼被執行,所以不會做執行緒切換)編寫擴充套件的程式設計師可以主動解鎖GIL。

2.2同步鎖

from threading import Thread
import os,time
def work():
    global n
    temp=n
    time.sleep(0.1)
    n=temp-1
if __name__ == '__main__':
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n) #結果可能為99
多個執行緒搶佔資源的情況
import threading
R=threading.Lock()
R.acquire()
'''
對公共資料的操作
'''
R.release()
 1 from threading import Thread,Lock
 2 import os,time
 3 def work():
 4     global n
 5     lock.acquire()
 6     temp=n
 7     time.sleep(0.1)
 8     n=temp-1
 9     lock.release()
10 if __name__ == '__main__':
11     lock=Lock()
12     n=100
13     l=[]
14     for i in range(100):
15         p=Thread(target=work)
16         l.append(p)
17         p.start()
18     for p in l:
19         p.join()
20 
21     print(n) #結果肯定為0,由原來的併發執行變成序列,犧牲了執行效率保證了資料安全
同步鎖的引用
#不加鎖:併發執行,速度快,資料不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print('%s is running' %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''


#不加鎖:未加鎖部分併發執行,加鎖部分序列執行,速度慢,資料安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的程式碼併發執行
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    #加鎖的程式碼序列執行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''

#有的同學可能有疑問:既然加鎖會讓執行變成序列,那麼我在start之後立即使用join,就不用加鎖了啊,也是序列的效果啊
#沒錯:在start之後立刻使用jion,肯定會將100個任務的執行變成序列,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
#start後立即join:任務內的所有程式碼都是序列執行的,而加鎖,只是加鎖的部分即修改共享資料的部分是序列的
#單從保證資料安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print('%s start to run' %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == '__main__':
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麼的恐怖
'''
互斥鎖與join的區別

2.3死鎖

程序也有死鎖與遞迴鎖,在程序那裡忘記說了,放到這裡一切說了額

所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,如下就是死鎖

from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
死鎖

解決方法,遞迴鎖,在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

from threading import RLock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
遞迴鎖RLock

典型問題:科學家吃麵

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麵條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麵'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麵條' % name)
    print('%s 吃麵' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
死鎖問題
import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
    noodle_lock.acquire()
    print('%s 搶到了麵條'%name)
    fork_lock.acquire()
    print('%s 搶到了叉子'%name)
    print('%s 吃麵'%name)
    fork_lock.release()
    noodle_lock.release()

def eat2(name):
    fork_lock.acquire()
    print('%s 搶到了叉子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 搶到了麵條' % name)
    print('%s 吃麵' % name)
    noodle_lock.release()
    fork_lock.release()

for name in ['哪吒','egon','yuan']:
    t1 = Thread(target=eat1,args=(name,))
    t2 = Thread(target=eat2,args=(name,))
    t1.start()
    t2.start()
遞迴鎖解決死鎖問題

 

3.訊號量

同進程的一樣

Semaphore管理一個內建的計數器,
每當呼叫acquire()時內建計數器-1;
呼叫release() 時內建計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。

例項:(同時只有5個執行緒可以獲得semaphore,即可以限制最大連線數為5):

from threading import Thread,Semaphore
import threading
import time
# def func():
#     if sm.acquire():
#         print (threading.currentThread().getName() + ' get semaphore')
#         time.sleep(2)
#         sm.release()
def func():
    sm.acquire()
    print('%s get sm' %threading.current_thread().getName())
    time.sleep(3)
    sm.release()
if __name__ == '__main__':
    sm=Semaphore(5)
    for i in range(23):
        t=Thread(target=func)
        t.start()
例項
池與訊號量:與程序池是完全不同的概念,程序池Pool(4),最大隻能產生4個程序,而且從頭到尾都只是這四個程序,不會產生新的,而訊號量是產生一堆執行緒/程序

 

4.事件

同進程的一樣

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行

event.isSet():返回event的狀態值;
event.wait():如果 event.isSet()==False將阻塞執行緒;
event.set(): 設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;
event.clear():恢復event的狀態值為False。

 例如,有多個工作執行緒嘗試連結MySQL,我們想要在連結前確保MySQL服務正常才讓那些工作執行緒去連線MySQL伺服器,如果連線不成功,都會去嘗試重新連線。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作

import threading
import time,random
from threading import Thread,Event

def conn_mysql():
    count=1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('連結超時')
        print('<%s>第%s次嘗試連結' % (threading.current_thread().getName(), count))
        event.wait(0.5)
        count+=1
    print('<%s>連結成功' %threading.current_thread().getName())


def check_mysql():
    print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2,4))
    event.set()
if __name__ == '__main__':
    event=Event()
    conn1=Thread(target=conn_mysql)
    conn2=Thread(target=conn_mysql)
    check=Thread(target=check_mysql)

    conn1.start()
    conn2.start()
    check.start()
例項

 

5.執行緒佇列

queue佇列 :使用import queue,用法與程序Queue一樣

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

 class queue.Queue(maxsize=0) #先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''
先進先出

class queue.LifoQueue(maxsize=0) #last in fisrt out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''
後進先出

class queue.PriorityQueue(maxsize=0) #儲存資料時可設定優先順序的佇列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先順序(通常是數字,也可以是非數字之間的比較),數字越小優先順序越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先順序越高,優先順序高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''
優先順序佇列

 

6.Python標準模組--concurrent.futures

https://docs.python.org/dev/library/concurrent.futures.html

#1 介紹
concurrent.futures模組提供了高度封裝的非同步呼叫介面
ThreadPoolExecutor:執行緒池,提供非同步呼叫
ProcessPoolExecutor: 程序池,提供非同步呼叫
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
非同步提交任務

#map(func, *iterables, timeout=None, chunksize=1) 
取代for迴圈submit的操作

#shutdown(wait=True) 
相當於程序池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait引數為何值,整個程式都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回撥函式
#介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.


#用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())
ProcessPoolExecutor
#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#用法
與ProcessPoolExecutor相同
ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<程序%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<程序%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future物件obj,需要用obj.result()拿到結果
回撥函式