1. 程式人生 > >網編的訊號量,管道,事件,程序池

網編的訊號量,管道,事件,程序池

管道(from multiprocessing import Process,Pipe):
管道的介紹:
 1 #建立管道的類:
 2 Pipe([duplex]):在程序之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連線物件,強調一點:必須在產生Process物件之前產生管道
 3 #引數介紹:
 4 dumplex:預設管道是全雙工的,如果將duplex射成False,conn1只能用於接收,conn2只能用於傳送。
 5 #主要方法:
 6     conn1.recv():接收conn2.send(obj)傳送的物件。如果沒有訊息可接收,recv方法會一直阻塞。如果連線的另外一端已經關閉,那麼recv方法會丟擲EOFError。
7 conn1.send(obj):通過連線傳送物件。obj是與序列化相容的任意物件 8 #其他方法: 9 conn1.close():關閉連線。如果conn1被垃圾回收,將自動呼叫此方法 10 conn1.fileno():返回連線使用的整數檔案描述符 11 conn1.poll([timeout]):如果連線上的資料可用,返回True。timeout指定等待的最長時限。如果省略此引數,方法將立即返回結果。如果將timeout射成None,操作將無限期地等待資料到達。 12 13 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法傳送的一條完整的位元組訊息。maxlength指定要接收的最大位元組數。如果進入的訊息,超過了這個最大值,將引發IOError異常,並且在連線上無法進行進一步讀取。如果連線的另外一端已經關閉,再也不存在任何資料,將引發EOFError異常。
14 conn.send_bytes(buffer [, offset [, size]]):通過連線傳送位元組資料緩衝區,buffer是支援緩衝區介面的任意物件,offset是緩衝區中的位元組偏移量,而size是要傳送位元組數。結果資料以單條訊息的形式發出,然後呼叫c.recv_bytes()函式進行接收 15 16 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的位元組訊息,並把它儲存在buffer物件中,該物件支援可寫入的緩衝區介面(即bytearray物件或類似的物件)。offset指定緩衝區中放置訊息處的位元組位移。返回值是收到的位元組數。如果訊息長度大於可用的緩衝區空間,將引發BufferTooShort異常。

方法總結:

Conn1,conn2 = Pipe() #建立一個管道物件,全雙工,返回管道的兩端,但是一端傳送的訊息,只能另外一端接收,自己這一端是不能接收的
Conn1.recv() #接收
Conn1.send() #傳送
資料接收一次就沒有了
圖例:

關於管道會造成資料不安全問題的官方解釋:
    The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
    
由Pipe方法返回的兩個連線物件表示管道的兩端。每個連線物件都有send和recv方法(除其他之外)。注意,如果兩個程序(或執行緒)試圖同時從管道的同一端讀取或寫入資料,那麼管道中的資料可能會損壞。當然,在使用管道的不同端部的過程中不存在損壞風險。

 

多個消費者競爭會出現資料不安全的問題的解決方案:加鎖
 1 from multiprocessing import Process,Pipe,Lock
 2 
 3 def consumer(p,name,lock):
 4     produce, consume=p
 5     produce.close()
 6     while True:
 7         lock.acquire()
 8         baozi=consume.recv()
 9         lock.release()
10         if baozi:
11             print('%s 收到包子:%s' %(name,baozi))
12         else:
13             consume.close()
14             break
15 
16 
17 def producer(p,n):
18     produce, consume=p
19     consume.close()
20     for i in range(n):
21         produce.send(i)
22     produce.send(None)
23     produce.send(None)
24     produce.close()
25 
26 if __name__ == '__main__':
27     produce,consume=Pipe()
28     lock = Lock()
29     c1=Process(target=consumer,args=((produce,consume),'c1',lock))
30     c2=Process(target=consumer,args=((produce,consume),'c2',lock))
31     p1=Process(target=producer,args=((produce,consume),10))
32     c1.start()
33     c2.start()
34     p1.start()
35 
36     produce.close()
37     consume.close()
38 
39     c1.join()
40     c2.join()
41     p1.join()
42     print('主程序')

 

事件(from multiprocessing import Process,Event):
  E = Event()  #初識狀態是false
  E.wait() 當事件物件e的狀態為false的時候,在wait的地方會阻塞程式,當物件狀態為true的時候,直接在這個wait地方繼續往下執行
  E.set() 將事件物件的狀態改為true,
  E.is_set() 檢視狀態
  E.clear() 將事件物件的狀態改為false
時間方法的使用:
 1 from multiprocessing import Process,Semaphore,Event
 2 import time,random
 3 
 4 e = Event() #建立一個事件物件
 5 print(e.is_set())  #is_set()檢視一個事件的狀態,預設為False,可通過set方法改為True
 6 print('look here!')
 7 # e.set()          #將is_set()的狀態改為True。
 8 # print(e.is_set())#is_set()檢視一個事件的狀態,預設為False,可通過set方法改為Tr
 9 # e.clear()        #將is_set()的狀態改為False
10 # print(e.is_set())#is_set()檢視一個事件的狀態,預設為False,可通過set方法改為Tr
11 e.wait()           #根據is_set()的狀態結果來決定是否在這阻塞住,is_set()=False那麼就阻塞,is_set()=True就不阻塞
12 print('give me!!')
13 
14 #set和clear  修改事件的狀態 set-->True   clear-->False
15 #is_set     用來檢視一個事件的狀態
16 #wait       依據事件的狀態來決定是否阻塞 False-->阻塞  True-->不阻塞

通過事件來模擬紅綠燈:

 1 from multiprocessing import Process, Event
 2 import time, random
 3 
 4 def car(e, n):
 5     while True:
 6         if not e.is_set():  # 程序剛開啟,is_set()的值是Flase,模擬訊號燈為紅色
 7             print('\033[31m紅燈亮\033[0m,car%s等著' % n)
 8             e.wait()    # 阻塞,等待is_set()的值變成True,模擬訊號燈為綠色
 9             print('\033[32m車%s 看見綠燈亮了\033[0m' % n)
10             time.sleep(random.randint(2,4))
11             if not e.is_set():   #如果is_set()的值是Flase,也就是紅燈,仍然回到while語句開始
12                 continue
13             print('車開遠了,car', n)
14             break
15 
16 # def police_car(e, n):
17 #     while True:
18 #         if not e.is_set():# 程序剛開啟,is_set()的值是Flase,模擬訊號燈為紅色
19 #             print('\033[31m紅燈亮\033[0m,car%s等著' % n)
20 #             e.wait(0.1) # 阻塞,等待設定等待時間,等待0.1s之後沒有等到綠燈就闖紅燈走了
21 #             if not e.is_set():
22 #                 print('\033[33m紅燈,警車先走\033[0m,car %s' % n)
23 #             else:
24 #                 print('\033[33;46m綠燈,警車走\033[0m,car %s' % n)
25 #         break
26 
27 def traffic_lights(e, inverval):
28     while True:
29         time.sleep(inverval)
30         if e.is_set():
31             print('######', e.is_set())
32             e.clear()  # ---->將is_set()的值設定為False
33         else:
34             e.set()    # ---->將is_set()的值設定為True
35             print('***********',e.is_set())
36 
37 
38 if __name__ == '__main__':
39     e = Event()
40     for i in range(10):
41         p=Process(target=car,args=(e,i,))  # 建立10個程序控制10輛車
42         time.sleep(random.random(1, 3))    #車不是一下子全過來
43         p.start()
44 
45     # for i in range(5):
46     #     p = Process(target=police_car, args=(e, i,))  # 建立5個程序控制5輛警車
47     #     p.start()
48 
49     #訊號燈必須是單獨的程序,因為它不管你車開到哪了,我就按照我紅綠燈的規律來閃爍變換,對吧
50     t = Process(target=traffic_lights, args=(e, 5))  # 建立一個程序控制紅綠燈
51     t.start()
52 
53     print('預備~~~~開始!!!')
訊號量(from multiprocessing import Process,Semaphore):
互斥鎖同時只允許一個執行緒更改資料,而訊號量Semaphore是同時允許一定數量的執行緒更改資料 。
假設商場裡有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。這是迪科斯徹(Dijkstra)訊號量概念P()和V()的Python實現。訊號量同步機制適用於訪問像伺服器這樣的有限資源。
訊號量與程序池的概念很像,但是要區分開,訊號量涉及到加鎖的概念
 1 from multiprocessing import Process,Semaphore
 2 import time,random
 3 
 4 def go_ktv(sem,user):
 5     sem.acquire()
 6     print('%s 佔到一間ktv小屋' %user)
 7     time.sleep(random.randint(0,3)) #模擬每個人在ktv中待的時間不同
 8     sem.release()
 9 
10 if __name__ == '__main__':
11     sem=Semaphore(4)
12     p_l=[]
13     for i in range(13):
14         p=Process(target=go_ktv,args=(sem,'user%s' %i,))
15         p.start()
16         p_l.append(p)
17 
18     for i in p_l:
19         i.join()
20     print('============》')
訊號量中的常用方法:
  S = semphore(4),內部維護了一個計數器,acquire-1,release+1,為0的時候,其他的程序都要在acquire之前等待
  S.acquire()
  需要鎖住的程式碼
  S.release()
程序池(from multiprocessing import Process,Pool):

為什麼要有程序池?程序池的概念。

  在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閒時可能只有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?首先,建立程序需要消耗時間,銷燬程序(空間,變數,檔案資訊等等的內容)也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,維護一個很大的程序列表的同時,排程的時候,還需要進行切換並且記錄每個程序的執行節點,也就是記錄上下文(各種變數等等亂七八糟的東西,雖然你看不到,但是作業系統都要做),這樣反而會影響程式的效率。因此我們不能無限制的根據任務開啟或者結束程序。就看我們上面的一些程式碼例子,你會發現有些程式是不是執行的時候比較慢才出結果,就是這個原因,那麼我們要怎麼做呢?

  在這裡,要給大家介紹一個程序池的概念,定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序並不關閉,而是將程序再放回程序池中繼續等待任務。如果有很多工需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閒程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在執行。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實現併發效果

程序池的出現:程序的建立和銷燬是很有消耗的,影響程式碼執行效率

程序池的常用方法:
  Map:非同步提交任務,並且傳參需要可迭代型別的資料,自帶close和join功能
  Res = Apply(f1,args=(i,)) #同步執行任務,必須等任務執行結束才能給程序池提交下一個任務,可以直接拿到返回結果res
  Res_obj = Apply_async(f1,args=(i,)) #非同步提交任務,可以直接拿到結果物件,從結果物件裡面拿結果,要用get方法,get方法會阻塞程式,沒有拿到結果會一直等待   
  Close : 鎖住程序池,防止有其他的新的任務在提交給程序池
  Join : 等待著程序池將自己裡面的任務都執行完
程式碼展示:
 1 import time
 2 from multiprocessing import Process,Pool
 3 
 4 # def f1(n):
 5 #     time.sleep(1)
 6 #     print(n)
 7 
 8 #對比多程序和程序池的效率
 9 def f1(n):
10     for i in range(5):
11         n = n + i
12 if __name__ == '__main__':
13     #統計程序池執行100個任務的時間
14     s_time = time.time()
15     pool = Pool(4)  #裡面這個引數是指定程序池中有多少個程序用的,4表示4個程序,如果不傳引數,預設開啟的程序數一般是cpu的個數
16     # pool.map(f1,[1,2])  #引數資料必須是可迭代的
17     pool.map(f1,range(100))  #引數資料必須是可迭代的,非同步提交任務,自帶join功能
18     e_time = time.time()
19     dif_time = e_time - s_time
20 
21     #統計100個程序,來執行100個任務的執行時間
22     p_s_t = time.time() #多程序起始時間
23     p_list = []
24     for i in range(100):
25         p = Process(target=f1,args=(i,))
26         p.start()
27         p_list.append(p)
28         # p.join()
29     [pp.join() for pp in p_list]
30     p_e_t = time.time()
31     p_dif_t = p_e_t - p_s_t
32     print('程序池的時間:',dif_time)
33     print('多程序的執行時間:',p_dif_t)
34     # 結果:
35     # 程序池的時間: 0.40102291107177734
36     # 多程序的執行時間: 9.247529029846191