1. 程式人生 > >並發編程(五)

並發編程(五)

即使 reset 管理 client sta ide 重復 glob 便是

GIL介紹

‘‘‘
定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple 
native threads from executing Python bytecodes at once. This lock is necessary mainly 
because CPython’s memory management is not thread-safe. (However, since the GIL 
exists, other features have grown to depend on the guarantees that it enforces.)
‘‘‘ 結論:在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執行,無法利用多核優勢

首先需要明確的一點是GIL並不是Python的特性,它是在實現Python解析器(CPython)時所引入的一個概念。就好比C++是一套語言(語法)標準,但是可以用不同的編譯器來編譯成可執行代碼。有名的編譯器例如GCC,INTEL C++,Visual C++等。Python也一樣,同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行。像其中的JPython就沒有GIL。然而因為CPython是大部分環境下默認的Python執行環境。所以在很多人的概念裏CPython就是Python,也就想當然的把GIL

歸結為Python語言的缺陷。所以這裏要先明確一點:GIL並不是Python的特性,Python完全可以不依賴於GIL

GIL本質就是一把互斥鎖,既然是互斥鎖,所有互斥鎖的本質都一樣,都是將並發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。

可以肯定的一點是:保護不同的數據的安全,就應該加不同的鎖。

要想了解GIL,首先確定一點:每次執行python程序,都會產生一個獨立的進程。例如python test.py,python aaa.py,python bbb.py會產生3個不同的python進程

技術分享圖片
‘‘‘
#驗證python test.py只會產生一個進程
#test.py內容
import os,time
print(os.getpid())
time.sleep(1000)
‘‘‘ python3 test.py #在windows下 tasklist |findstr python #在linux下 ps aux |grep python
View Code

在一個python的進程內,不僅有test.py的主線程或者由該主線程開啟的其他線程,還有解釋器開啟的垃圾回收等解釋器級別的線程,總之,所有線程都運行在這一個進程內,毫無疑問

#1 所有數據都是共享的,這其中,代碼作為一種數據也是被所有線程共享的(test.py的所有代碼以及Cpython解釋器的所有代碼)
例如:test.py定義一個函數work(代碼內容如下圖),在進程內所有線程都能訪問到work的代碼,於是我們可以開啟三個線程然後target都指向該代碼,能訪問到意味著就是可以執行。

#2 所有線程的任務,都需要將任務的代碼當做參數傳給解釋器的代碼去執行,即所有的線程要想運行自己的任務,首先需要解決的是能夠訪問到解釋器的代碼。

綜上:

如果多個線程的target=work,那麽執行流程是

多個線程先訪問到解釋器的代碼,即拿到執行權限,然後將target的代碼交給解釋器的代碼去執行

解釋器的代碼是所有線程共享的,所以垃圾回收線程也可能訪問到解釋器的代碼而去執行,這就導致了一個問題:對於同一個數據100,可能線程1執行x=100的同時,而垃圾回收執行的是回收100的操作,解決這種問題沒有什麽高明的方法,就是加鎖處理,如下圖的GIL,保證python解釋器同一時間只能執行一個任務的代碼

GIL與多線程

GIL保護的是解釋器級的數據,保護用戶自己的數據則需要自己加鎖處理

有了GIL的存在,同一時刻同一進程中只有一個線程被執行

#1. cpu到底是用來做計算的,還是用來做I/O的?

#2. 多cpu,意味著可以有多個核並行完成計算,所以多核提升的是計算性能

#3. 每個cpu一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什麽用處 

一個工人相當於cpu,此時計算相當於工人在幹活,I/O阻塞相當於為工人幹活提供所需原材料的過程,工人幹活的過程中如果沒有原材料了,則工人幹活的過程需要停止,直到等待原材料的到來。

如果你的工廠幹的大多數任務都要有準備原材料的過程(I/O密集型),那麽你有再多的工人,意義也不大,還不如一個人,在等材料的過程中讓工人去幹別的活,

反過來講,如果你的工廠原材料都齊全,那當然是工人越多,效率越高

結論:

  對計算來說,cpu越多越好,但是對於I/O來說,再多的cpu也沒用

#分析:
我們有四個任務需要處理,處理方式肯定是要玩出並發的效果,解決方案可以是:
方案一:開啟四個進程
方案二:一個進程下,開啟四個線程

#單核情況下,分析結果: 
  如果四個任務是計算密集型,沒有多核來並行計算,方案一徒增了創建進程的開銷,方案二勝
  如果四個任務是I/O密集型,方案一創建進程的開銷大,且進程的切換速度遠不如線程,方案二勝

#多核情況下,分析結果:
  如果四個任務是計算密集型,多核意味著並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝
  如果四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

 
#結論:現在的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率還是有顯著提升的。

多線程性能測試

技術分享圖片
from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i


if __name__ == __main__:
    l=[]
    print(os.cpu_count()) #本機為4核
    start=time.time()
    for i in range(4):
        p=Process(target=work) #耗時5s多
        p=Thread(target=work) #耗時18s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print(run time is %s %(stop-start))
計算密集型(多進程效率高) 技術分享圖片
from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    print(===>)

if __name__ == __main__:
    l=[]
    print(os.cpu_count()) #本機為4核
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #耗時12s多,大部分時間耗費在創建進程上
        p=Thread(target=work) #耗時2s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print(run time is %s %(stop-start))
IO密集型(多線程效率高)

GIL與互斥鎖對比

#1.線程搶的是GIL鎖,GIL鎖相當於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其他線程也可以搶到GIL,但如果發現Lock仍然沒有被釋放則阻塞,即便是拿到執行權限GIL也要立刻交出來

#2.join是等待所有,即整體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓並發變成串行,join與互斥鎖都可以實現,毫無疑問,互斥鎖的部分串行效率要更高

GIL本質就是一把互斥鎖,而且是夾在python解釋器上的,同一個進程內的所有線程都要先搶到GIL鎖,才可以執行解釋器代碼

使用GIL的優點:

保證cpython解釋器內存管理的線程安全

缺點:

同一進程內所有的線程同一時刻只能執行一個cpython解釋器,無法並行

鎖的目的是為了保護共享的數據,同一時間只能有一個線程來修改共享的數據,保護不同的數據就應該加不同的鎖

GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

所有線程搶的是GIL鎖,或者說所有線程搶的是執行權限

  線程1搶到GIL鎖,拿到執行權限,開始執行,然後加了一把Lock,還沒有執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程中發現Lock還沒有被線程1釋放,於是線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,然後正常執行到釋放Lock。。。這就導致了串行運行的效果

  既然是串行,那我們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是串行執行啊,為何還要加Lock呢,需知join是等待t1所有的代碼執行完,相當於鎖住了t1的所有代碼,而Lock只是鎖住一部分操作共享數據的代碼。

技術分享圖片
from threading import Thread,Lock
import time
mutex=Lock()
count=0
def task():
    global count
    temp=count
    time.sleep(0.1)
    count=temp+1
if __name__ == __main__:
    t_l=[]
    for i in range(100):
        t=Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(,count)  #count最後打印結果為99

# 因為全局變量count=0,for循環開100個線程,第一個線程搶到GIL,在執行時其他所有的程序都得等待,但到了time.sleep(0.1)那裏睡0.1秒,這0.1秒足夠for循環完所有的線程,這個時候所有的線程手中拿到都是count=0,所有最後的的執行結果為99
View Code

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:

技術分享圖片
from threading import Thread,Lock
import time
mutex=Lock()
count=0
def task():
    global count
    mutex.acquire()
    temp=count
    time.sleep(0.1)
    count=temp+1
    mutex.release()
if __name__ == __main__:
    t_l=[]
    for i in range(100):
        t=Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(,count)    #count最後執行結果為0
加上互斥鎖
互斥鎖和GIL綜合分析
分析:
  #1.100個線程去搶GIL鎖,即搶執行權限
     #2. 肯定有一個線程先搶到GIL(暫且稱為線程1),然後開始執行,一旦執行就會拿到lock.acquire()
     #3. 極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行權限,即釋放GIL
    #4.直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然後其他的線程再重復2 3 4的過程
技術分享圖片
#不加鎖:並發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print(%s is running %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == __main__:
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print(主:%s n:%s %(stop_time-start_time,n))

‘‘‘
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
‘‘‘


#不加鎖:未加鎖部分並發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼並發運行
    time.sleep(3)
    print(%s start to run %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == __main__:
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print(主:%s n:%s %(stop_time-start_time,n))

‘‘‘
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
‘‘‘

#有的同學可能有疑問:既然加鎖會讓運行變成串行,那麽我在start之後立即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start之後立刻使用jion,肯定會將100個任務的執行變成串行,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
#start後立即join:任務內的所有代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的
#單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print(%s start to run %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == __main__:
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print(主:%s n:%s %(stop_time-start_time,n))

‘‘‘
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麽的恐怖
互斥鎖和join的區別

進程池和線程池

“ 池 ” 是用來限制並發的任務數目,限制我們的計算機在一個自己可承受的範圍內去並發地執行任務

什麽時候裝進程:並發的任務屬於計算密集型
什麽時候裝線程:並發的任務屬於IO密集型

# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# import time,os,random
#
# def task(x):
#     print(‘%s 接客‘ %os.getpid())
#     time.sleep(random.randint(2,5))
#     return x**2
#
# if __name__ == ‘__main__‘:
#     p=ProcessPoolExecutor() # 默認開啟的進程數是cpu的核數
#
#
#     for i in range(20):
#         p.submit(task,i)


from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,os,random

def task(x):
    print(%s 接客 %x)
    time.sleep(random.randint(2,5))
    return x**2

if __name__ == __main__:
    p=ThreadPoolExecutor(4) # 如果括號裏面不寫值,默認開啟的線程數是cpu的核數*5


    for i in range(20):
        p.submit(task,i)

基於多線程實現並發的套接字通信

技術分享圖片
from socket import *
from threading import Thread
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

tpool=ThreadPoolExecutor(3)

def communicate(conn,client_addr):
    while True:  # 通訊循環
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()


def server():
    server=socket(AF_INET,SOCK_STREAM)
    server.bind((127.0.0.1,8080))
    server.listen(5)

    while True: # 鏈接循環
        conn,client_addr=server.accept()
        print(client_addr)
        # t=Thread(target=communicate,args=(conn,client_addr))
        # t.start()
        tpool.submit(communicate,conn,client_addr)

    server.close()

if __name__ == __main__:
    server()
服務端

技術分享圖片
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8080))

while True:
    msg=input(>>>: ).strip()
    if not msg:continue
    client.send(msg.encode(utf-8))
    data=client.recv(1024)
    print(data.decode(utf-8))

client.close()
客戶端

並發編程(五)