1. 程式人生 > >python3程式設計基礎:多程序(二)建立程序

python3程式設計基礎:多程序(二)建立程序

之前的文章中對多程序的一些基礎概念,程序的生命週期和python程序操作的模組做了說明,本篇文章直接上程式碼,結束python中建立多程序的一些方法。

os.fork()(Linux)

fork()函式,只在Linux系統下存在。而且它非常特殊,普通的函式呼叫,呼叫一次,返回一次,但是fork()呼叫一次,返回兩次,因為作業系統自動把當前程序(稱為父程序)複製了一份(稱為子程序),然後分別在父程序和子程序內返回。子程序永遠返回0,而父程序返回子程序的PID。這樣一個父程序可以fork()出很多子程序,所以父程序要記下每個子程序的ID,而子程序只需要呼叫getppid()就可以拿到父程序的ID,呼叫os.getpid()函式可以獲取自己的程序號。
程式碼示例:os.fork()

import os
print (os.getpid())

pid = os.fork() # 建立一個子程序
print (pid)  #子程序id和0
if pid == 0:
  print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
  print ('I (%s) just created a child process (%s).' % (os.getpid(), pid))

在這裡插入圖片描述

multiprocessing模組

multiprocessing模組就是跨平臺版本的多程序管理包,支援子程序、通訊和共享資料、執行不同形式的同步。該模組中有以下類/和方法:
在這裡插入圖片描述

Process類

Multiprocessing模組建立程序使用的是Process類。
Process類的構造方法
init(self, group=None, target=None, name=None, args=(), kwargs={})
引數說明
group:程序所屬組,基本不用。
target:表示呼叫物件,一般為函式。
args:表示呼叫物件的位置引數元組。
name:程序別名。
kwargs:表示呼叫物件的字典。
程式碼示例1

#coding=utf-8
import multiprocessing
def do(n) :
  #獲取當前執行緒的名字
  name = multiprocessing.current_process().name
  print(name,'starting')
  print("worker ", n)
  return 

if __name__ == '__main__' :
  numList = []
  for i in range(8) :
    p = multiprocessing.Process(target=do, args=(i,))
    numList.append(p)
    p.start()#就緒狀態
    #子程序執行完畢了才會執行主程序後面的語句。p程序通過join方法通知主程序死等我結束再繼續執行。
    print("Process end.")
  for i in numList:
    i.join()#每個程序執行結束才會開始下一個迴圈
  print(numList)#5個程序全部執行完畢才執行print語句,也就是主程序死等

在這裡插入圖片描述
程式碼示例2

# -*- coding: utf-8 -*- 
from multiprocessing import Process  
import os  
import time

def sleeper(name, seconds):  
    print("Process ID# %s" % (os.getpid()))  #獲取當前程序ID
    print("Parent Process ID# %s" % (os.getppid())) #獲取父程序ID
    print("%s will sleep for %s seconds" % (name, seconds))  
    time.sleep(seconds)
    
if __name__ == "__main__":  
  child_proc = Process(target = sleeper, args = ('bob', 5))  
  child_proc.start()  
  print("in parent process after child process start")  
  print("parent process about to join child process")
  child_proc.join()
  print("in parent process after child process join" ) 
  print("the parent's parent process: %s" % (os.getppid()))

在這裡插入圖片描述

程式碼示例3: 多程序模板程式

#coding=utf-8
import urllib.request
import time
import multiprocessing

def func1(url) :
  response = urllib.request.urlopen(url) 
  html = response.read()
  print(html[0:20])
  time.sleep(1) 

def func2(url) :
  response = urllib.request.urlopen(url) 
  html = response.read()
  print(html[0:20])
  time.sleep(1)
 
if __name__ == '__main__' :
    p1 = multiprocessing.Process(target=func1,args=("http://www.sogou.com",),name="gloryroad1")
    p2 = multiprocessing.Process(target=func2,args=("http://www.baidu.com",),name="gloryroad2")
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    time.sleep(1)
    print("done!")

在這裡插入圖片描述
程式碼示例4: 單程序和多程序的執行效率對比

#coding: utf-8
import multiprocessing
import time 
def m1(x):
    time.sleep(0.05)
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    i_list = range(1000)
    time1=time.time()
    pool.map(m1, i_list)
    time2=time.time()
    print('time elapse:',time2-time1)

    time1=time.time()
    list(map(m1, i_list))
    time2=time.time()
    print('time elapse:',time2-time1)

在這裡插入圖片描述

程序池Pool

Pool類可以提供指定數量(一般為CPU的核數)的程序供使用者呼叫,當有新的請求提交到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求
如果操作的物件數目上百個甚至更多,那手動去限制程序數量就顯得特別的繁瑣,此時就可以交給程序池自動管理多個程序。
注意:程序池中的程序是不能共享佇列和資料的,而Process生成的子程序可以共享佇列
Pool類中常用方法

  • apply():
    函式原型:apply(func[, args=()[, kwds={}]])
    該函式用於傳遞不定引數,主程序會被阻塞直到函式執行結束(不建議使用,並且3.x以後不再使用)。阻塞的,和單程序沒有什麼區別
  • apply_async():
    函式原型:apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
    非同步非阻塞的,不用等待當前程序執行完畢,隨時根據系統排程來進行程序切換(建議使用)
    注意:apply_async每次只能提交一個程序的請求
  • map()
    函式原型:map(func, iterable[, chunksize=None])
    Pool類中的map方法,與內建的map函式用法行為基本一致,它會使程序阻塞直到返回結果。
    注意:
    雖然第二個引數是一個迭代器,但在實際使用中,必須在整個佇列都就緒後,程式才會執行子程序;
    map返回的是一個列表,由func函式的返回值組成
  • close()
    關閉程序池(Pool),使其不再接受新的任務。
  • terminate()
    立刻結束工作程序,不再處理未處理的任務
  • join()
    使主程序阻塞等待子程序的退出,join方法必須在close或terminate之後使用

程式碼示例1:簡單的程序池apply_async+map

#encoding =utf-8
import time
import multiprocessing

def mul(x):
      return x*x

if __name__ =="__main__":
      pool = multiprocessing.Pool(multiprocessing.cpu_count())# start 4 worker processes
      result = pool.apply_async(mul, [10])
      print(result)#multiprocessing.pool.ApplyResult object 
      print(dir(result))
      print(result.get(timeout = 1))#用get方法取出返回結果
      print(pool.map(mul,range(10)))

在這裡插入圖片描述

程式碼示例2:map藉助類實現傳遞多個引數的情況

from multiprocessing import Pool

def f(object):
    return object.x * object.y

class A:
    def __init__(self,a,b):
        self.x =a
        self.y =b

if __name__ == '__main__':
    pool = Pool(processes = 4)      # start 4 worker processes
    params = [A(i,i) for i in range(10)]
    print(pool.map(f,params))   # prints "[0, 1, 4,..., 81]"

在這裡插入圖片描述
說明:呼叫map的時候不需要先執行close關閉程序池,但是join不呼叫close的話會報錯

程式碼示例3:多程序與單程序執行時間比較

#encoding=utf-8
import time
from multiprocessing import Pool
import os,multiprocessing
 
def run(num):
      time.sleep(1)
      return num * num
 
if __name__ == "__main__":
      testList = [1,2,3,4,5,6,7]
      print('單程序執行')#順序執行
      t1 = time.time()
      for i in testList:
            run(i)
      t2 = time.time()
      print('順序執行的時間為:',int(t2-t1))
      
      print('多程序執行')#並行執行
      pool = Pool(multiprocessing.cpu_count())#建立擁有4個程序數量的程序池
      result = pool.map(run,testList)
      pool.close()#關閉程序池,不再接受新的任務
      pool.join()#主程序阻塞等待子程序的退出
      t3 = time.time()
      print('並行執行的時間為:',int(t3-t2))
      print(result)

在這裡插入圖片描述
說明:
併發執行的時間明顯比順序執行要快很多,但是程序是要耗資源的,所以程序數也不能開太大。
程式中的result表示全部程序執行結束後全域性的返回結果集,run函式有返回值,所以一個程序對應一個返回結果,這個結果存在一個列表中,也就是一個結果堆中,實際上是用了佇列的原理,等待所有程序都執行完畢,就返回這個列表(列表的順序不定)。
對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),讓其不再接受新的Process。

程式碼示例4:程序池中的程序不能共享佇列

#encoding=utf-8
import time
from multiprocessing import Pool,Queue,Process
 
def func( q):
      print('*'*30)
      q.put('1111')
      
if  __name__ == "__main__":
      queue = Queue()  #直接用multiprocessing.Queue生成佇列
      pool = Pool(4)#生成一個容量為4的程序池
      for i in range(5):
            pool.apply_async(func, (queue,))#向程序池提交目標請求
      pool.close()
      pool.join()
            
      print(queue.qsize())

在這裡插入圖片描述
說明:
print(queue.qsize())輸出的結果是0,而且func函式中的print也沒有執行,如果不傳入q的話,func函式就能正確執行,原因是程序池中每個程序都有自己獨立的佇列,是不共享的,解決方法有兩種:一是用Process生成程序,二是用multiprocessing.Manager.Queue()生成共享佇列。修改後的程式碼如下:

方法一:Process生成程序

#encoding=utf-8
import time
from multiprocessing import Pool,Queue,Process
import multiprocessing
def func( q):
      print('*'*30)
      q.put('1111')

if  __name__ == "__main__":
      queue = Queue()
      p_list = []
      for i in range(5):
            p = Process(target = func, args=(queue,))
            p_list.append(p)
      for p in p_list:
            p.start()
      for p in p_list:
            p.join()
            
      print(queue.qsize())
      print(queue.get())   

在這裡插入圖片描述

方法二:multiprocessing.Manager.Queue()

#encoding=utf-8
import time
from multiprocessing import Pool,Queue,Process
import multiprocessing
def func( q):
      print('*'*30)
      q.put('1111')
       
if  __name__ == "__main__":
      m = multiprocessing.Manager()
      queue = m.Queue()
      pool = Pool(4)#生成一個容量為4的程序池
      for i in range(5):
            pool.apply_async(func, (queue,))#向程序池提交目標請求
      pool.close()
      pool.join()
            
      print(queue.qsize())

在這裡插入圖片描述