1. 程式人生 > >Python開發基礎--- 進程間通信、進程池、協程

Python開發基礎--- 進程間通信、進程池、協程

start seq n+1 解釋 producer 其他 top 同時 pipe

進程間通信

進程彼此之間互相隔離,要實現進程間通信(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的。

進程隊列queue

不同於線程queue,進程queue的生成是用multiprocessing模塊生成的。

在生成子進程的時候,會將代碼拷貝到子進程中執行一遍,及子進程擁有和主進程內容一樣的不同的名稱空間。

示例1:

技術分享
 1 import multiprocessing
 2 def foo():
 3     q.put([11,‘hello‘,True])
 4     print(q.qsize())
 5 
 6 q=multiprocessing.Queue() #全局定義一個q進程隊列,在產生子進程時候會在子進程裏生成,可以指定最大數,限制隊列長度
 7 if __name__ == ‘__main__‘:
 8     p=multiprocessing.Process(target=foo,args=()) #因為名稱空間不同,子進程的主線程創建的q隊列,主進程get不到,所以會阻塞住
 9     p.start()
10     # foo() #主進程執行一下函數就可以訪問到了
11     print(q.get())
技術分享

示例2:

技術分享
 1 import multiprocessing
 2 
 3 def foo():
 4     q.put([11,‘hello‘,True])
 5     print(q.qsize())
 6 
 7 if __name__ == ‘__main__‘:
 8     q = multiprocessing.Queue() #主進程創建一個q進程隊列
 9     p=multiprocessing.Process(target=foo,args=()) #因為名稱空間不同,子進程的主線程找不到q隊列,所以會報錯提示沒有q
10     p.start()
11     print(q.get())
技術分享

示例3:

技術分享
 1 import multiprocessing
 2 
 3 def foo(argument):      #定義函數處理進程隊列
 4     argument.put([11,‘hello‘,True])
 5     print(argument.qsize())
 6 q = multiprocessing.Queue() #全局定義一個進程隊列
 7 print(‘test‘)
 8 
 9 if __name__ == ‘__main__‘:
10     x = multiprocessing.Queue()   #主進程定義一個進程隊列
11     p=multiprocessing.Process(target=foo,args=(x,))     #主進程把值傳給子進程就可以處理了
12     p.start()
13     print(x.get())
14     # foo(q)
15     # print(q.get())
技術分享

常用方法

技術分享
q.put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩余的空間。如果超時,會拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即拋出Queue.Full異常。
q.get方法可以從隊列讀取並且刪除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(默認值),並且timeout為正值,那麽在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即返回該值,否則,如果隊列為空,則立即拋出Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調用此方法時q為空則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中又加入了項目。
q.full():調用此方法時q已滿則返回True,該結果不可靠,比如在返回True的過程中,如果隊列中的項目被取走。
q.qsize():返回隊列中目前項目的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
技術分享

其他方法

技術分享
q.cancel_join_thread():不會在進程退出時自動連接後臺線程。可以防止join_thread()方法阻塞
q.close():關閉隊列,防止隊列中加入更多數據。調用此方法,後臺線程將繼續寫入那些已經入隊列但尚未寫入的數據,但將在此方法完成時馬上關閉。如果q被垃圾收集,將調用此方法。關閉隊列不會在隊列使用者中產生任何類型的數據結束信號或異常。例如,如果某個使用者正在被阻塞在get()操作上,關閉生產者中的隊列不會導致get()方法返回錯誤。
q.join_thread():連接隊列的後臺線程。此方法用於在調用q.close()方法之後,等待所有隊列項被消耗。默認情況下,此方法由不是q的原始創建者的所有進程調用。調用q.cancel_join_thread方法可以禁止這種行為
技術分享

另一個創建進程隊列的類

http://www.cnblogs.com/zero527/p/7211909.html

管道pipe

管道就是管道,就像生活中的管道,兩頭都能進能出

默認管道是全雙工的,如果創建管道的時候映射成False,左邊只能用於接收,右邊只能用於發送,類似於單行道

最簡單的管道雙向通信示例:

技術分享
 1 import multiprocessing
 2 
 3 def foo(sk):
 4     sk.send(‘hello world‘)
 5     print(sk.recv())
 6 
 7 if __name__ == ‘__main__‘:
 8     conn1,conn2=multiprocessing.Pipe()    #開辟兩個口,都是能進能出,括號中如果False即單向通信
 9     p=multiprocessing.Process(target=foo,args=(conn1,))  #子進程使用sock口,調用foo函數
10     p.start()
11     print(conn2.recv())  #主進程使用conn口接收
12     conn2.send(‘hi son‘) #主進程使用conn口發送
技術分享

常用方法

conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接的另外一端已經關閉,那麽recv方法會拋出EOFError。
conn1.send(obj):通過連接發送對象。obj是與序列化兼容的任意對象
註意:send()和recv()方法使用pickle模塊對對象進行序列化

其他方法

技術分享
conn1.close():關閉連接。如果conn1被垃圾回收,將自動調用此方法,不用的時候兩邊都要close

conn1.fileno():返回連接使用的整數文件描述符

conn1.poll([timeout]):如果連接上的數據可用,返回True。timeout指定等待的最長時限。如果省略此參數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待數據到達。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。如果進入的消息,超過了這個最大值,將引發IOError異常,並且在連接上無法進行進一步讀取。如果連接的另外一端已經關閉,再也不存在任何數據,將引發EOFError異常。

conn.send_bytes(buffer [, offset [, size]]):通過連接發送字節數據緩沖區,buffer是支持緩沖區接口的任意對象,offset是緩沖區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,然後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的字節位移。返回值是收到的字節數。如果消息長度大於可用的緩沖區空間,將引發BufferTooShort異常。
技術分享

註意:生產者和消費者都沒有使用管道的某個端點,就應該將其關閉,如在生產者中關閉管道的右端,在消費者中關閉管道的左端。如果忘記執行這些步驟,程序可能再消費者中的recv()操作上掛起。管道是由操作系統進行引用計數的,必須在所有進程中關閉管道後才能生產EOFError異常。因此在生產者中關閉管道不會有任何效果,付費消費者中也關閉了相同的管道端點。

 1 from multiprocessing import Process,Pipe
 2 
 3 import time,os
 4 def consumer(p,name):
 5     left,right=p
 6     left.close()
 7     while True:
 8         try:
 9             baozi=right.recv()
10             print(‘%s 收到包子:%s‘ %(name,baozi))
11         except EOFError:
12             right.close()
13             break
14 def producer(seq,p):
15     left,right=p
16     right.close()
17     for i in seq:
18         left.send(i)
19         # time.sleep(1)
20     else:
21         left.close()
22 if __name__ == ‘__main__‘:
23     left,right=Pipe()
24     c1=Process(target=consumer,args=((left,right),‘c1‘))
25     c1.start()
26     seq=(i for i in range(10))
27     producer(seq,(left,right))
28     right.close()
29     left.close()
30     c1.join()
31     print(‘主進程‘)

技術分享
 1 from multiprocessing import Process,Pipe
 2 
 3 import time,os
 4 def consumer(p,name):
 5     left,right=p
 6     left.close()
 7     while True:
 8         try:
 9             baozi=right.recv()
10             print(‘%s 收到包子:%s‘ %(name,baozi))
11         except EOFError:
12             right.close()
13             break
14 def producer(seq,p):
15     left,right=p
16     right.close()
17     for i in seq:
18         left.send(i)
19         # time.sleep(1)
20     else:
21         left.close()
22 if __name__ == ‘__main__‘:
23     left,right=Pipe()
24     c1=Process(target=consumer,args=((left,right),‘c1‘))
25     c1.start()
26     seq=(i for i in range(10))
27     producer(seq,(left,right))
28     right.close()
29     left.close()
30     c1.join()
31     print(‘主進程‘)
技術分享

共享數據manage

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據

註:進程間通信應該盡量避免使用共享數據的方式

共享數據:列表

技術分享
 1 from multiprocessing import Manager,Process
 2 def foo(l,i):
 3     l.append(i**i)
 4 if __name__ == ‘__main__‘:
 5     man=Manager()
 6     ml=man.list([11,22,33])
 7     l=[]
 8     for i in range(5):
 9         p=Process(target=foo,args=(ml,i))
10         p.start()
11         l.append(p)
12     for i in l: #必須要join,不然會執行報錯,處理一個數據必須要一個個來,不能同時處理一個數據
13         i.join()
14     print(ml)
技術分享

共享數據:字典

技術分享
 1 from multiprocessing import Manager,Process
 2 def foo(d,k,v):
 3     d[k]=v
 4 if __name__ == ‘__main__‘:
 5     man=Manager()
 6     md=man.dict({‘name‘:‘bob‘})
 7     l=[]
 8     for i in range(5):
 9         p=Process(target=foo,args=(md,i,‘a‘))
10         p.start()
11         l.append(p)
12     for i in l: #必須要join,不然會執行報錯,處理一個數據必須要一個個來,不能同時處理一個數據
13         i.join()
14     print(md)
技術分享

進程池

開多進程是為了並發,通常有幾個cpu核心就開幾個進程,但是進程開多了會影響效率,主要體現在切換的開銷,所以引入進程池限制進程的數量。

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那麽程序就會等待,直到進程池中有可用進程為止。

示例:

技術分享
 1 from multiprocessing import Pool
 2 import time
 3 
 4 def foo(n):
 5     print(n)
 6     time.sleep(1)
 7 
 8 if __name__ == ‘__main__‘:
 9     pool_obj=Pool(5)    #
10     for i in range(47):
11         # pool_obj.apply_async(func=foo,args=(i,))
12         pool_obj.apply(func=foo,args=(i,))    #子進程的生成是靠進程池對象維護的
13         # apply同步,子進程一個個執行
14         # apply_async異步,多個子進程一起執行
15     pool_obj.close()
16     pool_obj.join()
17     print(‘ending‘)
技術分享

常用方法:

技術分享
pool_obj.apply(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。需要強調的是:此操作並不會在所有池工作進程中並執行func函數。如果要通過不同參數並發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()
pool_obj.apply_async(func [, args [, kwargs]]):在一個池工作進程中執行func(*args,**kwargs),然後返回結果。此方法的結果是AsyncResult類的實例,callback是可調用對象,接收輸入參數。當func的結果變為可用時,將理解傳遞給callback。callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果。
pool_obj.close():關閉進程池,防止進一步操作。如果所有操作持續掛起,它們將在工作進程終止前完成
pool_obj.jion():等待所有工作進程退出。此方法只能在close()或teminate()之後調用
技術分享

其他方法:

技術分享
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具有以下方法
obj.get():返回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間內還沒有到達,將引發一場。如果遠程操作中引發了異常,它將在調用此方法時再次被引發。
obj.ready():如果調用完成,返回True
obj.successful():如果調用完成且沒有引發異常,返回True,如果在結果就緒之前調用此方法,引發異常
obj.wait([timeout]):等待結果變為可用。
obj.terminate():立即終止所有工作進程,同時不執行任何清理或結束任何掛起工作。如果p被垃圾回收,將自動調用此函數
技術分享

協程

協程:是單線程下的並發,又稱微線程,纖程。英文名Coroutine。

一句話說明什麽是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序自己控制調度的。

協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

註意:

  1. python的線程屬於內核級別的,即由操作系統控制調度(如單線程一旦遇到io就被迫交出cpu執行權限,切換其他線程運行)

  2. 單線程內開啟協程,一旦遇到io,從應用程序級別(而非操作系統)控制切換

協程優點:

  1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級

  2. 單線程內就可以實現並發的效果,最大限度地利用cpu

協程缺點:

  1.協程的本質是單線程下,無法利用多核,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程

  2.協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

yield實現協程並發

技術分享
 1 import time
 2 def consumer():
 3     r=‘‘
 4     while True:
 5         n=yield r
 6         if not n:
 7             return
 8         print(‘[CONSUMER] ←← Consuming %s...‘ % n)
 9         time.sleep(1)
10         r=‘200 Ok‘
11 
12 def produce(c):
13     next(c) #1.啟動生成器
14     n=0
15     while n < 5:
16         n=n+1
17         print(‘[PRODUCER] →→ Producing %s...‘ % n)
18         cr=c.send(n)    
19         #2.將n傳入到consumer的對象,yield接收到傳入值開始執行代碼,遇到yield執行代碼返回r的值
20         print(‘[PRODUCER] Consumer return: %s‘ % cr)
21     #3.produce沒有值了,關閉整個過程
22     c.close()
23 
24 if __name__ == ‘__main__‘:
25     c=consumer()    #生成生成器對象
26     produce(c)      #執行調用
技術分享

greenlet框架實現協程(封裝yield的基礎庫)

greenlet機制的主要思想是:生成器函數或者協程函數中的yield語句掛起函數的執行,直到稍後使用next()或send()操作進行恢復為止。可以使用一個調度器循環在一組生成器函數之間協作多個任務。greentlet是python中實現我們所謂的"Coroutine(協程)"的一個基礎庫。

示例1:

技術分享
 1 from greenlet import  greenlet
 2 def foo():
 3     print(‘ok1‘)
 4     g2.switch() #阻斷
 5     print(‘ok3‘)
 6     g2.switch()
 7 def bar():
 8     print(‘ok2‘)
 9     g1.switch()
10     print(‘ok4‘)
11 
12 g1=greenlet(foo)    #生成foo函數的greenlet對象
13 g2=greenlet(bar)    #生成bar函數的greenlet對象
14 g1.switch() #1、執行g1對象,打印ok1
15             #2、遇到g2.switch(),轉到g2執行打印ok2
16             #3、遇到g1.switch(),轉到g1的阻斷處繼續執行打印ok3
17             #4、遇到g2.switch(),轉到g2執行打印ok4
技術分享

示例2:

技術分享
 1 def eat(name):
 2     print(‘%s eat food 1‘ %name)
 3     gr2.switch(‘bob‘)
 4     print(‘%s eat food 2‘ %name)
 5     gr2.switch()
 6 def play_phone(name):
 7     print(‘%s play 1‘ %name)
 8     gr1.switch()
 9     print(‘%s play 2‘ %name)
10 
11 gr1=greenlet(eat)
12 gr2=greenlet(play_phone)
13 gr1.switch(name=‘natasha‘)#可以在第一次switch時傳入參數,以後都不需要
技術分享

這種方法不會節省時間,因為不是io操作,而greenlet遇到io操作不會跳轉,仍然要io阻斷

基於greenlet框架的高級庫gevent模塊

gevent是第三方庫,通過greenlet實現協程,其基本思想是:

當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標準庫,這一過程在啟動時通過monkey patch完成:

簡單示例:

技術分享
 1 import gevent
 2 def foo():
 3     print(‘ok1‘)
 4     gevent.sleep(4) #模擬io操作
 5     print(‘ok3‘)
 6 def bar():
 7     print(‘ok2‘)
 8     gevent.sleep(2)
 9     print(‘ok4‘)
10 
11 g1=gevent.spawn(foo)
12 g2=gevent.spawn(bar)
13 gevent.joinall([g1,g2]) #全部阻塞,或者單獨一個個join
技術分享

spawn括號內第一個參數是函數名,如foo,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數foo的

註意:

gevent.sleep(4)模擬的是gevent可以識別的io阻塞,

而time.sleep(2)或其他的阻塞,gevent是不能直接識別的需要用下面一行代碼,打補丁,就可以識別了

1 #補丁
2 from gevent import monkey
3 monkey.patch_all()

必須放到被打補丁者的前面,如time,socket模塊之前

或者我們幹脆記憶成:要用gevent,需要將補丁放到文件的開頭

爬蟲示例:

技術分享
 1 from gevent import monkey;monkey.patch_all()
 2 import gevent
 3 import requests
 4 import time
 5 
 6 def get_page(url):
 7     print(‘GET: %s‘ %url)
 8     response=requests.get(url)
 9     if response.status_code == 200:
10         print(‘%d bytes received from %s‘ %(len(response.text),url))
11 
12 
13 start_time=time.time()
14 gevent.joinall([
15     gevent.spawn(get_page,‘https://www.python.org/‘),
16     gevent.spawn(get_page,‘https://www.yahoo.com/‘),
17     gevent.spawn(get_page,‘https://github.com/‘),
18 ])
19 stop_time=time.time()
20 print(‘run time is %s‘ %(stop_time-start_time))
技術分享

技術分享
gevent是一個基於協程(coroutine)的Python網絡函數庫,通過使用greenlet提供了一個在libev事件循環頂部的高級別並發API。

主要特性有以下幾點:

<1> 基於libev的快速事件循環,Linux上面的是epoll機制

<2> 基於greenlet的輕量級執行單元

<3> API復用了Python標準庫裏的內容

<4> 支持SSL的協作式sockets

<5> 可通過線程池或c-ares實現DNS查詢

<6> 通過monkey patching功能來使得第三方模塊變成協作式

gevent.spawn()方法spawn一些jobs,然後通過gevent.joinall將jobs加入到微線程執行隊列中等待其完成,設置超時為2秒。執行後的結果通過檢查gevent.Greenlet.value值來收集。


===========================二
1、關於Linux的epoll機制:

epoll是Linux內核為處理大批量文件描述符而作了改進的poll,是Linux下多路復用IO接口select/poll的
增強版本,它能顯著提高程序在大量並發連接中只有少量活躍的情況下的系統CPU利用率。epoll的優點:

(1)支持一個進程打開大數目的socket描述符。select的一個進程所打開的FD由FD_SETSIZE的設置來限定,而epoll沒有這個限制,它所支持的FD上限是
最大可打開文件的數目,遠大於2048。

(2)IO效率不隨FD數目增加而線性下降:由於epoll只會對“活躍”的socket進行操作,於是,只有”活躍”的socket才會主動去調用 callback函數,其他
idle狀態的socket則不會。

(3)使用mmap加速內核與用戶空間的消息傳遞。epoll是通過內核於用戶空間mmap同一塊內存實現的。

(4)內核微調。

2、libev機制

提供了指定文件描述符事件發生時調用回調函數的機制。libev是一個事件循環器:向libev註冊感興趣的事件,比如socket可讀事件,libev會對所註冊的事件
的源進行管理,並在事件發生時觸發相應的程序。

===========================三


‘’‘

import gevent

            from gevent import socket

            urls = [‘www.google.com.hk’,’www.example.com’, ‘www.python.org’ ]

            jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]

            gevent.joinall(jobs, timeout=2)

            [job.value for job in jobs]


[‘74.125.128.199’, ‘208.77.188.166’, ‘82.94.164.162’]

            ’‘’

gevent.spawn()方法spawn一些jobs,然後通過gevent.joinall將jobs加入到微線程執行隊列中等待其完成,設置超時為2秒。執行後的結果通過檢查gevent.Greenlet.value值來收集。gevent.socket.gethostbyname()函數與標準的socket.gethotbyname()有相同的接口,但它不會阻塞整個解釋器,因此會使得其他的greenlets跟隨著無阻的請求而執行。

Monket patching

Python的運行環境允許我們在運行時修改大部分的對象,包括模塊、類甚至函數。雖然這樣做會產生“隱式的副作用”,而且出現問題很難調試,但在需要修改Python本身的基礎行為時,Monkey patching就派上用場了。Monkey patching能夠使得gevent修改標準庫裏面大部分的阻塞式系統調用,包括socket,ssl,threading和select等模塊,而變成協作式運行。



from gevent import monkey ;

monkey . patch_socket ()

import urllib2



通過monkey.patch_socket()方法,urllib2模塊可以使用在多微線程環境,達到與gevent共同工作的目的。

事件循環

不像其他網絡庫,gevent和eventlet類似, 在一個greenlet中隱式開始事件循環。沒有必須調用run()或dispatch()的反應器(reactor),在twisted中是有 reactor的。當gevent的API函數想阻塞時,它獲得Hub實例(執行時間循環的greenlet),並切換過去。如果沒有集線器實例則會動態 創建。

libev提供的事件循環默認使用系統最快輪詢機制,設置LIBEV_FLAGS環境變量可指定輪詢機制。LIBEV_FLAGS=1為select, LIBEV_FLAGS = 2為poll, LIBEV_FLAGS = 4為epoll,LIBEV_FLAGS = 8為kqueue。

Libev的API位於gevent.core下。註意libev API的回調在Hub的greenlet運行,因此使用同步greenlet的API。可以使用spawn()和Event.set()等異步API。
技術分享

Python開發基礎--- 進程間通信、進程池、協程