1. 程式人生 > >python -- 異步編程

python -- 異步編程

協程 復雜 patch odi block wait 多核 del 恢復

我們在生產中,常用的處理任務模型有三種:
  單線程
  多線程
  異步(單線程內,串行,特點是遇到阻塞(或IO之類的)就切換到其他任務)

其中一般如果都符合要求,那麽異步是最好的選擇。
  單線程:遇到阻塞整個程序都等待
  多線程:以空間換取時間,且有時候伴隨著數據安全問題(通常加鎖來處理)
  異步:在單個線程內,且是串行執行,但是一旦遇到阻塞(IO之類的),就會切換到線程內的其他任務(把IO操作交給操作系統處理)

當我們面對如下的環境時,事件驅動模型(異步模型)通常是一個好的選擇(and):
  1、程序中有許多任務
  2、任務之間高度獨立(因此它們不需要互相通信,或者等待彼此)
  3、在等待事件到來時,某些任務會阻塞。

常用的異步IO模型:select、poll、Epoll (windows下只支持select)
nginx就是Epoll模型實現的。單線程,多進程(為了利用多核,單線程只能跑在單個cup核心上)

前面我們說了多線程與多進程,總結其特點就是:

  單線程串行執行,遇到IO就阻塞,效率低。

  多線程並發執行,遇到IO就切換(用空間換取執行時間),效率上去了,但是耗費資源,操作復雜。

針對以上問題,出現了一種新的替代品,協程。

協程  

  協程又名微線程,纖程。協程是一種用戶態的輕量級線程(協程由用戶切換,線程由cpu時間片控制切換)協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。跟yield過程差不多


協程的定義標準:
  1、必須在一個單線程裏面實現並發
  2、修改共享數據不需要加鎖(因為協程是串行的)
  3、用戶程序裏自己保存多個控制流的上下文棧
  4、一個協程遇到IO操作(阻塞也一樣)就自動切換到其他協程

協程的好處:
  1、無需線程上下文切換的開銷
  2、無需原子操作鎖定及同步的開銷
       "原子操作(atomic operation)是不需要synchronized",所謂原子操作是指不會被線程調度機制打斷的操作;這種操作一旦開始,就一直運行到結束,中間不會有任何 context switch (切換到另一個線程)。原子操作可以是一個步驟,也可以是多個操作步驟,但是其順序是不可以被打亂,或者切割掉只執行部分。視作整體是原子性的核心。

  3、方便切換控制流,簡化編程模型
  4、高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。

缺點:
  1、無法利用多核資源:協程的本質是個單線程,它不能同時將單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  2、進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

協程例子

  1、用yield實現協程操作的例子:

 1 import time
 2 
 3 def consumer(name):
 4     print("\033[32;1m ---> starting eating baozi... \033[0m")
 5     while True:
 6         new_baozi = yield
 7         print("[%s] is eating baozi %s " %(name,new_baozi))
 8         # time.sleep(1)  #在yield裏面並沒有實現阻塞切換
 9 
10 def producer():
11     c = con1.__next__()
12     c = con2.__next__()
13     n = 0
14     while n < 5:
15         n += 1
16         print("\033[32;1m [producer]\033[0m is making baozi %s " % n)
17         con1.send(n)
18         con2.send(n)     #用send可以想yield發送數據
19 
20 if __name__ == __main__:
21     con1 = consumer(c1)
22     con2 = consumer(c2)
23     p = producer()

  執行結果

 ---> starting eating baozi... 
 ---> starting eating baozi... 
 [producer] is making baozi 1 
[c1] is eating baozi 1 
[c2] is eating baozi 1 
 [producer] is making baozi 2 
[c1] is eating baozi 2 
[c2] is eating baozi 2 
 [producer] is making baozi 3 
[c1] is eating baozi 3 
[c2] is eating baozi 3 
 [producer] is making baozi 4 
[c1] is eating baozi 4 
[c2] is eating baozi 4 
 [producer] is making baozi 5 
[c1] is eating baozi 5 
[c2] is eating baozi 5 

2、使用gevent實現協程

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import gevent
 5 import time
 6 
 7 def func1():
 8     print("\033[32;1m func1函數的第1部分... \033[0m")
 9     gevent.sleep(2)
10     # time.sleep(2)   #用time.sleep(2)是不行的
11     print("\033[32;1m func1函數的第2部分... \033[0m")
12 
13 def func2():
14     print("\033[31;1m func2函數的第1部分... \033[0m")
15     gevent.sleep(1)
16     # time.sleep(1)
17     print("\033[31;1m func2函數的第2部分... \033[0m")
18 
19 def func3():
20     print("\033[34;1m func3函數的第1部分... \033[0m")
21     gevent.sleep(3)
22     # time.sleep(3)
23     print("\033[34;1m func3函數的第2部分... \033[0m")
24 
25 
26 if __name__ == __main__:
27     gevent.joinall([
28         gevent.spawn(func1),
29         gevent.spawn(func2),
30         gevent.spawn(func3),
31     ])

  執行結果

 func1函數的第1部分... 
 func2函數的第1部分... 
 func3函數的第1部分... 
 func2函數的第2部分... 
 func1函數的第2部分... 
 func3函數的第2部分... 

3、協程結合urllib模塊爬網站

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 from gevent import monkey
 5 monkey.patch_all()  #遇到阻塞就切換全靠它(作用:把要用到的接口全部變為非阻塞模式)
 6 
 7 import gevent
 8 from urllib.request import urlopen
 9 
10 def f(url):
11     print("GET: %s " %url)
12     resp = urlopen(url)
13     data = resp.read()
14     print("%s bytes received from %s " %(len(data),url))
15 
16 if __name__ == __main__:
17     gevent.joinall([
18         gevent.spawn(f, http://www.cdu.edu.cn/),
19         gevent.spawn(f,https://www.python.org/),
20         gevent.spawn(f,https://www.jd.com/),
21         gevent.spawn(f,https://www.vip.com/)
22     ])

  執行結果

GET: http://www.cdu.edu.cn/ 
GET: https://www.python.org/ 
GET: https://www.jd.com/ 
GET: https://www.vip.com/ 
137584 bytes received from https://www.jd.com/ 
69500 bytes received from https://www.vip.com/ 
47984 bytes received from http://www.cdu.edu.cn/ 
47695 bytes received from https://www.python.org/ 

Process finished with exit code 0

4、利用協程實現高並發服務器

技術分享
 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import gevent
 5 import socket
 6 from gevent import monkey; monkey.patch_all()
 7 
 8 def server(port):
 9     s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
10     s.bind((0.0.0.0,port))
11     s.listen(5000)
12 
13     while True:
14         # print("\033[32;1m server is waiting... \033[0m")
15         print(" server is waiting... ")
16         conn, addr = s.accept()
17         gevent.spawn(handle_request,conn)
18 
19 def handle_request(conn):
20     try:
21         while True:
22             data = conn.recv(1024)
23             print("recvied:",data.decode(utf8))
24             conn.send(data)
25             if not data:
26                 conn.shutdown(socket.SHUT_WR)
27 
28     except Exception as ex:
29         print(ex)
30 
31     finally:
32         conn.close()
33 
34 
35 if __name__ == __main__:
36     server(9999)
server 技術分享
 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 import socket
 5 import threading
 6 
 7 def run(n):
 8     ‘‘‘這裏是啟動多線程,然後每個線程死循環發包給服務器,測試協程服務器的高並發和穩定性‘‘‘
 9     while True:
10         # msg = input(">>:").strip()
11         # if len(msg) == 0:continue
12         # if msg == ‘q‘:break
13         msg = hello %s %n
14         sk.send(bytes(msg,utf8))
15         data = sk.recv(1024)
16         print("Received:",data.decode(utf8))
17 
18     sk.close()
19 
20 if __name__ == __main__:
21     IP_Port = (127.0.0.1, 9999)
22     sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
23     sk.connect(IP_Port)
24 
25     thread_list = []
26     for i in range(2000):
27         t = threading.Thread(target=run,args=[i,])
28         t.start()
29         thread_list.append(t)
30 
31     for thread in thread_list:
32         thread.join()
client

python -- 異步編程