1. 程式人生 > >python全棧開發基礎【第二十四篇】(利用threading模塊開線程、join與守護線程、GIL與Lock)

python全棧開發基礎【第二十四篇】(利用threading模塊開線程、join與守護線程、GIL與Lock)

roc print 例子 線程 -- elf 定期 listen cti

一多線程的概念介紹

threading模塊介紹

threading模塊和multiprocessing模塊在使用層面,有很大的相似性。

二、開啟多線程的兩種方式

創建線程的開銷比創建進程的開銷小,因而創建線程的速度快。
#開啟進程的第一種方式
from multiprocessing import Process
from threading import Thread
import os
import time
def work():
    print(‘<%s> is running‘%os.getpid())
    time.sleep(2)
    print(‘<%s> is done‘%os.getpid())

if __name__ == ‘__main__‘:
    t=Thread(target=work,)
    # t= Process(target=work,)
    t.start()
    print(‘主‘,os.getpid())
#開啟線程的第二種方式(用類)
from threading import Thread
import time
class Work(Thread):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        # time.sleep(2)
        print(‘%s say hell‘%self.name)
if __name__ == ‘__main__‘:
    t = Work(‘egon‘)
    t.start()
    print(‘主‘)

在一個進程下開啟多個線程與在一個進程下開啟多個子進程的區別

#線程的開啟速度大於進程的開啟速度
from  multiprocessing import Process
from threading import Thread
import time
def work():
    time.sleep(2)
    print(‘hello‘)
if __name__ == ‘__main__‘:
    t = Thread(target=work)#如果等上幾秒,他會在開啟的過程中先打印主,如果不等會先打印hello
    # t = Process(target=work) #子進程會先打印主,
    t.start()
    print(‘主‘)
    
#在同一個進程下開多個進程和開多個線程的pid的不同
# 2.----------
from  multiprocessing import Process
from threading import Thread
import os
def work():
    print(‘hello‘,os.getpid())
if __name__ == ‘__main__‘:
    #在主進程下開啟多個線程,每個線程都跟主進程的pid一樣
    t1= Thread(target=work)
    t2 = Thread(target=work)
    t1.start()
    t2.start()
    print(‘主線程pid‘,os.getpid())

    #來多個進程,每個進程都有不同的pid
    p1 = Process(target=work)
    p2 = Process(target=work)
    p1.start()
    p2.start()
    print(‘主進程pid‘, os.getpid())
#同一進程內的線程共享該進程的數據
from  threading import Thread
from multiprocessing import  Process
import os
def work():
    global n
    n-=1
    print(n)  #所以被改成99了
n = 100
if __name__ == ‘__main__‘:
    # p = Process(target=work)
    p = Thread(target=work)  #當開啟的是線程的時候,因為同一進程內的線程之間共享進程內的數據
                            #所以打印的n為99
    p.start()
    p.join()
    print(‘主‘,n) #毫無疑問子進程p已經將自己的全局的n改成了0,
    # 但改的僅僅是它自己的,查看父進程的n仍然為100

進程之間是互相隔離的,不共享。需要借助第三方來完成共享(借助隊列,管道,共享數據)

三、練習

練習一:多線程實現並發

#服務端
from socket import *
from threading import Thread
s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #端口重用
s.bind((‘127.0.0.1‘,8081))
s.listen(5)
print(‘start running...‘)
def talk(coon,addr):
    while True:  # 通信循環
        try:
            cmd = coon.recv(1024)
            print(cmd.decode(‘utf-8‘))
            if not cmd: break
            coon.send(cmd.upper())
            print(‘發送的是%s‘%cmd.upper().decode(‘utf-8‘))
        except Exception:
            break
    coon.close()
if __name__ == ‘__main__‘:
    while True:#鏈接循環
        coon,addr = s.accept()
        print(coon,addr)
        p =Thread(target=talk,args=(coon,addr))
        p.start()
    s.close()
#客戶端
from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect((‘127.0.0.1‘,8081))
while True:
    cmd = input(‘>>:‘).strip()
    if not cmd:continue
    c.send(cmd.encode(‘utf-8‘))
    data = c.recv(1024)
    print(‘接受的是%s‘%data.decode(‘utf-8‘))
c.close()

練習二:三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件

from threading import Thread
import os
input_l = []
format_l = []
def talk(): #監聽輸入任務
    while True:
        cmd = input(‘>>:‘).strip()
        if not cmd:continue
        input_l.append(cmd)

def format():
     while True:
         if input_l:
             res = input_l.pop()#取出來
             format_l.append(res.upper()) #取出來後變大寫
def save():
    while True:
        if format_l:  #如果format_l不為空
            with open(‘db‘,‘a‘) as f:
                f.write(format_l.pop()+‘\n‘) #寫進文件
                f.flush()
if __name__ == ‘__main__‘:
    t1=Thread(target=talk)
    t2=Thread(target=format)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

四、多線程共享同一個進程內的地址空間

from threading import Thread
from multiprocessing import Process
import os
n = 100
def talk():
    global n
    n-=100
    print(n)
if __name__ == ‘__main__‘:
    t = Thread(target=talk) #如果開啟的是線程的話,n=0
    # t = Process(target=talk) #如果開啟的是進程的話,n=100
    t.start()
    t.join()
    print(‘主‘,n)

  

五、線程對象的其他屬性和方法

Thread實例對象的方法
  # isAlive(): 返回線程是否活動的。
  # getName(): 返回線程名。
  # setName(): 設置線程名。

threading模塊提供的一些方法:
  # threading.currentThread(): 返回當前的線程變量。
  # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動後、結束前,不包括啟動前和終止後的線程。
  # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
#線程的其他屬性和方法
from  threading import Thread
from multiprocessing import Process
import time,os,threading
def work():
    time.sleep(2)
    print(‘%s is running‘ % threading.currentThread().getName())
    print(threading.current_thread()) #其他線程
    print(threading.currentThread().getName()) #得到其他線程的名字
if __name__ == ‘__main__‘:
    t = Thread(target=work)
    t.start()

    print(threading.current_thread().getName())  #主線程的名字
    print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    time.sleep(2)
    print(t.is_alive()) #判斷線程是否存活
    print(threading.activeCount())
    print(‘主‘)

六、join與守護線程

主進程等所有的非守護的子進程結束他才結束(回收它子進程的資源):(有父子關系)
主線程等非守護線程全都結束它才結束: (沒父子關系)

# join
from  threading import Thread
import time,os
def talk():
    time.sleep(3)
    print(‘%s is running..‘%os.getpid())
if __name__ == ‘__main__‘:
    t = Thread(target=talk)
    t.start()
    t.join()  #主進程在等子進程結束
    print(‘主‘)

守護線程與守護進程的區別

1.守護進程:主進程會等到所有的非守護進程結束,才銷毀守護進程。也就是說(主進程運行完了被守護的那個就幹掉了)

2.守護線程:主線程運行完了守護的那個還沒有幹掉,主線程等非守護線程全都結束它才結束

#守護進程和守護線程
from  multiprocessing import Process
from threading import Thread,currentThread
import time,os
def talk1():
    time.sleep(2)
    print(‘hello‘)
def talk2():
    time.sleep(2)
    print(‘you see see‘)
if __name__ == ‘__main__‘:
    t1 = Thread(target=talk1)
    t2 = Thread(target=talk2)
    # t1 = Process(target=talk1)
    # t2 = Process(target=talk2)
    t1.daemon = True
    t1.start()
    t2.start()
    print(‘主線程‘,os.getpid())
# ----迷惑人的例子----
from threading import Thread
import time
def foo():
    print(123)
    # time.sleep(10) #如果這個等的時間大於下面等的時間,就把不打印end123了
    time.sleep(2)  #如果這個等的時間小於下面等的時間,就把end123也打印了
    print(‘end123‘)
def bar():
    print(456)
    # time.sleep(5)
    time.sleep(10)
    print(‘end456‘)
if __name__ == ‘__main__‘:
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)
    t1.daemon = True #主線程運行完了守護的那個還沒有幹掉,
    # 主線程等非守護線程全都結束它才結束
    t1.start()
    t2.start()
    print(‘main---------‘)

七、GIL與Lock

1.python GIL(Global Interpreter Lock) #全局的解釋器鎖

2.鎖的目的:犧牲了效率,保證了數據的安全
3.保護不同的數據加不同的鎖()
4.python自帶垃圾回收

5.誰拿到GIL鎖就讓誰得到Cpython解釋器的執行權限

6.GIT鎖保護的是Cpython解釋器數據的安全,而不會保護你自己程序的數據的安全
7.GIL鎖當遇到阻塞的時候,就被迫的吧鎖給釋放了,那麽其他的就開始搶鎖了,搶到
後吧值修改了,但是第一個拿到的還在原本拿到的那個數據的那停留著呢,當再次拿
到鎖的時候,數據已經修改了,而你還拿的原來的,這樣就混亂了,所以也就保證不了
數據的安全了。
8.那麽怎麽解決數據的安全ne ?
自己再給加吧鎖:mutex=Lock()

八、同步鎖

GIL 與Lock是兩把鎖,保護的數據不一樣,前者是解釋器級別的(當然保護的就是解釋器級別的數據,比如垃圾回收的數據),後者是保護用戶自己開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

過程分析:所有線程搶的是GIL鎖,或者說所有線程搶的是執行權限

  線程1搶到GIL鎖,拿到執行權限,開始執行,然後加了一把Lock,還沒有執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程中發現Lock還沒有被線程1釋放,於是線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,然後正常執行到釋放Lock。。。這就導致了串行運行的效果

  既然是串行,那我們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是串行執行啊,為何還要加Lock呢,需知join是等待t1所有的代碼執行完,相當於鎖住了t1的所有代碼,而Lock只是鎖住一部分操作共享數據的代碼。

因為Python解釋器幫你自動定期進行內存回收,你可以理解為python解釋器裏有一個獨立的線程,每過一段時間它起wake up做一次全局輪詢看看哪些內存數據是可以被清空的,此時你自己的程序 裏的線程和 py解釋器自己的線程是並發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程中的clearing時刻,可能一個其它線程正好又重新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,為了解決類似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這可以說是Python早期版本的遺留問題。

# 全局解釋鎖
from threading import Thread,Lock
import time
n=100
def work():
    mutex.acquire()
    global n
    temp=n
    time.sleep(0.01)
    n=temp-1
    mutex.release()
if __name__ == ‘__main__‘:
    mutex=Lock()
    t_l=[]
    s=time.time()
    for i in range(100):
        t=Thread(target=work)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print(‘%s:%s‘ %(time.time()-s,n))

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:  

# 鎖的格式
import threading
mutex = threading.Lock()
mutex.aquire()
‘‘‘
對公共數據的操作
‘‘‘
mutex.release()
分析:
  1.100個線程去搶GIL鎖,即搶執行權限   2. 肯定有一個線程先搶到GIL(暫且稱為線程1),然後開始執行,一旦執行就會拿到lock.acquire()   3. 極有可能線程1還未運行完畢,就有另外一個線程2搶到GIL,然後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,於是阻塞,被迫交出執行權限,即釋放GIL   4.直到線程1重新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,然後其他的線程再重復2 3 4的過程

如果不加鎖:並發執行,速度快,數據不安全。

加鎖:串行執行,速度慢,數據安全。

# 互斥鎖與join的區別(重點!!!)
#不加鎖:並發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    global n
    print(‘%s is running‘ %current_thread().getName())
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == ‘__main__‘:
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

    stop_time=time.time()
    print(‘主:%s n:%s‘ %(stop_time-start_time,n))

‘‘‘
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
‘‘‘


#不加鎖:未加鎖部分並發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock
import os,time
def task():
    #未加鎖的代碼並發運行
    time.sleep(3)
    print(‘%s start to run‘ %current_thread().getName())
    global n
    #加鎖的代碼串行運行
    lock.acquire()
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()

if __name__ == ‘__main__‘:
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print(‘主:%s n:%s‘ %(stop_time-start_time,n))

‘‘‘
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
‘‘‘

#有的同學可能有疑問:既然加鎖會讓運行變成串行,那麽我在start之後立即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start之後立刻使用jion,肯定會將100個任務的執行變成串行,毫無疑問,最終n的結果也肯定是0,是安全的,但問題是
#start後立即join:任務內的所有代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的
#單從保證數據安全方面,二者都可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)
    print(‘%s start to run‘ %current_thread().getName())
    global n
    temp=n
    time.sleep(0.5)
    n=temp-1


if __name__ == ‘__main__‘:
    n=100
    lock=Lock()
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        t.start()
        t.join()
    stop_time=time.time()
    print(‘主:%s n:%s‘ %(stop_time-start_time,n))

‘‘‘
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多麽的恐怖
‘‘‘

  

python全棧開發基礎【第二十四篇】(利用threading模塊開線程、join與守護線程、GIL與Lock)