1. 程式人生 > >並發編程之——多進程

並發編程之——多進程

代碼 -c lob 終端 概念 安全 producer 基本 search

  一、基本概念

  1.1 進程

  其實進程就是正在進行的一個程序或者任務,而負責執行任務的是CPU,執行任務的地方是內存。跟程序相比,程序僅僅是一堆代碼而已,而程序運行時的過程才是進程。另外同一個程序執行兩次就是兩個進程了。

  1.2 並發與並行

  無論是並行還是並發,在用戶看來都是‘同時‘運行的,不管是進程還是線程,都只是一個任務而已,真是幹活的是cpu,cpu來做這些任務,而一個cpu同一時刻只能執行一個任務。對於“並發”而言,是偽並行,即看起來是同時運行,單個cpu+多道技術就可以實現並發;而“並行”才是真正意義上的“同時運行”——僅有多核才能夠實現“並行”。

  需要強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

  二、Multiprocessing模塊

  python中的多線程無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu\_count\(\)查看),在python中大部分情況需要使用多進程。

  Python提供了multiprocessing。 multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,>提供了Process、Queue、Pipe、Lock等組件。

  2.1 開啟子進程的兩種方式

  2.1.1 直接在Multiprocessing模塊中導入Process類,利用這個類實例化進程對象

技術分享圖片
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
#方式一
from multiprocessing import Process
import time
import os

def task(name):
    print(%s is running...%name)
    print(子進程的id為:,os.getpid())
    print(子進程的父進程的id為:,os.getppid())
    time.sleep(
3) print(%s is done%name) if __name__ == __main__: #Process(target=task, kwargs={‘name‘:‘子進程1‘}) #得到一個對象 p = Process(target=task,args=(子進程1,)) #start僅僅只是給操作系統發送了一個信號,發完信號以後父進程不會等子進程 #是完全獨立的兩個進程 p.start() print(主進程) print(主進程的id為:,os.getpid()) print(主進程的父進程id為:,os.getppid())
View Code

  2.2.2 利用類的繼承,自己定義一個MyProcessing類,繼承自Process,但是需要註意的是:裏面必須要有一個名為run的方法去執行主體:

技術分享圖片
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-

from multiprocessing import Process
import time

#用類的繼承方式實現
class MyProcessing(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    #註意名字必須叫run
    def run(self):
        print(%s is running......%self.name)
        time.sleep(3)
        print(%s is done...%self.name)


if __name__ == __main__:
    p = MyProcessing(進程1)
    p.start()

    print(主進程)
View Code

  2.2 Process類實例化出對象的join方法 

  在主進程運行過程中如果想並發地執行其他的任務,我們可以開啟子進程,此時主進程的任務與子進程的任務分兩種情況

  情況一:在主進程的任務與子進程的任務彼此獨立的情況下,主進程的任務先執行完畢後,主進程還需要等待子進程執行完畢,然後統一回收資源。

  情況二:如果主進程的任務在執行到某一個階段時,需要等待子進程執行完畢後才能繼續執行,就需要有一種機制能夠讓主進程檢測子進程是否運行完畢,在子進程執行完畢後才繼續執行,否則一直在原地阻塞,這就是join方法的作用

技術分享圖片
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Process
import time
import os

def task(name,n):
    print(%s is running...%name)
    time.sleep(n)
    print(%s is done...% name)


if __name__ == __main__:
    start_time = time.time()
    # for i in range(5,8):
    #     p = Process(target=task,args=(‘p%s‘%(i+1),i))
    #     p.start()
        #p.join()
    p1 = Process(target=task,args=(p1,5))
    p2 = Process(target=task,args=(p2,2))
    p3 = Process(target=task,args=(p3,3))
    #start僅僅是向操作系統發出信號,具體誰先執行不一定,由操作系統決定
    p1.start()
    p2.start()
    p3.start()
    #保證有序,看著像“串行”,但實際上還是並行:最後一行的運行時間可以驗證
    p1.join()
    p2.join()
    p3.join()

    print(主進程開啟,id為:,os.getpid())
    #打印出來的結果可知,程序仍然是並發執行的,不是串行執行的
    print(運行時間:,time.time()-start_time)
View Code

  關於join方法,需要註意的一點是:雖然我們看著像“串行”,但實際上還是並行:由上面程序最後一行的運行時間可以驗證:

技術分享圖片

  三、互斥鎖

  3.1 雖然進程之間數據不共享,但是可以共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,而共享帶來的是競爭,競爭帶來的結果就是錯亂。

  如何控制,就是加鎖處理。而互斥鎖的意思就是互相排斥,如果把多個進程比喻為多個人,互斥鎖的工作原理就是多個人都要去爭搶同一個資源:衛生間,一個人搶到衛生間後上一把鎖,其他人都要等著,等到這個完成任務後釋放鎖,其他人才有可能有一個搶到......所以互斥鎖的原理,就是把並發改成穿行,降低了效率,但保證了數據安全不錯亂

  這裏有一個利用互斥鎖模擬搶票的程序(whw.json文件的內容為:{"count": 2}):

技術分享圖片
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Process,Lock
import json
import time

#查票
def search(name):
    time.sleep(1)
    with open(whw.json,r) as f:
        ticket_dict =  json.load(f)
        print(<%s>查看到余票為:<%s>%(name,ticket_dict[count]))

#買票
def get(name):
    time.sleep(1)
    f = open(whw.json,r)
    ticket_dict =  json.load(f)
    print(<%s>查看到余票還剩余:<%s>%(name,ticket_dict[count]))
    if ticket_dict[count] > 0:
        ticket_dict[count] -= 1
        print(<%s>購票成功! % name)
        time.sleep(1)
    else:
        print(余票不足~購票失敗)
    f.close()
    #保存
    f_new = open(whw.json,w)
    json.dump(ticket_dict,f_new)
    f_new.close()


def task(name,mutex):
    search(name)
    #在購票前加鎖
    mutex.acquire()
    get(name)
    #釋放鎖
    mutex.release()

if __name__ == __main__:
    mutex = Lock()
    for i in range(5):
        p = Process(target=task,args=(路人%s%(i+1),mutex))
        p.start()
View Code

  結果展示:

技術分享圖片

  3.2關於互斥鎖與join的區別:

  用一句話來簡單概括:“互斥鎖”是將代碼的“局部變成串行”,而如果用join的話會整個功能代碼變為串行,所以對於本例而言互斥鎖要靈活一些。

  四、隊列

  4.1對於多進程有一個問題需要我們考慮:是否有一種方案能夠同時兼顧一下兩點:一是效率高(多個進程共享一塊內存數據),另外一點是能夠幫我們處理好鎖的問題。

  答案就是~~利用隊列!

  首先,隊列是將數據存到內存中處理,這就滿足了“效率高”這個要求,另外,隊列是基於“管道+鎖”設計的,所以另外一點也滿足了。事實上,隊列才是進程間通信(IPC)的最佳選擇

  另外需要大家註意的是:隊列是一種先進先出的數據結構

  創建隊列用以下方式:

技術分享圖片
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Queue

#隊列中不應該放大文件,發的只是精簡的消息
#可以不指定大小,但最終受限於內存的大小
q = Queue(3)
q.put(hello)
q.put({a:1})
q.put(3333333)
#判斷一下隊列滿沒有
print(q.full())
#取出來~先進先出
print(q.get())
print(q.get())
print(q.get())
View Code

  4.2 隊列的應用——生產者消費者模型

  “生產者消費者模型”是並發編程的非常重要的一個模型,也是隊列的一個非常重要的應用之一:

  技術分享圖片

  上圖是一個簡單的生產者與消費者模型:生產者將生產的DATA先放到隊列裏,消費者從隊列中獲取生產者生產的數據,這樣使得程序的耦合性大大降低,而且也平衡了生產者與消費者之間的速度差:

  具體代碼如下:

技術分享圖片
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Process,Queue
import time

def producer(q):
    for i in range(5):
        res = 包子%s%i
        time.sleep(0.5)
        print(生產者生產了%s%res)
        q.put(res)

def consumer(q):
    while 1:
        res = q.get()
        if res is None:
            break
        time.sleep(1)
        print(消費者吃了%s%res)


if __name__ == __main__:
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    #有兩個消費者,需要最後put兩次None
    q.put(None)
    q.put(None)
    #print(‘主進程‘.center(20,‘*‘))
View Code

  實現效果如下:

技術分享圖片

  當然上面的代碼可以利用“守護進程”優化一下(作為了解),將消費者進程設置為守護進程,隨著主程序進程一起消除:

技術分享圖片
# -*- coding: utf-8  -*-
# -*- Author: WangHW -*-
from multiprocessing import Process,JoinableQueue
import time

def producer(q):
    for i in range(5):
        res = 包子%s%i
        time.sleep(0.5)
        print(生產者生產了%s%res)
        q.put(res)
    q.join()

def consumer(q):
    while 1:
        res = q.get()
        if res is None:
            break
        time.sleep(1)
        print(消費者吃了%s%res)
        q.task_done()

if __name__ == __main__:
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    #將消費者進程設置為守護進程,隨著主程序一起消除
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
View Code

  五、其他補充

  5.1 需要註意的一點是:進程之間的內存空間是相互隔離的,看如下程序:

from multiprocessing import Process

n = 100

def work():
    global n
    n = 0
    print(子進程內的n為:,n)


if __name__ == __main__:
    p = Process(target=work)
    p.start()
    print(主進程的n為:,n)

  運行結果為:

技術分享圖片

並發編程之——多進程