1. 程式人生 > >GIL、定時器、線程queue、進程池和線程池

GIL、定時器、線程queue、進程池和線程池

nco ces war 同時 127.0.0.1 非阻塞 enc rgs turn

一、GIL
1、什麽是GIL(這是Cpython解釋器)
GIL本質就是一把互斥鎖,那既然是互斥鎖,原理都一樣,都是讓多個並發線程同一時間只能
有一個執行
即:有了GIL的存在,同一進程內的多個線程同一時刻只能有一個在運行,意味著在Cpython中
一個進程下的多個線程無法實現並行===》意味著無法利用多核優勢
但不影響並發的實現

GIL可以被比喻成執行權限,同一進程下的所以線程 要想執行都需要先搶執行權限

2、為何要有GIL
因為Cpython解釋器自帶垃圾回收機制不是線程安全的(對共享數據修改同時運行,不知誰改對了)

3、如何用

01、GIL vs 自定義互斥鎖
GIL相當於執行權限,會在任務無法執行的情況,被強行釋放
自定義互斥鎖即便是無法執行,也不會自動釋放

GIL能保護解釋器級別代碼(和垃圾回收機制有關)但保護不了其他共享數據(比如自己的代碼)。
所以在程序中對於需要保護的數據要自行加鎖

02、GIL的優缺點:
優點:保證Cpython解釋器內存管理的線程安全
缺點:在Cpython解釋器中,同一個進程下開啟的多線程,同一時刻只能有一個線程執行,
也就說Cpython解釋器的多線程無法實現並行無法利用多核優勢

註意:
a、GIL不能並行,但有可能並發,不一定為串行。因為串行是一個任務完完全全執行完畢後才進行下一個;
而cpython中,一個線程在io時,被CPU釋放時,會被強行取消GIL的使用權限
b、多核(多CPU)的優勢是提升運算效率
c、計算密集型--》使用多進程,以用上多核
d、IO密集型--》使用多線程

4、有兩種並發解決方案:
多進程:計算密集型
多線程:IO密集型



計算密集型:應該使用多進程
from multiprocessing import Process
from threading import Thread
import os,time
def work1():
res=0
for i in range(100000000):
res*=i

def work2():
res=0
for i in range(100000000):
res*=i

def work3():
res=0
for i in range(100000000):
res*=i

def work4():
res=0
for i in range(100000000):
res*=i

if __name__ == ‘__main__‘:
l=[]
# print(os.cpu_count()) #(計算cpu個數)本機為4核
start=time.time()
# p1=Process(target=work1) #
# p2=Process(target=work2)
# p3=Process(target=work3)
# p4=Process(target=work4)

p1=Thread(target=work1)
p2=Thread(target=work2)
p3=Thread(target=work3)
p4=Thread(target=work4)

p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
stop=time.time()
print(‘run time is %s‘ %(stop-start))


IO密集型:應該使用多線程
from multiprocessing import Process
from threading import Thread
import os,time
def work1():
time.sleep(5)

def work2():
time.sleep(5)

def work3():
time.sleep(5)

def work4():
time.sleep(5)



if __name__ == ‘__main__‘:
l=[]
# print(os.cpu_count()) #本機為4核
start=time.time()
# p1=Process(target=work1) #
# p2=Process(target=work2)
# p3=Process(target=work3)
# p4=Process(target=work4)

p1=Thread(target=work1) #
p2=Thread(target=work2)
p3=Thread(target=work3)
p4=Thread(target=work4)

p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
stop=time.time()
print(‘run time is %s‘ %(stop-start))


二、定時器
定時器,指定n秒後執行某操作
from threading import Timer,current_thread


def task(x):
print(‘%s run....‘ %x)
print(current_thread().name)


if __name__ == ‘__main__‘:
t=Timer(3,task,args=(10,))
t.start()
print(‘主‘)

三、線程queue

import queue
隊列:先進先出
q=queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)

print(q.get())
print(q.get())
print(q.get())

堆棧:先進後出
q=queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

優先級隊列:優先級高先出來,數字越小,優先級越高
q=queue.PriorityQueue()
q.put((3,‘data1‘))
q.put((-10,‘data2‘))
q.put((11,‘data3‘))

print(q.get())
print(q.get())
print(q.get())



三、基於多線程實現並發的套接字通信
服務端:
from socket import *
from threading import Thread

def talk(conn):
while True:
try:
data=conn.recv(1024)
if len(data) == 0:break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()

def server(ip,port,backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog)

print(‘starting...‘)
while True:
conn, addr = server.accept()

t = Thread(target=talk, args=(conn,))
t.start()

if __name__ == ‘__main__‘:
server(‘127.0.0.1‘,8080)


客戶端:
from socket import *
import os

client=socket(AF_INET,SOCK_STREAM)
client.connect((‘127.0.0.1‘,8080))

while True:
msg=‘%s say hello‘ %os.getpid()
client.send(msg.encode(‘utf-8‘))
data=client.recv(1024)
print(data.decode(‘utf-8‘))





四、進程池與線程池

1、什麽時候用池:
池的功能是限制啟動的進程數或線程數。

什麽時候應該限制???
當並發的任務數遠遠超過了計算機的承受能力時,即無法一次性開啟過多的進程數或線程數時
就應該用池的概念將開啟的進程數或線程數限制在計算機可承受的範圍內

2、提交任務的兩種方式:
同步vs異步
同步、異步指的是提交任務的兩種方式

同步:提交完任務後就在原地等待,直到任務運行完畢後拿到任務的返回值,再繼續運行下一行代碼
異步:提交完任務(綁定一個回調函數)後根本就不在原地等待,直接運行下一行代碼,等到任務有返回值後會自動觸發回調函數

程序的運行狀態(阻塞,非阻塞)
1、阻塞:
IO阻塞
2、非阻塞:
運行
就緒

2.1 基本方法
submit(fn, *args, **kwargs)
異步提交任務

#map(func, *iterables, timeout=None, chunksize=1)
取代for循環submit的操作

#shutdown(wait=True)
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源後才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

#result(timeout=None)
取得結果

#add_done_callback(fn)
回調函數



進程池:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import os,time,random

def task(n):
print(‘%s is running‘%os.getpid())
time.sleep(5)
return n**2

def parse(future):
time.sleep(1)
res=future.result()
print(‘%s 處理了 %s‘%(os.getpid(),res))

if __name__ == ‘__main__‘:
pool=ProcessPoolExecutor(4)
start=time.time()
for i in range(1,5):
future=pool.submit(task,i) #異步提交任務
future.add_done_callback(parse) # parse會在future有返回值時立刻觸發,並且將future當作參數傳給parse
pool.shutdown(wait=True)
stop=time.time()
print(‘主‘,os.getpid(),(stop-start))

‘‘‘
4340 is running
6572 is running
6652 is running
392 is running
5148 處理了 1
5148 處理了 4
5148 處理了 9
5148 處理了 16
主 5148 9.330533742904663
‘‘‘
最後由主進程一個一個處理結果



線程池:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import time,os,random

def task(n):
print(‘%s is running‘%current_thread().name)
time.sleep(5)
return n**2

def parse(future):
time.sleep(1)
res=future.result()
print(‘%s 處理了 %s‘%(current_thread().name,res))

if __name__ == ‘__main__‘:
pool=ThreadPoolExecutor(4)
start=time.time()
for i in range(1,5):
future=pool.submit(task,i)
future.add_done_callback(parse)
pool.shutdown(wait=True)
stop=time.time()
print(‘主‘,current_thread().name,(stop-start))

‘‘‘
ThreadPoolExecutor-0_0 is running
ThreadPoolExecutor-0_1 is running
ThreadPoolExecutor-0_2 is running
ThreadPoolExecutor-0_3 is running
ThreadPoolExecutor-0_2 處理了 9
ThreadPoolExecutor-0_1 處理了 4
ThreadPoolExecutor-0_3 處理了 16
ThreadPoolExecutor-0_0 處理了 1
主 MainThread 6.002343416213989

‘‘‘
最後誰(線程)空閑下來誰處理結果

GIL、定時器、線程queue、進程池和線程池