1. 程式人生 > >Python學習【第21篇】:程序池以及回撥函式 python併發程式設計之多程序2-------------資料共享及程序池和回撥函式

Python學習【第21篇】:程序池以及回撥函式 python併發程式設計之多程序2-------------資料共享及程序池和回撥函式

python併發程式設計之多程序2-------------資料共享及程序池和回撥函式

一、資料共享

1.程序間的通訊應該儘量避免共享資料的方式

2.程序間的資料是獨立的,可以藉助佇列或管道實現通訊,二者都是基於訊息傳遞的。

雖然程序間資料獨立,但可以用過Manager實現資料共享,事實上Manager的功能遠不止於此。

?
1 2 3 4 命令就是一個程式,按回車就會執行(這個只是在windows情況下) tasklist 檢視程序 tasklist | findstr  pycharm    #(findstr是進行過濾的),|就是管道(tasklist執行的內容就放到管道里面了,
管道後面的findstr  pycharm就接收了)

3.(IPC)程序之間的通訊有兩種實現方式:管道和佇列

複製程式碼
 1 from multiprocessing import Manager,Process,Lock
 2 def work(dic,mutex):
 3     # mutex.acquire()
 4     # dic['count']-=1
 5     # mutex.release()
 6     # 也可以這樣加鎖
 7     with mutex:
 8         dic['count'] -= 1
 9 if __name__ == '__main__':
10     mutex = Lock()
11     m = Manager()  #實現共享,由於字典是共享的字典,所以得加個鎖
12     share_dic = m.dict({'count':100}) 13 p_l = [] 14 for i in range(100): 15 p = Process(target=work,args=(share_dic,mutex)) 16 p_l.append(p) #先新增進去 17  p.start() 18 for i in p_l: 19  i.join() 20 print(share_dic) 21 # 共享就意味著會有競爭,
複製程式碼

二、程序池

在利用Python進行系統管理的時候,特別是同時操作多個檔案目錄,或者遠端控制多臺主機,並行操作可以節約大量的時間。多程序是實現併發的手段之一,需要注意的問題是:

  1. 很明顯需要併發執行的任務通常要遠大於核數
  2. 一個作業系統不可能無限開啟程序,通常有幾個核就開幾個程序
  3. 程序開啟過多,效率反而會下降(開啟程序是需要佔用系統資源的,而且開啟多餘核數目的程序也無法做到並行)

例如當被操作物件數目不大時,可以直接利用multiprocessing中的Process動態成生多個程序,十幾個還好,但如果是上百個,上千個。。。手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。

那麼什麼是程序池呢?程序池就是控制程序數目

 ps:對於遠端過程呼叫的高階應用程式而言,應該使用程序池,Pool可以提供指定數量的程序,供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,就重用程序池中的程序。 

程序池的結構:

 建立程序池的類:如果指定numprocess為3,則程序池會從無到有建立三個程序,然後自始至終使用這三個程序去執行所有任務,不會開啟其他程序

1.建立程序池

?
1 Pool([numprocess  [,initializer [, initargs]]]):建立程序池

2.引數介紹

?
1 2 3 numprocess:要建立的程序數,如果省略,將預設為cpu_count()的值,可os.cpu_count()檢視 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為 None initargs:是要傳給initializer的引數組

3.方法介紹

?
1 2 3 4 5 6 7 8 9 10 11 12 13 p. apply (func [, args [, kwargs]]):在一個池工作程序中執行 func( * args, * * kwargs),然後返回結果。 需要強調的是:此操作並不會在所有池工作程序中並執行func函式。 如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p. apply () 函式或者使用p.apply_async()     p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func( * args, * * kwargs),然後返回結果。此方法的結果是AsyncResult類的例項, callback是可呼叫物件,接收輸入引數。當func的結果變為可用時, 將理解傳遞給callback。callback禁止執行任何阻塞操作, 否則將接收其他非同步操作中的結果。      p.close():關閉程序池,防止進一步操作。禁止往程序池內在新增任務(需要注意的是一定要寫在close()的上方)
?
1 P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫

應用1:

複製程式碼
 1 from multiprocessing import Pool
 2 import os,time
 3 def task(n):
 4     print('[%s] is running'%os.getpid())
 5     time.sleep(2)
 6     print('[%s] is done'%os.getpid()) 7 return n**2 8 if __name__ == '__main__': 9 # print(os.cpu_count()) #檢視cpu個數 10 p = Pool(4) #最大四個程序 11 for i in range(1,7):#開7個任務 12 res = p.apply(task,args=(i,)) #同步的,等著一個執行完才執行另一個 13 print('本次任務的結束:%s'%res) 14 p.close()#禁止往程序池內在新增任務 15 p.join() #在等程序池 16 print('主')
複製程式碼

 

複製程式碼
 1 # ----------------
 2 # 那麼我們為什麼要用程序池呢?這是因為程序池使用來控制程序數目的,
 3 # 我們需要幾個就開幾個程序。如果不用程序池實現併發的話,會開很多的程序
 4 # 如果你開的程序特別多,那麼你的機器就會很卡,所以我們把程序控制好,用幾個就
 5 # 開幾個,也不會太佔用記憶體
 6 from multiprocessing import Pool
 7 import os,time
 8 def walk(n):
 9     print('task[%s] running...'%os.getpid())
10     time.sleep(3)
11     return n**2
12 if __name__ == '__main__': 13 p = Pool(4) 14 res_obj_l = [] 15 for i in range(10): 16 res = p.apply_async(walk,args=(i,)) 17 # print(res) #打印出來的是物件 18 res_obj_l.append(res) #那麼現在拿到的是一個列表,怎麼得到值呢?我們用個.get方法 19 p.close() #禁止往程序池裡新增任務 20  p.join() 21 # print(res_obj_l) 22 print([obj.get() for obj in res_obj_l]) #這樣就得到了
複製程式碼

 

 

 

那麼什麼是同步,什麼是非同步呢?

同步就是指一個程序在執行某個請求的時候,若該請求需要一段時間才能返回資訊,那麼這個程序將會一直等待下去,直到收到返回資訊才繼續執行下去

非同步是指程序不需要一直等下去,而是繼續執行下面的操作,不管其他程序的狀態。當有訊息返回時系統會通知程序進行處理,這樣可以提高執行的效率。

什麼是序列,什麼是並行呢?

舉例:能並排開幾輛車的就可以說是“並行”,只能一輛一輛開的就屬於“序列”了。很明顯,並行的速度要比序列的快得多。(並行互不影響,序列的等著一個完了才能接著另一個)

應用2:

使用程序池維護固定數目的程序(以前客戶端和服務端的改進)

複製程式碼
 1 from socket import *
 2 from multiprocessing import Pool
 3 s = socket(AF_INET,SOCK_STREAM)
 4 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #埠重用
 5 s.bind(('127.0.0.1',8081))
 6 s.listen(5)
 7 print('start running...')
 8 def talk(coon,addr): 9 while True: # 通訊迴圈 10 try: 11 cmd = coon.recv(1024) 12 print(cmd.decode('utf-8')) 13 if not cmd: break 14  coon.send(cmd.upper()) 15 print('傳送的是%s'%cmd.upper().decode('utf-8')) 16 except Exception: 17 break 18  coon.close() 19 if __name__ == '__main__': 20 p = Pool(4) 21 while True:#連結迴圈 22 coon,addr = s.accept() 23 print(coon,addr) 24 p.apply_async(talk,args=(coon,addr)) 25  s.close() 26 #因為是迴圈,所以就不用p.join了
複製程式碼 複製程式碼
 1 from socket import *
 2 c = socket(AF_INET,SOCK_STREAM)
 3 c.connect(('127.0.0.1',8081))
 4 while True:
 5     cmd = input('>>:').strip()
 6     if not cmd:continue
 7     c.send(cmd.encode('utf-8'))
 8     data = c.recv(1024) 9 print('接受的是%s'%data.decode('utf-8')) 10 c.close()
複製程式碼

三、回撥函式

?
1 2 3 4 5 6 7 回撥函式什麼時候用?(回撥函式在爬蟲中最常用) 造資料的非常耗時 處理資料的時候不耗時     你下載的地址如果完成了,就自動提醒讓主程序解析 誰要是好了就通知解析函式去解析(回撥函式的強大之處)

需要回調函式的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函式去處理該結果,該函式即回撥函式

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函式(主程序負責執行),這樣主程序在執行回撥函式時就省去了I/O的過程,直接拿到的是任務的結果。

複製程式碼
 1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 import time
 5 def get_page(url):
 6     print('<%s> is getting [%s]' %(os.getpid(),url)) 7 response = requests.get(url) #得到地址 8 time.sleep(2) 9 print('<%s> is done [%s]'%(os.getpid(),url)) 10 return {'url':url,'text':response.text} 11 def parse_page(res): 12 '''解析函式''' 13 print('<%s> parse [%s]'%(os.getpid(),res['url'])) 14 with open('db.txt','a') as f: 15 parse_res = 'url:%s size:%s\n' %(res['url'],len(res['text'])) 16  f.write(parse_res) 17 if __name__ == '__main__': 18 p = Pool(4) 19 urls = [ 20 'https://www.baidu.com', 21 'http://www.openstack.org', 22 'https://www.python.org', 23 'https://help.github.com/', 24 'http://www.sina.com.cn/' 25  ] 26 for url in urls: 27 obj = p.apply_async(get_page,args=(url,),callback=parse_page) 28  p.close() 29  p.join() 30 print('主',os.getpid()) #都不用.get()方法了
複製程式碼

如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回撥函式

複製程式碼
 1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 def get_page(url):
 5     print('<%os> get [%s]' %(os.getpid(),url))
 6     response = requests.get(url)  #得到地址  response響應
 7     return {'url':url,'text':response.text} 8 if __name__ == '__main__': 9 p = Pool(4) 10 urls = [ 11 'https://www.baidu.com', 12 'http://www.openstack.org', 13 'https://www.python.org', 14 'https://help.github.com/', 15 'http://www.sina.com.cn/' 16  ] 17 obj_l= [] 18 for url in urls: 19 obj = p.apply_async(get_page,args=(url,)) 20  obj_l.append(obj) 21  p.close() 22  p.join() 23 print([obj.get() for obj in obj_l])
複製程式碼

 

一、資料共享

1.程序間的通訊應該儘量避免共享資料的方式

2.程序間的資料是獨立的,可以藉助佇列或管道實現通訊,二者都是基於訊息傳遞的。

雖然程序間資料獨立,但可以用過Manager實現資料共享,事實上Manager的功能遠不止於此。

?
1 2 3 4 命令就是一個程式,按回車就會執行(這個只是在windows情況下) tasklist 檢視程序 tasklist | findstr  pycharm    #(findstr是進行過濾的),|就是管道(tasklist執行的內容就放到管道里面了, 管道後面的findstr  pycharm就接收了)

3.(IPC)程序之間的通訊有兩種實現方式:管道和佇列

複製程式碼
 1 from multiprocessing import Manager,Process,Lock
 2 def work(dic,mutex):
 3     # mutex.acquire()
 4     # dic['count']-=1
 5     # mutex.release()
 6     # 也可以這樣加鎖
 7     with mutex:
 8         dic['count'] -= 1
 9 if __name__ == '__main__':
10     mutex = Lock()
11     m = Manager()  #實現共享,由於字典是共享的字典,所以得加個鎖
12     share_dic = m.dict({'count':100}) 13 p_l = [] 14 for i in range(100): 15 p = Process(target=work,args=(share_dic,mutex)) 16 p_l.append(p) #先新增進去 17  p.start() 18 for i in p_l: 19  i.join() 20 print(share_dic) 21 # 共享就意味著會有競爭,
複製程式碼

二、程序池

在利用Python進行系統管理的時候,特別是同時操作多個檔案目錄,或者遠端控制多臺主機,並行操作可以節約大量的時間。多程序是實現併發的手段之一,需要注意的問題是:

  1. 很明顯需要併發執行的任務通常要遠大於核數
  2. 一個作業系統不可能無限開啟程序,通常有幾個核就開幾個程序
  3. 程序開啟過多,效率反而會下降(開啟程序是需要佔用系統資源的,而且開啟多餘核數目的程序也無法做到並行)

例如當被操作物件數目不大時,可以直接利用multiprocessing中的Process動態成生多個程序,十幾個還好,但如果是上百個,上千個。。。手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。

那麼什麼是程序池呢?程序池就是控制程序數目

 ps:對於遠端過程呼叫的高階應用程式而言,應該使用程序池,Pool可以提供指定數量的程序,供使用者呼叫,當有新的請求提交到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,就重用程序池中的程序。 

程序池的結構:

 建立程序池的類:如果指定numprocess為3,則程序池會從無到有建立三個程序,然後自始至終使用這三個程序去執行所有任務,不會開啟其他程序

1.建立程序池

?
1 Pool([numprocess  [,initializer [, initargs]]]):建立程序池

2.引數介紹

?
1 2 3 numprocess:要建立的程序數,如果省略,將預設為cpu_count()的值,可os.cpu_count()檢視 initializer:是每個工作程序啟動時要執行的可呼叫物件,預設為 None initargs:是要傳給initializer的引數組

3.方法介紹

?
1 2 3 4 5 6 7 8 9 10 11 12 13 p. apply (func [, args [, kwargs]]):在一個池工作程序中執行 func( * args, * * kwargs),然後返回結果。 需要強調的是:此操作並不會在所有池工作程序中並執行func函式。 如果要通過不同引數併發地執行func函式,必須從不同執行緒呼叫p. apply () 函式或者使用p.apply_async()     p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func( * args, * * kwargs),然後返回結果。此方法的結果是AsyncResult類的例項, callback是可呼叫物件,接收輸入引數。當func的結果變為可用時, 將理解傳遞給callback。callback禁止執行任何阻塞操作, 否則將接收其他非同步操作中的結果。      p.close():關閉程序池,防止進一步操作。禁止往程序池內在新增任務(需要注意的是一定要寫在close()的上方)
?
1 P.jion():等待所有工作程序退出。此方法只能在close()或teminate()之後呼叫

應用1:

複製程式碼
 1 from multiprocessing import Pool
 2 import os,time
 3 def task(n):
 4     print('[%s] is running'%os.getpid())
 5     time.sleep(2)
 6     print('[%s] is done'%os.getpid()) 7 return n**2 8 if __name__ == '__main__': 9 # print(os.cpu_count()) #檢視cpu個數 10 p = Pool(4) #最大四個程序 11 for i in range(1,7):#開7個任務 12 res = p.apply(task,args=(i,)) #同步的,等著一個執行完才執行另一個 13 print('本次任務的結束:%s'%res) 14 p.close()#禁止往程序池內在新增任務 15 p.join() #在等程序池 16 print('主')
複製程式碼

 

複製程式碼
 1 # ----------------
 2 # 那麼我們為什麼要用程序池呢?這是因為程序池使用來控制程序數目的,
 3 # 我們需要幾個就開幾個程序。如果不用程序池實現併發的話,會開很多的程序
 4 # 如果你開的程序特別多,那麼你的機器就會很卡,所以我們把程序控制好,用幾個就
 5 # 開幾個,也不會太佔用記憶體
 6 from multiprocessing import Pool
 7 import os,time
 8 def walk(n):
 9     print('task[%s] running...'%os.getpid())
10     time.sleep(3)
11     return n**2
12 if __name__ == '__main__': 13 p = Pool(4) 14 res_obj_l = [] 15 for i in range(10): 16 res = p.apply_async(walk,args=(i,)) 17 # print(res) #打印出來的是物件 18 res_obj_l.append(res) #那麼現在拿到的是一個列表,怎麼得到值呢?我們用個.get方法 19 p.close() #禁止往程序池裡新增任務 20  p.join() 21 # print(res_obj_l) 22 print([obj.get() for obj in res_obj_l]) #這樣就得到了
複製程式碼

 

 

 

那麼什麼是同步,什麼是非同步呢?

同步就是指一個程序在執行某個請求的時候,若該請求需要一段時間才能返回資訊,那麼這個程序將會一直等待下去,直到收到返回資訊才繼續執行下去

非同步是指程序不需要一直等下去,而是繼續執行下面的操作,不管其他程序的狀態。當有訊息返回時系統會通知程序進行處理,這樣可以提高執行的效率。

什麼是序列,什麼是並行呢?

舉例:能並排開幾輛車的就可以說是“並行”,只能一輛一輛開的就屬於“序列”了。很明顯,並行的速度要比序列的快得多。(並行互不影響,序列的等著一個完了才能接著另一個)

應用2:

使用程序池維護固定數目的程序(以前客戶端和服務端的改進)

複製程式碼
 1 from socket import *
 2 from multiprocessing import Pool
 3 s = socket(AF_INET,SOCK_STREAM)
 4 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #埠重用
 5 s.bind(('127.0.0.1',8081))
 6 s.listen(5)
 7 print('start running...')
 8 def talk(coon,addr): 9 while True: # 通訊迴圈 10 try: 11 cmd = coon.recv(1024) 12 print(cmd.decode('utf-8')) 13 if not cmd: break 14  coon.send(cmd.upper()) 15 print('傳送的是%s'%cmd.upper().decode('utf-8')) 16 except Exception: 17 break 18  coon.close() 19 if __name__ == '__main__': 20 p = Pool(4) 21 while True:#連結迴圈 22 coon,addr = s.accept() 23 print(coon,addr) 24 p.apply_async(talk,args=(coon,addr)) 25  s.close() 26 #因為是迴圈,所以就不用p.join了
複製程式碼 複製程式碼
 1 from socket import *
 2 c = socket(AF_INET,SOCK_STREAM)
 3 c.connect(('127.0.0.1',8081))
 4 while True:
 5     cmd = input('>>:').strip()
 6     if not cmd:continue
 7     c.send(cmd.encode('utf-8'))
 8     data = c.recv(1024) 9 print('接受的是%s'%data.decode('utf-8')) 10 c.close()
複製程式碼

三、回撥函式

?
1 2 3 4 5 6 7 回撥函式什麼時候用?(回撥函式在爬蟲中最常用) 造資料的非常耗時 處理資料的時候不耗時     你下載的地址如果完成了,就自動提醒讓主程序解析 誰要是好了就通知解析函式去解析(回撥函式的強大之處)

需要回調函式的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則呼叫一個函式去處理該結果,該函式即回撥函式

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回撥函式(主程序負責執行),這樣主程序在執行回撥函式時就省去了I/O的過程,直接拿到的是任務的結果。

複製程式碼
 1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 import time
 5 def get_page(url):
 6     print('<%s> is getting [%s]' %(os.getpid(),url)) 7 response = requests.get(url) #得到地址 8 time.sleep(2) 9 print('<%s> is done [%s]'%(os.getpid(),url)) 10 return {'url':url,'text':response.text} 11 def parse_page(res): 12 '''解析函式''' 13 print('<%s> parse [%s]'%(os.getpid(),res['url'])) 14 with open('db.txt','a') as f: 15 parse_res = 'url:%s size:%s\n' %(res['url'],len(res['text'])) 16  f.write(parse_res) 17 if __name__ == '__main__': 18 p = Pool(4) 19 urls = [ 20 'https://www.baidu.com', 21 'http://www.openstack.org', 22 'https://www.python.org', 23 'https://help.github.com/', 24 'http://www.sina.com.cn/' 25  ] 26 for url in urls: 27 obj = p.apply_async(get_page,args=(url,),callback=parse_page) 28  p.close() 29  p.join() 30 print('主',os.getpid()) #都不用.get()方法了
複製程式碼

如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回撥函式

複製程式碼
 1 from  multiprocessing import Pool
 2 import requests
 3 import os
 4 def get_page(url):
 5     print('<%os> get [%s]' %(os.getpid(),url))
 6     response = requests.get(url)  #得到地址  response響應
 7     return {'url':url,'text':response.text} 8 if __name__ == '__main__': 9 p = Pool(4) 10 urls = [ 11 'https://www.baidu.com', 12 'http://www.openstack.org', 13 'https://www.python.org', 14 'https://help.github.com/', 15 'http://www.sina.com.cn/' 16  ] 17 obj_l= [] 18 for url in urls: 19 obj = p.apply_async(get_page,args=(url,)) 20  obj_l.append(obj) 21  p.close() 22  p.join() 23 print([obj.get() for obj in obj_l])
複製程式碼