1. 程式人生 > >Python多程序原理與實現

Python多程序原理與實現

1 程序的基本概念

什麼是程序?

​ 程序就是一個程式在一個數據集上的一次動態執行過程。程序一般由程式、資料集、程序控制塊三部分組成。我們編寫的程式用來描述程序要完成哪些功能以及如何完成;資料集則是程式在執行過程中所需要使用的資源;程序控制塊用來記錄程序的外部特徵,描述程序的執行變化過程,系統可以利用它來控制和管理程序,它是系統感知程序存在的唯一標誌。

程序的生命週期:建立(New)、就緒(Runnable)、執行(Running)、阻塞(Block)、銷燬(Destroy)

程序的狀態(分類):(Actived)活動程序、可見程序(Visiable)、後臺程序(Background)、服務程序(Service)、空程序

2 父程序和子程序

​ Linux 作業系統提供了一個 fork() 函式用來建立子程序,這個函式很特殊,呼叫一次,返回兩次,因為作業系統是將當前的程序(父程序)複製了一份(子程序),然後分別在父程序和子程序內返回。子程序永遠返回0,而父程序返回子程序的 PID。我們可以通過判斷返回值是不是 0 來判斷當前是在父程序還是子程序中執行。

​ 在 Python 中同樣提供了 fork() 函式,此函式位於 os 模組下。

# -*- coding: utf-8 -*-  
__author__ = 'diesn'
__date__ = '2018/5/31 下午5:17' 

import os
import time
print("在建立子程序前: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) pid = os.fork() if pid == 0: print("子程序資訊: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) time.sleep(5) else: print("父程序資訊: pid=%s, ppid=%s" % (os.getpid(), os.getppid())) # pid表示回收的子程序的pid #pid, result = os.wait() # 回收子程序資源  阻塞
time.sleep(5) #print("父程序:回收的子程序pid=%d" % pid) #print("父程序:子程序退出時 result=%d" % result) # 下面的內容會被列印兩次,一次是在父程序中,一次是在子程序中。 # 父程序中拿到的返回值是建立的子程序的pid,大於0 print("fork建立完後: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
2.1 父子程序如何區分?

​ 子程序是父程序通過fork()產生出來的,pid = os.fork()

​ 通過返回值pid是否為0,判斷是否為子程序,如果是0,則表示是子程序

​ 由於 fork() 是 Linux 上的概念,所以如果要跨平臺,最好還是使用 subprocess 模組來建立子程序。

2.2 子程序如何回收?

python中採用os.wait()方法用來回收子程序佔用的資源

pid, result = os.wait() # 回收子程序資源  阻塞,等待子程序執行完成回收

如果有子程序沒有被回收的,但是父程序已經死掉了,這個子程序就是殭屍程序。

3 Python程序模組

​ python的程序multiprocessing模組有多種建立程序的方式,每種建立方式和程序資源的回收都不太相同,下面分別針對Process,Pool及系統自帶的fork三種程序分析。

3.1 fork()
import os
pid = os.fork() # 建立一個子程序
os.wait() # 等待子程序結束釋放資源
pid為0的代表子程序。

缺點:
​ 1.相容性差,只能在類linux系統下使用,windows系統不可使用;
​ 2.擴充套件性差,當需要多條程序的時候,程序管理變得很複雜;
​ 3.會產生“孤兒”程序和“殭屍”程序,需要手動回收資源。
優點:
​ 是系統自帶的接近低層的建立方式,執行效率高。

3.2 Process程序

multiprocessing模組提供Process類實現新建程序

# -*- coding: utf-8 -*-
import os
from multiprocessing  import Process
import time

def fun(name):
    print("2 子程序資訊: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    print("hello " + name)


def test():
    print('ssss')


if __name__ == "__main__":
    print("1 主程序資訊: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps = Process(target=fun, args=('jingsanpang', ))
    print("111 ##### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    print("3 程序資訊: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    print(ps.is_alive())  # 啟動之前 is_alive為False(系統未建立)
    ps.start()
    print(ps.is_alive())  # 啟動之後,is_alive為True(系統已建立)

    print("222 #### ps pid: " + str(ps.pid) + ", ident:" + str(ps.ident))
    print("4 程序資訊: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps.join() # 等待子程序完成任務   類似於os.wait()
    print(ps.is_alive())
    print("5 程序資訊: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))
    ps.terminate()  #終斷程序
    print("6 程序資訊: pid=%s, ppid=%s" % (os.getpid(), os.getppid()))

特點:
​ 1.注意:Process物件可以建立程序,但Process物件不是程序,其刪除與否與系統資源是否被回收沒有直接的關係。
2.主程序執行完後會預設等待子程序結束後回收資源,不需要手動回收資源;join()函式用來控制子程序結束的順序,其內部也有一個清除殭屍程序的函式,可以回收資源;
3.Process程序建立時,子程序會將主程序的Process物件完全複製一份,這樣在主程序和子程序各有一個 Process物件,但是p.start()啟動的是子程序,主程序中的Process物件作為一個靜態物件存在,不執行。

4.當子程序執行完畢後,會產生一個殭屍程序,其會被join函式回收,或者再有一條程序開啟,start函式也會回收殭屍程序,所以不一定需要寫join函式。
5.windows系統在子程序結束後會立即自動清除子程序的Process物件,而linux系統子程序的Process物件如果沒有join函式和start函式的話會在主程序結束後統一清除。

另外還可以通過繼承Process物件來重寫run方法建立程序

3.3 程序池POOL (多個程序)
# -*- coding: utf-8 -*-
__author__ = 'disen'
__date__ = '2018/5/31 下午9:16'

import multiprocessing
import time

def work(msg):
    mult_proces_name = multiprocessing.current_process().name
    print('process: ' + mult_proces_name + '-' + msg)


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=5) # 建立5個程序
    for i in range(20):
        msg = "process %d" %(i)
        pool.apply_async(work, (msg, ))
    pool.close() # 關閉程序池,表示不能在往程序池中新增程序
    pool.join() # 等待程序池中的所有程序執行完畢,必須在close()之後呼叫
    print("Sub-process all done.")

​ 上述程式碼中的pool.apply_async()apply()函式的變體,apply_async()apply()的並行版本,apply()apply_async()的阻塞版本,使用apply()主程序會被阻塞直到函式執行結束,所以說是阻塞版本。apply()既是Pool的方法,也是Python內建的函式,兩者等價。可以看到輸出結果並不是按照程式碼for迴圈中的順序輸出的。

多個子程序並返回值

apply_async()本身就可以返回被程序呼叫的函式的返回值。上一個建立多個子程序的程式碼中,如果在函式func中返回一個值,那麼pool.apply_async(func, (msg, ))的結果就是返回pool中所有程序的值的物件(注意是物件,不是值本身)

import multiprocessing
import time

def func(msg):
    return multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4) # 建立4個程序
    results = []
    for i in range(20):
        msg = "process %d" %(i)
        results.append(pool.apply_async(func, (msg, )))
    pool.close() # 關閉程序池,表示不能再往程序池中新增程序,需要在join之前呼叫
    pool.join() # 等待程序池中的所有程序執行完畢
    print ("Sub-process(es) done.")

    for res in results:
        print (res.get())

​ 與之前的輸出不同,這次的輸出是有序的。

​ 如果電腦是八核,建立8個程序,在Ubuntu下輸入top命令再按下大鍵盤的1,可以看到每個CPU的使用率是比較平均的

4 程序間通訊方式

  1. 管道pipe:管道是一種半雙工的通訊方式,資料只能單向流動,而且只能在具有親緣關係的程序間使用。程序的親緣關係通常是指父子程序關係。
  2. 命名管道FIFO:有名管道也是半雙工的通訊方式,但是它允許無親緣關係程序間的通訊。
  3. 訊息佇列MessageQueue:訊息佇列是由訊息的連結串列,存放在核心中並由訊息佇列識別符號標識。訊息佇列克服了訊號傳遞資訊少、管道只能承載無格式位元組流以及緩衝區大小受限等缺點。
  4. 共享儲存SharedMemory:共享記憶體就是對映一段能被其他程序所訪問的記憶體,這段共享記憶體由一個程序建立,但多個程序都可以訪問。共享記憶體是最快的 IPC 方式,它是針對其他程序間通訊方式執行效率低而專門設計的。它往往與其他通訊機制,如訊號兩,配合使用,來實現程序間的同步和通訊。

以上幾種程序間通訊方式中,訊息佇列是使用的比較頻繁的方式。

(1)管道pipe**

import multiprocessing

def foo(conn):
   conn.send('hello father')   #向管道pipe發訊息
   print(conn.recv())

if __name__ == '__main__':
   conn1,conn2=multiprocessing.Pipe(True)    #開闢兩個口,都是能進能出,括號中如果False即單向通訊
   p=multiprocessing.Process(target=foo,args=(conn1,))  #子程序使用sock口,呼叫foo函式
   p.start()
   print(conn2.recv())  #主程序使用conn口接收,從管道(Pipe)中讀取訊息
   conn2.send('hi son') #主程序使用conn口傳送

(2)訊息佇列Queue

Queue是多程序的安全佇列,可以使用Queue實現多程序之間的資料傳遞。

Queue的一些常用方法:

  • Queue.qsize():返回當前佇列包含的訊息數量;
  • Queue.empty():如果佇列為空,返回True,反之False ;
  • Queue.full():如果佇列滿了,返回True,反之False;
  • Queue.get():獲取佇列中的一條訊息,然後將其從列隊中移除,可傳參超時時長。
  • Queue.get_nowait():相當Queue.get(False),取不到值時觸發異常:Empty;
  • Queue.put():將一個值新增進數列,可傳參超時時長。
  • Queue.put_nowait():相當於Queue.get(False),當佇列滿了時報錯:Full。

案例:

from multiprocessing import Process, Queue
import time


def write(q):
   for i in ['A', 'B', 'C', 'D', 'E']:
      print('Put %s to queue' % i)
      q.put(i)
      time.sleep(0.5)


def read(q):
   while True:
      v = q.get(True)
      print('get %s from queue' % v)


if __name__ == '__main__':
   q = Queue()
   pw = Process(target=write, args=(q,))
   pr = Process(target=read, args=(q,))
   print('write process = ', pw)
   print('read  process = ', pr)
   pw.start()
   pr.start()
   pw.join()
   pr.join()
   pr.terminate()
   pw.terminate()

Queue和pipe只是實現了資料互動,並沒實現資料共享,即一個程序去更改另一個程序的資料

注:程序間通訊應該儘量避免使用共享資料的方式

5 多程序實現生產者消費者

以下通過多程序實現生產者,消費者模式

import multiprocessing
from multiprocessing import Process
from time import sleep
import time


class MultiProcessProducer(multiprocessing.Process):
   def __init__(self, num, queue):
      """Constructor"""
      multiprocessing.Process.__init__(self)
      self.num = num
      self.queue = queue

   def run(self):
      t1 = time.time()
      print('producer start ' + str(self.num))
      for i in range(1000):
         self.queue.put((i, self.num))
      # print 'producer put', i, self.num
      t2 = time.time()

      print('producer exit ' + str(self.num))
      use_time = str(t2 - t1)
      print('producer ' + str(self.num) + ', 
      use_time: '+ use_time)



class MultiProcessConsumer(multiprocessing.Process):
   def __init__(self, num, queue):
      """Constructor"""
      multiprocessing.Process.__init__(self)
      self.num = num
      self.queue = queue

   def run(self):
      t1 = time.time()
      print('consumer start ' + str(self.num))
      while True:
         d = self.queue.get()
         if d != None:
            # print 'consumer get', d, self.num
            continue
         else:
            break
      t2 = time.time()
      print('consumer exit ' + str(self.num))
      print('consumer ' + str(self.num) + ', use time:' + str(t2 - t1))


def main():
   # create queue
   queue = multiprocessing.Queue()

   # create processes
   producer = []
   for i in range(5):
      producer.append(MultiProcessProducer(i, queue))

   consumer = []
   for i in range(5):
      consumer.append(MultiProcessConsumer(i, queue))

   # start processes
   for i in range(len(producer)):
      producer[i].start()

   for i in range(len(consumer)):
      consumer[i].start()

   # wait for processs to exit
   for i in range(len(producer)):
      producer[i].join()

   for i in range(len(consumer)):
      queue.put(None)

   for i in range(len(consumer)):
      consumer[i].join()

   print('all done finish')


if __name__ == "__main__":
   main()

6 總結

​ python中的多程序建立有以下兩種方式:

(1)fork子程序

(2)採用 multiprocessing 這個庫建立子程序

​ 需要注意的是佇列中queue.Queue是執行緒安全的,但並不是程序安全,所以多程序一般使用執行緒、程序安全的multiprocessing.Queue()

​ 另外, 程序池使用 multiprocessing.Pool實現,pool = multiprocessing.Pool(processes = 3),產生一個程序池,pool.apply_async實現非租塞模式,pool.apply實現阻塞模式。

apply_async和 apply函式,前者是非阻塞的,後者是阻塞。可以看出執行時間相差的倍數正是程序池數量。

​ 同時可以通過result.append(pool.apply_async(func, (msg, )))獲取非租塞式呼叫結果資訊的。