1. 程式人生 > >python併發程式設計之多程序、多執行緒、非同步和協程

python併發程式設計之多程序、多執行緒、非同步和協程

一、多執行緒

  多執行緒就是允許一個程序記憶體在多個控制權,以便讓多個函式同時處於啟用狀態,從而讓多個函式的操作同時執行。即使是單CPU的計算機,也可以通過不停地在不同執行緒的指令間切換,從而造成多執行緒同時執行的效果。

  多執行緒相當於一個併發(concunrrency)系統。併發系統一般同時執行多個任務。如果多個任務可以共享資源,特別是同時寫入某個變數的時候,就需要解決同步的問題,比如多執行緒火車售票系統:兩個指令,一個指令檢查票是否賣完,另一個指令,多個視窗同時賣票,可能出現賣出不存在的票。

  在併發情況下,指令執行的先後順序由核心決定。同一個執行緒內部,指令按照先後順序執行,但不同執行緒之間的指令很難說清除哪一個會先執行。因此要考慮多執行緒同步的問題。同步(synchronization)是指在一定的時間內只允許某一個執行緒訪問某個資源。 

1、thread模組

2、threading模組

threading.Thread 建立一個執行緒。

給判斷是否有餘票和賣票,加上互斥鎖,這樣就不會造成一個執行緒剛判斷沒有餘票,而另外一個執行緒就執行賣票操作。

複製程式碼
#! /usr/bin/python
#-* coding: utf-8 -*
# __author__ ="tyomcat"
import threading
import time
import os

def booth(tid):
    global i
    global lock
    while True:
        lock.acquire()
        
if i!=0: i=i-1 print "視窗:",tid,",剩餘票數:",i time.sleep(1) else: print "Thread_id",tid,"No more tickets" os._exit(0) lock.release() time.sleep(1) i = 100 lock=threading.Lock() for k in range(10): new_thread = threading.Thread(target=booth,args=(k,)) new_thread.start()
複製程式碼

二、協程(又稱微執行緒,纖程)

  協程,與執行緒的搶佔式排程不同,它是協作式排程。協程也是單執行緒,但是它能讓原來要使用非同步+回撥方式寫的非人類程式碼,可以用看似同步的方式寫出來。

1、協程在python中可以由生成器(generator)來實現。

首先要對生成器和yield有一個紮實的理解.

  呼叫一個普通的python函式,一般是從函式的第一行程式碼開始執行,結束於return語句、異常或者函式執行(也可以認為是隱式地返回了None)。

一旦函式將控制權交還給呼叫者,就意味著全部結束。而有時可以建立能產生一個序列的函式,來“儲存自己的工作”,這就是生成器(使用了yield關鍵字的函式)。

能夠“產生一個序列”是因為函式並沒有像通常意義那樣返回。return隱含的意思是函式正將執行程式碼的控制權返回給函式被呼叫的地方。而"yield"的隱含意思是控制權的轉移是臨時和自願的,我們的函式將來還會收回控制權。

看一下生產者/消費者的例子:

複製程式碼
#! /usr/bin/python
#-* coding: utf-8 -*
# __author__ ="tyomcat"
import time
import sys
# 生產者
def produce(l):
    i=0
    while 1:
        if i < 10:
            l.append(i)
            yield i
            i=i+1
            time.sleep(1)
        else:
            return     
# 消費者
def consume(l):
    p = produce(l)
    while 1:
        try:
            p.next()
            while len(l) > 0:
                print l.pop()
        except StopIteration:
            sys.exit(0)
if __name__ == "__main__":
    l = []
    consume(l)
複製程式碼

當程式執行到produce的yield i時,返回了一個generator並暫停執行,當我們在custom中呼叫p.next(),程式又返回到produce的yield i 繼續執行,這樣 l 中又append了元素,然後我們print l.pop(),直到p.next()引發了StopIteration異常。

2、Stackless Python

3、greenlet模組

  基於greenlet的實現則效能僅次於Stackless Python,大致比Stackless Python慢一倍,比其他方案快接近一個數量級。其實greenlet不是一種真正的併發機制,而是在同一執行緒內,在不同函式的執行程式碼塊之間切換,實施“你執行一會、我執行一會”,並且在進行切換時必須指定何時切換以及切換到哪。

4、eventlet模組

三、多程序

1、子程序(subprocess包)

  在python中,通過subprocess包,fork一個子程序,並執行外部程式。

  呼叫系統的命令的時候,最先考慮的os模組。用os.system()和os.popen()來進行操作。但是這兩個命令過於簡單,不能完成一些複雜的操作,如給執行的命令提供輸入或者讀取命令的輸出,判斷該命令的執行狀態,管理多個命令的並行等等。這時subprocess中的Popen命令就能有效的完成我們需要的操作

>>>import subprocess
>>>command_line=raw_input()
ping -c 10 www.baidu.com
>>>args=shlex.split(command_line)
>>>p=subprocess.Popen(args)

  利用subprocess.PIPE將多個子程序的輸入和輸出連線在一起,構成管道(pipe):

import subprocess
child1 = subprocess.Popen(["ls","-l"], stdout=subprocess.PIPE)
child2 = subprocess.Popen(["wc"], stdin=child1.stdout,stdout=subprocess.PIPE)
out = child2.communicate()
print(out)

communicate() 方法從stdout和stderr中讀出資料,並輸入到stdin中。

2、多程序(multiprocessing包)

  (1)、multiprocessing包是Python中的多程序管理包。與threading.Thread類似,它可以利用multiprocessing.Process物件來建立一個程序。

  程序池 (Process Pool)可以建立多個程序。

  apply_async(func,args)  從程序池中取出一個程序執行func,args為func的引數。它將返回一個AsyncResult的物件,你可以對該物件呼叫get()方法以獲得結果。

  close()  程序池不再建立新的程序

  join()   wait程序池中的全部程序。必須對Pool先呼叫close()方法才能join。

複製程式碼
#! /usr/bin/env python
# -*- coding:utf-8   -*-
# __author__ == "tyomcat"
# "我的電腦有4個cpu"

from multiprocessing import Pool
import os, time

def long_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(4):
        p.apply_async(long_time_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'
複製程式碼

(2)、多程序共享資源


通過共享記憶體和Manager物件:用一個程序作為伺服器,建立Manager來真正存放資源。

其它的程序可以通過引數傳遞或者根據地址來訪問Manager,建立連線後,操作伺服器上的資源
複製程式碼
#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ == "tyomcat"

from multiprocessing import Queue,Pool
import multiprocessing,time,random

def write(q):

for value in ['A','B','C','D']:
print "Put %s to Queue!" % value
q.put(value)
time.sleep(random.random())


def read(q,lock):
while True:
lock.acquire()
if not q.empty():
value=q.get(True)
print "Get %s from Queue" % value
time.sleep(random.random())
else:
break
lock.release()

if __name__ == "__main__":
manager=multiprocessing.Manager()
q=manager.Queue()
p=Pool()
lock=manager.Lock()
pw=p.apply_async(write,args=(q,))
pr=p.apply_async(read,args=(q,lock))
p.close()
p.join()
print
print "所有資料都寫入並且讀完"
複製程式碼

四、非同步

  無論是執行緒還是程序,使用的都是同步進位制,當發生阻塞時,效能會大幅度降低,無法充分利用CPU潛力,浪費硬體投資,更重要造成軟體模組的鐵板化,緊耦合,無法切割,不利於日後擴充套件和變化。

  不管是程序還是執行緒,每次阻塞、切換都需要陷入系統呼叫(system call),先讓CPU跑作業系統的排程程式,然後再由排程程式決定該跑哪一個程序(執行緒)。多個執行緒之間在一些訪問互斥的程式碼時還需要加上鎖,

  現下流行的非同步server都是基於事件驅動的(如nginx)。

  非同步事件驅動模型中,把會導致阻塞的操作轉化為一個非同步操作,主執行緒負責發起這個非同步操作,並處理這個非同步操作的結果。由於所有阻塞的操作都轉化為非同步操作,理論上主執行緒的大部分時間都是在處理實際的計算任務,少了多執行緒的排程時間,所以這種模型的效能通常會比較好。