1. 程式人生 > >python的程序與執行緒

python的程序與執行緒

 

程序、執行緒的含義?

 

1.什麼是程序?

  程序是指執行中的應用程式,每個程序都有自己獨立的地址空間(記憶體空間)。比如使用者點選桌面的IE瀏覽器,就啟動了一個程序,作業系統就會為該程序分配獨立的地址空間。當用戶再次點選IE瀏覽器,又啟動了一個程序,作業系統將為新的程序分配新的獨立的地址空間。多程序就是“多工”,就像使用電腦時同時開啟瀏覽器上網、開啟播放器聽歌、後臺還默默執行著防毒軟體一樣。現代作業系統如Mac OS X,UNIX,Linux,Windows等都支援多程序,每啟動一個程序,作業系統便為該程序分配一個獨立的記憶體空間。

2.什麼是執行緒?

  執行緒是程序中的一個實體,是被系統獨立排程和分派的基本單位。一個程序可以有一個執行緒,也可以有多個執行緒。
  執行緒自己不擁有獨立的系統資源,只擁有一點在執行中必不可少的資源,它可與同屬一個程序的其它執行緒共享當前程序所擁有的全部資源。
  一個執行緒可以建立和撤消另一個執行緒,同一程序中的多個執行緒之間可以併發執行。
  執行緒有就緒(runnable)、阻塞(blocked)和執行(running)三種基本狀態以及新建(new)和死亡(dead)狀態。

為什麼要有多程序和多執行緒?

  每個程序至少要幹一件事,比如一個編輯器既要打字輸入同時又要檢測打錯的拼寫有時候還要區分一些關鍵字高亮顯示,它們同屬於編輯器這個程序,我們把編輯器作為一個程序,而以上這些工作就是它的子任務,如何實現他們同時工作呢?就是讓每個子任務即執行緒短暫執行交替執行,由於它們彼此之間交替太快了,看起來就像同時執行一樣。(真正的多執行緒需要多核CPU才能實現)

當我們要讓一個python程式執行多個任務時,我們可以用多個程序或多個執行緒來完成我們的任務,他們之間彼此同時交替進行甚至一個任務依賴於另一個任務執行的結果,他們需要相互通訊和協調,所以我們就需要用到多程序和多執行緒程式設計了。

實現多程序和多執行緒

1.多程序

  linux下可使用os模組的fork()。
  Unix/Linux作業系統提供了一個fork()系統呼叫,它非常特殊。普通的函式呼叫,呼叫一次,返回一次,但是fork()呼叫一次,返回兩次,因為作業系統自動把當前程序(稱為父程序)複製了一份(稱為子程序),然後,分別在父程序和子程序內返回。

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
  print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
  print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

 

  windows下可以使用multiprocessing模組
  multiprocessing模組提供了一個Process類來代表一個程序物件,下面的例子演示了啟動一個子程序並等待其結束:

from multiprocessing import Process
import os

# 子程序要執行的程式碼
def run_proc(name):
  print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
  print('Parent process %s.' % os.getpid())
  p = Process(target=run_proc, args=('test',))
  print('Child process will start.')
  p.start()
  p.join()
  print('Child process end.')

 

  建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項,用start()方法啟動。
  join()方法可以等待子程序結束後再繼續往下執行,通常用於程序間的同步。

 

  Pool
  如果要啟動大量的子程序,可以用程序池的方式批量建立子程序:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
  print('Run task %s (%s)...' % (name, os.getpid()))
  start = time.time()
  time.sleep(random.random() * 3)
  end = time.time()
  print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
  print('Parent process %s.' % os.getpid())
  p = Pool(4)
  for i in range(5):
  p.apply_async(long_time_task, args=(i,))
  print('Waiting for all subprocesses done...')
  p.close()
  p.join()
  print('All subprocesses done.')

 

  對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),呼叫close()之後就不能繼續新增新的Process了。

 

  子程序
  很多時候,子程序並不是自身,而是一個外部程序。我們建立了子程序後,還需要控制子程序的輸入和輸出。

  subprocess模組可以讓我們非常方便地啟動一個子程序,然後控制其輸入和輸出。

  下面的例子演示瞭如何在Python程式碼中執行命令nslookup www.python.org,這和命令列直接執行的效果是一樣的:

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

 

2.多執行緒

  使用threading模組實現多執行緒,Python的執行緒是真正的Posix Thread,而不是模擬出來的執行緒。

import time, threading

def loop():
  print('執行緒 %s 在執行' % threading.current_thread().name)
  n = 0
  while n < 5:
    n = n + 1
    print('執行緒 %s >>> %s' % (threading.current_thread().name, n))
    time.sleep(1)
  print('執行緒 %s 結束.' % threading.current_thread().name)

print('執行緒 %s 在執行' % threading.current_thread().name)
t = threading.Thread(target=loop, name='子執行緒1')
t2 = threading.Thread(target=loop, name='子執行緒2')
t.start()
t2.start()
t.join()
t2.join()
print('執行緒 %s 結束.' % threading.current_thread().name)

 

或者

import time, threading
num=0 lock = threading.Lock() def action_one():   global num   for i in range(3):     lock.acquire()     try:       print("執行緒1 %d"%num)       num+=1       time.sleep(1)     finally:       lock.release() def action_two():   global num   for i in range(3):     lock.acquire()     try:       print("執行緒2 %d"%num)       num+=1       time.sleep(1)     finally:       lock.release() t1 = threading.Thread(target=action_one, name='子執行緒1') t2 = threading.Thread(target=action_two, name='子執行緒2') t1.start() t2.start() t1.join() t2.join()

 

程序之間和執行緒之間的相互協調

1.程序間的通訊:

  Process之間肯定是需要通訊的,作業系統提供了很多機制來實現程序間的通訊。Python的multiprocessing模組包裝了底層的機制,提供了Queue、Pipes等多種方式來交換資料。

  以Queue為例,在父程序中建立兩個子程序,一個往Queue裡寫資料,一個從Queue裡讀資料:

from multiprocessing import Process, Queue
import os, time, random

# 寫資料程序執行的程式碼:
def write(q):
  print('Process to write: %s' % os.getpid())
  for value in ['A', 'B', 'C']:
    print('Put %s to queue...' % value)
    q.put(value)
    time.sleep(random.random())

# 讀資料程序執行的程式碼:
def read(q):
  print('Process to read: %s' % os.getpid())
  while True:
    value = q.get(True)
    print('Get %s from queue.' % value)

if __name__=='__main__':
  # 父程序建立Queue,並傳給各個子程序:
  q = Queue()
  pw = Process(target=write, args=(q,))
  pr = Process(target=read, args=(q,))
  # 啟動子程序pw,寫入:
  pw.start()
  # 啟動子程序pr,讀取:
  pr.start()
  # 等待pw結束:
  pw.join()
  # pr程序裡是死迴圈,無法等待其結束,只能強行終止:
  pr.terminate()

 

  在Unix/Linux下,multiprocessing模組封裝了fork()呼叫,使我們不需要關注fork()的細節。由於Windows沒有fork呼叫,因此,multiprocessing需要“模擬”出fork的效果,父程序所有Python物件都必須通過pickle序列化再傳到子程序去,所有,如果multiprocessing在Windows下呼叫失敗了,要先考慮是不是pickle失敗了。

2.執行緒間通訊

  1.Queue

  使用執行緒佇列有一個要注意的問題是,向佇列中新增資料項時並不會複製此資料項,執行緒間通訊實際上是線上程間傳遞物件引用。如果你擔心物件的共享狀態,那你最好只傳遞不可修改的資料結構(如:整型、字串或者元組)或者一個物件的深拷貝。

  Queue 物件提供一些在當前上下文很有用的附加特性。比如在建立 Queue 物件時提供可選的 size 引數來限制可以新增到佇列中的元素數量。對於“生產者”與“消費者”速度有差異的情況,為佇列中的元素數量新增上限是有意義的。比如,一個“生產者”產生專案的速度比“消費者”“消費”的速度快,那麼使用固定大小的佇列就可以在佇列已滿的時候阻塞佇列,以免未預期的連鎖效應擴散整個程式造成死鎖或者程式執行失常。在通訊的執行緒之間進行“流量控制”是一個看起來容易實現起來困難的問題。如果你發現自己曾經試圖通過擺弄佇列大小來解決一個問題,這也許就標誌著你的程式可能存在脆弱設計或者固有的可伸縮問題。 get() 和 put() 方法都支援非阻塞方式和設定超時。

import queue
q = queue.Queue() try:   data = q.get(block=False) except queue.Empty:   ... try:   q.put(item, block=False) except queue.Full:   ... try:   data = q.get(timeout=5.0) except queue.Empty:   ...

def producer(q):   ...   try:     q.put(item, block=False)   except queue.Full:     log.warning('queued item %r discarded!', item) _running = True

def consumer(q):   while _running:     try:       item = q.get(timeout=5.0)       # Process item       ...     except queue.Empty:       pass

 

  最後,有 q.qsize() , q.full() , q.empty() 等實用方法可以獲取一個佇列的當前大小和狀態。但要注意,這些方法都不是執行緒安全的。可能你對一個佇列使用empty() 判斷出這個佇列為空,但同時另外一個執行緒可能已經向這個佇列中插入一個數據項。所以,你最好不要在你的程式碼中使用這些方法。

  為了避免出現死鎖的情況,使用鎖機制的程式應該設定為每個執行緒一次只允許獲取一個鎖。如果不能這樣做的話,你就需要更高階的死鎖避免機制。在 threading 庫中還提供了其他的同步原語,比如 RLock 和 Semaphore 物件。

 

  Queue提供的方法:

task_done()

  意味著之前入隊的一個任務已經完成。由佇列的消費者執行緒呼叫。每一個get()呼叫得到一個任務,接下來的task_done()呼叫告訴佇列該任務已經處理完畢。

  如果當前一個join()正在阻塞,它將在佇列中的所有任務都處理完時恢復執行(即每一個由put()呼叫入隊的任務都有一個對應的task_done()呼叫)。

join()

  阻塞呼叫執行緒,直到佇列中的所有任務被處理掉。

  只要有資料被加入佇列,未完成的任務數就會增加。當消費者執行緒呼叫task_done()(意味著有消費者取得任務並完成任務),未完成的任務數就會減少。當未完成的任務數降到0,join()解除阻塞。

put(item[, block[, timeout]])

  將item放入佇列中。
    1.如果可選的引數block為True且timeout為空物件(預設的情況,阻塞呼叫,無超時)。
    2.如果timeout是個正整數,阻塞呼叫程序最多timeout秒,如果一直無空空間可用,丟擲Full異常(帶超時的阻塞呼叫)。
    3.如果block為False,如果有空閒空間可用將資料放入佇列,否則立即丟擲Full異常

  其非阻塞版本為put_nowait等同於put(item, False)

get([block[, timeout]])

  從佇列中移除並返回一個數據。block跟timeout引數同put方法

  其非阻塞方法為get_nowait()相當與get(False)

empty()

  如果佇列為空,返回True,反之返回False

 

  2.同步機制Event

  執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其他執行緒需要通過斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用 threading 庫中的 Event 物件。

  Event 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在初始情況下,event 物件中的訊號標誌被設定假。如果有執行緒等待一個 event 物件,而這個 event 物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個 event 物件的訊號標誌設定為真,它將喚醒所有等待個 event 物件的執行緒。如果一個執行緒等待一個已經被設定為真的 event 物件,那麼它將忽略這個事件,繼續執行。

from threading import Thread, Event
import time

def countdown(n, start_evt):
  print('countdown is starting...')
  start_evt.set()
  while n > 0:
    print('T-minus', n)
    n -= 1
    time.sleep(5)

start_evt = Event() # 可通過Event 判斷執行緒的是否已執行
t = Thread(target=countdown, args=(10, start_evt))
t.start()

print('launching countdown...')
start_evt.wait() # 等待countdown執行

# event 物件的一個重要特點是當它被設定為真時會喚醒所有等待它的執行緒

print('countdown is running...')

 

  Semaphore(訊號量)
  在多執行緒程式設計中,為了防止不同的執行緒同時對一個公用的資源(比如全部變數)進行修改,需要進行同時訪問的數量(通常是1)的限制。訊號量同步基於內部計數器,每呼叫一次acquire(),計數器減1;每呼叫一次release(),計數器加1.當計數器為0時,acquire()呼叫被阻塞。

from threading import Semaphore, Lock, RLock, Condition, Event, Thread
import time

# 訊號量
sema = Semaphore(3) #限制同時能訪問資源的數量為3

def foo(tid):
  with sema:
    print('{} acquire sema'.format(tid))
    time.sleep(1)
  print('{} release sema'.format(tid))


threads = []

for i in range(5):
  t = Thread(target=foo, args=(i,))
  threads.append(t)
  t.start()

for t in threads:
  t.join()

 

  Lock(鎖)
  互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個執行緒要更改共享資料時,先將其鎖定,此時資源的狀態為“鎖定”,其他執行緒不能更改;直到該執行緒釋放資源,將資源的狀態變成“非鎖定”,其他的執行緒才能再次鎖定該資源。

  互斥鎖保證了每次只有一個執行緒進行寫入操作,從而保證了多執行緒情況下資料的正確性。

#建立鎖
mutex = threading.Lock()
#鎖定
mutex.acquire([timeout])
#釋放
mutex.release()

  RLock(可重入鎖)
  為了支援在同一執行緒中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次acquire。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。

import threading
import time

class MyThread(threading.Thread):
  def run(self):
    global num 
    time.sleep(1)

    if mutex.acquire(1): 
      num = num+1
      msg = self.name+' set num to '+str(num)
      print msg
      mutex.acquire()
      mutex.release()
      mutex.release()

num = 0 mutex = threading.RLock()

def test():   for i in range(5):     t = MyThread()     t.start()
if __name__ == '__main__':   test()

 

  Condition(條件變數)
  Condition被稱為條件變數,除了提供與Lock類似的acquire和release方法外,還提供了wait和notify方法。執行緒首先acquire一個條件變數,然後判斷一些條件。如果條件不滿足則wait;如果條件滿足,進行一些處理改變條件後,通過notify方法通知其他執行緒,其他處於wait狀態的執行緒接到通知後會重新判斷條件。不斷的重複這一過程,從而解決複雜的同步問題。

  可以認為Condition物件維護了一個鎖(Lock/RLock)和一個waiting池。執行緒通過acquire獲得Condition物件,當呼叫wait方法時,執行緒會釋放Condition內部的鎖並進入blocked狀態,同時在waiting池中記錄這個執行緒。當呼叫notify方法時,Condition物件會從waiting池中挑選一個執行緒,通知其呼叫acquire方法嘗試取到鎖。

  Condition物件的建構函式可以接受一個Lock/RLock物件作為引數,如果沒有指定,則Condition物件會在內部自行建立一個RLock。

  除了notify方法外,Condition物件還提供了notifyAll方法,可以通知waiting池中的所有執行緒嘗試acquire內部鎖。由於上述機制,處於waiting狀態的執行緒只能通過notify方法喚醒,所以notifyAll的作用在於防止有執行緒永遠處於沉默狀態。

import threading
import time

class Producer:
  def run(self):
    global count
    while True:
      if con.acquire():
        if count > 1000:
          con.wait()
        else:
          count += 100
          msg = threading.current_thread().name + ' produce 100, count=' + str(count)
          print(msg)
          con.notify() # 通知 waiting執行緒池中的執行緒
        con.release()
        time.sleep(1)

count = 0
con = threading.Condition()

class Consumer:
  def run(self):
    global count
    while True:
    if con.acquire():
      if count < 100:
        con.wait()
      else:
        count -= 3
        msg = threading.current_thread().name + ' consumer 3, count=' + str(count)
        print(msg)
        con.notify()
      con.release()
      time.sleep(3)

producer = Producer()

 

程序和執行緒的比較

1.穩定性

  多程序模式最大的優點就是穩定性高,因為一個子程序崩潰了它擁有自己獨立的記憶體空間,不會影響主程序和其他子程序(主程序崩掉,子程序也難逃厄運)。多程序模式的缺點是建立程序的代價大,在Unix/Linux系統下,用fork呼叫還行,在Windows下建立程序開銷巨大。另外,作業系統能同時執行的程序數也是有限的,在記憶體和CPU的限制下,如果有幾千個程序同時執行,作業系統連排程都會成問題。
  多執行緒模式通常比多程序快,多執行緒模式致命的缺點就是任何一個執行緒掛掉都可能直接造成整個程序崩潰,因為所有執行緒共享程序的記憶體。

2.切換開銷

  首先上下文切換就是從當前執行任務切換到另一個任務執行的過程。但是,為了確保下次能從正確的位置繼續執行,在切換之前,會儲存上一個任務的狀態。
  作業系統在切換程序或者執行緒時需要先儲存當前執行的現場環境(CPU暫存器狀態、記憶體頁等),然後,把新任務的執行環境準備好(恢復上次的暫存器狀態,切換記憶體頁等),才能開始執行。這個切換過程雖然很快,但是也需要耗費時間。
  但是執行緒的切換虛擬空間記憶體是相同的,但是程序切換的虛擬空間記憶體則是不同的。所以執行緒上下文切換比程序上下文切換快的多。同時,這兩種上下文切換的處理都是通過作業系統核心來完成的。核心的這種切換過程伴隨的最顯著的效能損耗是將暫存器中的內容切換出。

3.計算密集型和IO密集型下的選擇

  我們可以把任務分為計算密集型和IO密集型。
  計算密集型任務的特點是要進行大量的計算,消耗CPU資源。IO密集型任務的特點是涉及到網路、磁碟IO,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成

 

對比維度

多程序

多執行緒

總結

資料共享、同步

資料共享複雜,需要用IPC;資料是分開的,同步簡單

因為共享程序資料,資料共享簡單,但也是因為這個原因導致同步複雜

各有優勢

記憶體、CPU

佔用記憶體多,切換複雜,CPU利用率低

佔用記憶體少,切換簡單,CPU利用率高

執行緒佔優

建立銷燬、切換

建立銷燬、切換複雜,速度慢

建立銷燬、切換簡單,速度很快

執行緒佔優

程式設計、除錯

程式設計簡單,除錯簡單

程式設計複雜,除錯複雜

程序佔優

可靠性

程序間不會互相影響

一個執行緒掛掉將導致整個程序掛掉

程序佔優

分散式

適應於多核、多機分散式;如果一臺機器不夠,擴充套件到多臺機器比較簡單

適應於多核分散式

程序佔優

 

(1)需要頻繁建立銷燬的優先用執行緒

  原因請看上面的對比。

  這種原則最常見的應用就是Web伺服器了,來一個連線建立一個執行緒,斷了就銷燬執行緒,要是用程序,建立和銷燬的代價是很難承受的

(2)需要進行大量計算的優先使用執行緒

  所謂大量計算,當然就是要耗費很多CPU,切換頻繁了,這種情況下執行緒是最合適的。

  這種原則最常見的是影象處理、演算法處理。

(3)強相關的處理用執行緒,弱相關的處理用程序

  什麼叫強相關、弱相關?理論上很難定義,給個簡單的例子就明白了。

  一般的Server需要完成如下任務:訊息收發、訊息處理。“訊息收發”和“訊息處理”就是弱相關的任務,而“訊息處理”裡面可能又分為“訊息解碼”、“業務處理”,這兩個任務相對來說相關性就要強多了。因此“訊息收發”和“訊息處理”可以分程序設計,“訊息解碼”、“業務處理”可以分執行緒設計。

  當然這種劃分方式不是一成不變的,也可以根據實際情況進行調整。

(4)可能要擴充套件到多機分佈的用程序,多核分佈的用執行緒

  原因請看上面對