python中多執行緒,多程序,多協程概念及程式設計上的應用
1, 多執行緒
- 執行緒是程序的一個實體,是CPU進行排程的最小單位,他是比程序更小能獨立執行的基本單位。
- 執行緒基本不擁有系統資源,只佔用一點執行中的資源(如程式計數器,一組暫存器和棧),但是它可以與同屬於一個程序的其他執行緒共享全部的資源。
- 提高程式的執行速率,上下文切換快,開銷比較少,但是不夠穩定,容易丟失資料,形成死鎖。
直接上程式碼:
import time import threading # 函式1用時2秒 def fun1(): time.sleep(2) print(threading.current_thread().name, time.ctime())# 函式2用時4秒 def fun2(): time.sleep(4) print(threading.current_thread().name, time.ctime()) # 函式3用時6秒 def fun3(): time.sleep(6) print('hello python', time.ctime()) th1 = threading.Thread(target=fun1) th2 = threading.Thread(target=fun2) th3 = threading.Thread(target=fun3) th1.start() th2.start() th3.start()
列印結果:
Thread-1 Mon Jan 7 11:01:52 2019 Thread-2 Mon Jan 7 11:01:54 2019 hello python Mon Jan 7 11:01:56 2019
解析:從結果看出,他們同一時間 11:01:50開始執行,分別用了不同的時間結束
接著往下看,新增jion阻塞執行緒
''''''
th1.start()
th1.join()
th2.start()
th2.join()
th3.start()
th3.join()
列印結果:
Thread-1 Mon Jan 7 11:19:00 2019
Thread-2 Mon Jan 7 11:19:04 2019
hello python Mon Jan 7 11:19:10 2019
我們看到這三執行緒按順序依次執行。
我們接著看看執行緒的方法使用:
threading.enumerate() #列舉執行緒,返回列表,其中裡面會有一條主執行緒 threading.activeCount() #檢視執行緒執行個數 threading.current_thread().name #檢視當前執行執行緒名稱 join() #阻塞執行緒執行
我們接著看第二種開執行緒的方式:
import threading import time class MyThread(threading.Thread): def run(self): for i in range(3): time.sleep(1) msg = "I'm "+self.name+' @ '+str(i) #name屬性中儲存的是當前執行緒的名字 print(msg) if __name__ == '__main__': t = MyThread() t.setName('yangzhenyu') a = t.isAlive() print(a) print(t.getName()) t.start() b = t.isAlive() print(b)
列印結果:
False yanzghenyu True I'm yanzghenyu @ 0 I'm yanzghenyu @ 1 I'm yanzghenyu @ 2
方法總結:
t.setName() #設定執行執行緒名稱,不指定預設Thread-1 t.getName() #獲取執行緒名稱 t.isAlive() #判斷執行緒是否執行,返回布林型別
執行緒間共享全域性變數:
import threading import time n = 100 def work01(): global n for i in range(3): n += 1 print(n) //103 def work02(): global n print(n) //103 print(n) //100 t1 = threading.Thread(target=work01) t1.start()
time.sleep(1)
t2 = threading.Thread(target=work02) t2.start()
關於執行緒鎖
- 用threading.Lock()建立鎖,用acquire()申請鎖,每次只有一個執行緒獲得鎖,其他執行緒必須等此執行緒release()後才能獲得鎖
- RLock允許在同一執行緒中被多次acquire。而Lock卻不允許這種情況。
- 注意:如果使用RLock,那麼acquire和release必須成對出現,即同一執行緒中呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的瑣
下面例子中我們用到的是Lock(),當加完鎖之後,該方法同一時間內只能被一個執行緒呼叫。
import threading mylock=threading.Lock()#建立鎖 num = 0 def add_num(name): global num while True: mylock.acquire()#申請鎖也就是加鎖 print('thread %s locked! num=%d'%(name,num)) if num>=5: print('thread %s release! num=%d'%(name,num)) mylock.release()#釋放鎖 return num += 1 print('thread %s release! num = %d'%(name,num)) mylock.release() t1 = threading.Thread(target=add_num,args=('A',)) t2 = threading.Thread(target=add_num,args=('B',)) t1.start() t2.start()
列印結果:
thread A locked! num=0 thread A release! num = 1 thread A locked! num=1 thread A release! num = 2 thread A locked! num=2 thread A release! num = 3 thread A locked! num=3 thread A release! num = 4 thread A locked! num=4 thread A release! num = 5 thread A locked! num=5 thread A release! num=5 thread B locked! num=5 thread B release! num=5
關於程序:
- 程序是系統進行資源分配的最小單位,每個程序都有自己的獨立記憶體空間,不用程序通過程序間通訊來通訊。
- 但是程序佔據獨立空間,比較重量級,所以上下文程序間的切換開銷比較大,但是比較穩定安全。
程序建立:
第一種建立程序的方式:
from multiprocessing import Process import time import random import os def piao(name): print("%s is piaoping"%name) time.sleep(random.randint(0,1)) print("%s is piao end"%name) if __name__ == '__main__': print("CPU的個數是:%d"%os.cpu_count()) p1 = Process(target=piao,args=("alex",),name="程序1") print(p1.name) p1.start() print("父程序!") #執行速度要遠快於建立新程序的時間
列印結果:
CPU的個數是:2 程序1 父程序! alex is piaoping alex is piao end
第二種建立程序的方式:
from multiprocessing import Process import time import random #繼承Process類,並實現自己的run方法 class Piao(Process): def __init__(self,name): #必須呼叫父類的init方法 super().__init__() self.name = name def run(self): print("%s is piaoing"%self.name) time.sleep(random.randint(1,3)) print("%s is piaoeng"%self.name) if __name__ == '__main__': p1 = Piao("Alex") #開闢一個新的程序實際上就是執行本程序所對應的run()方法 p1.start() print("主程序!")
結果:
主程序! Alex is piaoing Alex is piaoeng
解析:join括號中不攜帶引數,表示父程序在這個位置要等待p1程序執行完成後,如果指定引數,也就是等待時間s,那麼主程序將在這個時間內結束,
用is_active() 方法即可檢測程序的狀態,不加join() 返回True,表示程序還在進行。
程序的方法,
start() 啟動程序例項(建立子程序); terminate():不管任務是否完成,立即終止; name: 當前程序例項別名,預設為Process-N,N為從1開始遞增的整數; pid: 當前程序例項的PID值; os.getpid() is_alive(): 判斷程序例項是否還在執行; join([timeout]):是否等待程序例項執行結束,或等待多少秒;
程序池:
在程式實際處理問題時,忙時會有成千上萬個任務需要執行,閒時有零星任務,建立時需要消耗時間,銷燬也需要時間,
即使開啟成千上萬個程序,作業系統也不能 讓他同時執行。這裡就用到了程序池,用於管理小塊記憶體的申請與釋放。
,
1,上程式碼:
from multiprocessing.pool import Pool from time import sleep def fun(a): sleep(1) print(a) if __name__ == '__main__': p = Pool() # 這裡不加引數,但是程序池的預設大小,等於電腦CPU的核數 # 也是建立子程序的個數,也是每次列印的數字的個數 for i in range(10): p.apply_async(fun, args=(i,))
p.close() p.join() # 等待所有子程序結束,再往後執行 print("end")
2,callback 舉例:
from multiprocessing import Process,Pool def func(i): i+=1 return i#普通程序處理過的資料返回給主程序p1 def call_back(p1): p1+=1 print(p1) if __name__ == '__main__': p = Pool() for i in range(10): p1 = p.apply_async(func,args=(i,),callback = call_back)#p呼叫普通程序並且接受其返回值,將返回值給要執行的回撥函式處理 p.close() p.join()
解析: 1,p.apply ( func,args = ()) 同步的效率,也就是說池中的程序一個一個的去執行任務
p.apply_async( func,args = () , callback = None) : 非同步的效率,也就是池中的程序一次性都去執行任務.
2,非同步處理任務時 : 必須要加上close和join. 程序池的所有程序都是守護程序(主程序程式碼執行結束,守護程序就結束).
3,func : 程序池中的程序執行的任務函式
4,args : 可迭代物件性的引數,是傳給任務函式的引數
5,callback : 回撥函式,就是每當程序池中有程序處理完任務了,返回的結果可以交給回撥函式,
由回撥函式進行進一步處理,回撥函式只非同步才有,同步沒有.回撥函式是父程序呼叫.
3. map( func,iterable) (該方法經常用到爬蟲)
from multiprocessing import Pool def func(num): num += 1 print(num) return num if __name__ == '__main__': p = Pool(2) res = p.map(func,[i for i in range(100)]) # p.close()#map方法自帶這兩種功能 # p.join() print('主程序中map的返回值',res)
func : 程序池中的程序執行的任務函式
iterable : 可迭代物件,是把可迭代物件那個中的每個元素一次傳給任務函式當引數.
map方法自帶close和join
程序間的通訊:
1)佇列
from multiprocessing import Queue,Process import os,time,random #新增資料函式 def proc_write(queue,urls): print("程序(%s)正在寫入..."%(os.getpid())) for url in urls: queue.put(url) print("%s被寫入到佇列中"%(url)) time.sleep(random.random()*3) #讀取資料函式 def proc_read(queue): print("程序(%s)正在讀取..."%(os.getpid())) while True: url = queue.get() print("從佇列中提取到:%s"%(url)) if __name__ =="__main__":
queue = Queue() proc_writer1 = Process(target=proc_write,args=(queue,["ur1","ur2","ur3","ur4"])) proc_writer2 = Process(target=proc_write,args=(queue,["ur5","ur6","ur7","ur8"])) proc_reader = Process(target=proc_read,args=(queue,)) proc_writer1.start() proc_writer1.join() proc_writer2.start() proc_writer2.join() proc_reader.start() proc_reader.terminate()
生產者與消費者模式(執行緒間的通訊):
from queue import Queue import threading,time class Producer(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count +1 msg = '生成產品'+str(count) queue.put(msg) print(msg) time.sleep(0.5) class Consumer(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + '消費了 '+queue.get() print(msg) time.sleep(1) if __name__ == '__main__': queue = Queue() for i in range(500): queue.put('初始產品'+str(i)) for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()
2) 程序間的通訊(管道)
from multiprocessing import Pipe,Process import random,time,os def proc_send(pipe,urls): for url in urls: print("程序(%s)傳送:%s"%(os.getpid(),url)) pipe.send(url) time.sleep(random.random()) def proc_recv(pipe): while True: print("程序(%s)接收到:%s"%(os.getpid(),pipe.recv())) time.sleep(random.random()) if __name__ == "__main__": pipe = Pipe() p1 = Process(target=proc_send,args=(pipe[0],["url_"+str(i) for i in range(10)],)) p2 = Process(target=proc_recv,args=(pipe[1],)) p1.start() p2.start() p1.join() p2.terminate()
解析:
pipe用於兩個程序間的通訊,兩個程序分別位於管道的兩端,Pipe方法返回(conn1,conn2)代表一個管道的兩端,
Pipe方法有dumplex引數,若該引數為True,管道為全雙工模式,
若為Fasle,conn1只負責接收訊息,conn2只負責傳送訊息.send和recv方法分別是傳送和接收訊息的方法
協程:
協程:是更小的執行單位,是一種輕量級的執行緒,協程的切換隻是單純的操作CPU的上下文,所以切換速度特別快,且耗能小。
gevent是第三方庫,通過greenlet實現協程,其基本思想是:
當一個greenlet遇到IO操作時,比如訪問網路,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程式處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在執行,而不是等待IO。
由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成:
from gevent import monkey monkey.patch_all() # 用來在執行時動態修改已有的程式碼,而不需要修改原始程式碼。 import gevent import requests def f(url): print('GET: %s' % url) html = requests.get(url).text print(url, len(html)) gevent.joinall([ gevent.spawn(f, 'http://i.maxthon.cn/'), # 先執行這個函式,傳送請求,等待的時候傳送第二個請求 gevent.spawn(f, 'http://www.jianshu.com/u/3cfeb3395a95'), gevent.spawn(f, 'http://edu.51cto.com/?jydh')])
執行結果:
GET: http://i.maxthon.cn/ GET: http://www.jianshu.com/u/3cfeb3395a95 GET: http://edu.51cto.com/?jydh http://i.maxthon.cn/ 461786 http://edu.51cto.com/?jydh 353858 http://www.jianshu.com/u/3cfeb3395a95 597
從結果看,3個網路操作是併發執行的,而且結束順序不同,但只有一個執行緒。
使用gevent,可以獲得極高的併發效能,但gevent只能在Unix/Linux下執行,在Windows下不保證正常安裝和執行。