python多線程和多進程(二)
---恢復內容開始---
一、多進程
1、multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。
import time from multiprocessing import Process def func(name): print(‘%s 函數開始,time:%s‘ %(name,time.ctime())) # time.sleep(2) print(‘%s 函數結束,time:%s‘ %(name,time.ctime())) if __name__ == "__main__": p1=Process(target=func,args=(‘one‘,)) p2=Process(target=func,args=(‘two‘,)) p3=Process(target=func,args=(‘three‘,)) p4=Process(target=func,args=(‘four‘,)) p5 = Process(target=func, args=(‘five‘,)) p1.start() p2.start() p3.start() p4.start() p5.start()print(u‘主進程%s‘%time.ctime())
結果:
one 函數開始,time:Sat Sep 08 17:15:40 2018 two 函數開始,time:Sat Sep 08 17:15:40 2018 主進程Sat Sep 08 17:15:40 2018 three 函數開始,time:Sat Sep 08 17:15:40 2018 four 函數開始,time:Sat Sep 08 17:15:40 2018 five 函數開始,time:Sat Sep 08 17:15:40 2018 one 函數結束,time:Sat Sep 08 17:15:42 2018 two 函數結束,time:Sat Sep 08 17:15:42 2018 three 函數結束,time:Sat Sep 08 17:15:42 2018 four 函數結束,time:Sat Sep 08 17:15:42 2018 five 函數結束,time:Sat Sep 08 17:15:42 2018
*所有函數並發執行(註意:在windows中Process()必須放到# if __name__ == ‘__main__‘:下#方法二(和上面效果一樣)
import time from multiprocessing import Process class Piao(Process): def __init__(self,name): Process.__init__(self) self.name=name def run(self): print(‘%s 函數開始,time:%s‘ %(self.name,time.ctime())) time.sleep(2) print(‘%s 函數結束,time:%s‘ %(self.name,time.ctime())) if __name__ == ‘__main__‘: p1=Piao(‘one‘) p2=Piao(‘two‘) p3=Piao(‘three‘) p4=Piao(‘four‘)
#p.daemon = True(進程守護和線程守護一樣)
p1.start()
p2.start()
p3.start() p4.start() print(‘主線程%s‘%time.ctime())
2、join()方法是用來讓主進程等待所有子進程結束並不影響子進程之間的並發:
import time from multiprocessing import Process class Piao(Process): def __init__(self,name): Process.__init__(self) self.name=name def run(self): print(‘%s 函數開始,time:%s‘ %(self.name,time.ctime())) time.sleep(2) print(‘%s 函數結束,time:%s‘ %(self.name,time.ctime())) if __name__ == ‘__main__‘: tasks = [] tasks.append(Piao(‘one‘)) tasks.append(Piao(‘two‘)) tasks.append(Piao(‘three‘)) tasks.append(Piao(‘four‘)) for p in tasks: p.start() for p in tasks: p.join() print(‘主線程%s‘%time.ctime())
結果:
one 函數開始,time:Sat Sep 08 17:37:43 2018
two 函數開始,time:Sat Sep 08 17:37:43 2018
three 函數開始,time:Sat Sep 08 17:37:43 2018
four 函數開始,time:Sat Sep 08 17:37:43 2018
one 函數結束,time:Sat Sep 08 17:37:45 2018
two 函數結束,time:Sat Sep 08 17:37:45 2018
three 函數結束,time:Sat Sep 08 17:37:45 2018
four 函數結束,time:Sat Sep 08 17:37:45 2018
主線程Sat Sep 08 17:37:45 2018 #主進程等所有子進程結束,子進程之間並不影響並發
5、進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就 是錯亂,如何控制,就是加鎖處理
#並發運行,效率高,但競爭同一打印終端,帶來了打印錯亂 #由並發變成了串行,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print(‘%s 函數開始,time:%s‘ %(os.getpid(),time.ctime())) time.sleep(2) print(‘%s 函數結束,time:%s‘ %(os.getpid(),time.ctime())) lock.release() if __name__ == ‘__main__‘: lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start()
24464 函數開始,time:Sat Sep 08 18:02:17 2018 24464 函數結束,time:Sat Sep 08 18:02:19 2018 21072 函數開始,time:Sat Sep 08 18:02:19 2018 21072 函數結束,time:Sat Sep 08 18:02:21 2018 13536 函數開始,time:Sat Sep 08 18:02:21 2018 13536 函數結束,time:Sat Sep 08 18:02:23 2018
之前打印是同一時間,是多個進程共享同一打印終端,但是加了鎖後就不能共享了,但是依舊是並發,是鎖限制了共享終端,在讀寫文件是需要枷鎖,不然容易造成錯亂
6、隊列
Queue([maxsize]):創建共享的進程隊列,Queue是多進程安全的隊列,常用來在生產者和消費者線程之間的信息傳遞。queue.put方法用以插入數據到隊列中,queue.get方法用來沖數據隊列去除數據(先進先出)
產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
基於隊列實現生產者消費者模型
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get()
if res in None: break time.sleep(random.randint(1,3)) print(‘\033[45m%s 吃 %s\033[0m‘ %(os.getpid(),res)) def producer(q): for i in range(20): time.sleep(random.randint(1,3)) res=‘包子%s‘ %i q.put(res) print(‘\033[44m%s 生產了 %s\033[0m‘ %(os.getpid(),res))
q.put(None) if __name__ == ‘__main__‘: q=Queue() #生產者 p1=Process(target=producer,args=(q,)) #消費者 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print(‘主進程‘)
7、進程池
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那麽就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那麽該請求就會等待,直到池中有進程結束,就重用進程池中的進程。
from multiprocessing import Pool import os,time def work(n): print(‘%s run,time:%s‘ %(os.getpid(),time.ctime())) time.sleep(3) return n**2 if __name__ == ‘__main__‘: p=Pool(3) #進程池中從無到有創建三個進程,以後一直是這三個進程在執行任務 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #同步運行,阻塞、直到本次任務執行完畢拿到res res_l.append(res) #異步apply_async用法:如果使用異步提交的任務,主進程需要使用jion,等待進程池內任務都處理完,然後可以用get收集結果,否則,主進程結束,進程池可能還沒來得及執行,也就跟著一起結束了 p.close() p.join() for res in res_l: print res.get(), #使用get來獲取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻獲取結果,也根本無需get,‘,’無須換行打印
結果:
6684 run,time:Mon Sep 10 11:41:22 2018 732 run,time:Mon Sep 10 11:41:22 2018 10748 run,time:Mon Sep 10 11:41:22 2018 6684 run,time:Mon Sep 10 11:41:25 2018 732 run,time:Mon Sep 10 11:41:25 2018 10748 run,time:Mon Sep 10 11:41:25 2018 6684 run,time:Mon Sep 10 11:41:28 2018 732 run,time:Mon Sep 10 11:41:28 2018 10748 run,time:Mon Sep 10 11:41:28 2018 6684 run,time:Mon Sep 10 11:41:31 2018 0 1 4 9 16 25 36 49 64 81
#進程池最大容量設置三,因此每次只有三個異步進程執行,等待執行結束再安排
---恢復內容結束---
python多線程和多進程(二)