1. 程式人生 > >多進程(了解),守護進程,互斥鎖,信號量,進程Queue與線程queue

多進程(了解),守護進程,互斥鎖,信號量,進程Queue與線程queue

生產 模塊 多進程 異常 參數 進程池 數據 div ssi

一、守護進程

  主進程創建守護進程,守護進程的主要的特征為:①守護進程會在主進程代碼執行結束時立即終止;②守護進程內無法繼續再開子進程,否則會拋出異常。

實例:

from multiprocessing import Process
from threading import Thread
import time
def foo():  # 守護進程
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if
__name__ == __main__: p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息123,因為主進程打印main----時,p1也執行了,但是隨即被終止

  註:打印最後一行主進程代碼結束,則守護進程p1應該被終止,可能會有p1任務執行的打印信息‘start123’,因為主進程打印main-時,p1也執行了,但是隨即被終止。

技術分享圖片
from threading import Thread
import time
def foo():  # 守護線程
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == __main__:
    p1=Thread(target=foo)
    p2=Thread(target=bar)

    p1.daemon=True
    p1.start()
    p2.start()
    
print("main-------") # 123 # 456 # main------- # end123 # end456
守護線程 技術分享圖片
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(5)
    print(end123)
def bar():
    print(start456)
    time.sleep(3)
    print(end456)
if __name__ == __main__:
    t1=Thread(target=foo)
    t2=Thread(target=bar)
    t1.daemon=True  #必須放在start()前
    t1.start()
    t2.start()
    print(main)

# 123
# start456
# main
# end456
與上比較

二、互斥鎖 

  進程之間數據不共享,但是共享同一套文件系統,所以訪問同一個文件,或同一個打印終端,是沒有問題的,競爭帶來的結果就是錯亂,如何控制,就是加鎖處理(即局部實行串行)。

模擬搶票實例:

from multiprocessing import Process,Lock
import json,os,time,random

def search():
    with open(db.txt,encoding=utf-8)as f:
        dict = json.load(f)
        print(%s 剩余票數 %s%(os.getpid(),dict[count]))

def get():
    with open(db.txt,encoding=utf-8) as reaf_f:
        dic = json.load(reaf_f)

    if dic[count]>0:
        dic[count] -= 1
        time.sleep(random.randint(1,3))  # 模擬手速,網速
        with open(db.txt,w,encoding=utf-8) as write_f:
            json.dump(dic,write_f)
            print(%s 搶票成功 %os.getpid())
    else:
        print(剩余票數為%s,購票失敗%dic[count])

def task(mutex):
    search()    # 20個人都可以並發的查詢票數
    mutex.acquire()    # 加鎖
    get()              #通過鎖,查詢到結果的20人通過競爭逐一買票。前一個釋放鎖後後一個才可以進入,即串行
    mutex.release()    # 解鎖

if __name__ == __main__:
    mutex = Lock()
    for i in range(20):  # 20個人搶票
        p = Process(target=task,args=(mutex,))
        p.start()

三、信號量

同進程的一樣

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):

from multiprocessing import Process,Semaphore
# from threading import Thread,Semaphore
import time,random,os


def task(sm):
    with sm:
        print(%s 上廁所 %os.getpid())
        time.sleep(random.randint(1,3))

if __name__ == __main__:
    sm = Semaphore(3)
    for i in range(10):
        p= Process(target=task,args=(sm,))
        p.start()

與進程池是完全不同的概念,進程池Pool(4),最大只能產生4個進程,而且從頭到尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

四、進程間通信機制(IPC)

  基於互斥鎖以上兩種缺點,multiprocessing模塊為我們提供了基於消息通信IPC機制:隊列和管道。隊列和管道都是將數據存放於內存中;隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,我們應該盡量避免使用共享數據,盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可獲展性。

1、隊列(推薦)

(1)隊列相關知識

  隊列創建介紹:

from multiprocessing import Queue      #引入Queue類
q=Queue(n)              #實例化,參數n代表隊列中最大允許存放數,省略則無限制

  常見使用方法:

q.put()                 #用於插入數據到隊列
q.get()                 #用於從隊列讀取並刪除一個數據
q.put_nowait()          #當隊列存在數據已超過最大限制數,則拋出Queue.full異常
q.get_nowait()          #當隊列中已經不存在可取數據時,則拋出Queue.empty異常

  例子:

from multiprocessing import Queue
q=Queue(3)
q.put({a:1})
q.put(bbbb)
q.put((3,2,1))
# q.put_nowait(1111111)  #queue.Full

print(q.get())
print(q.get())
print(q.get())
# print(q.get_nowait())  #queue.Empty

(2)生產消費者模型

  生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

  實例1:

from multiprocessing import Process,Queue
import time,random,os

def procducer(q):
    for i in range(10):
        res=包子%s %i
        time.sleep(0.5)
        q.put(res)
        print(%s 生產了 %s %(os.getpid(),res))

def consumer(q):
    while True:
        res=q.get()
        if res is None:
            break
        print(%s 吃 %s %(os.getpid(),res))
        time.sleep(random.randint(2,3))


if __name__ == __main__:
    q=Queue()
    p=Process(target=procducer,args=(q,))
    c=Process(target=consumer,args=(q,))

    p.start()
    c.start()
    print()

此時的問題是主進程永遠不會結束,原因是:生產者p在生產完後就結束了,但是消費者c在取空了q之後,則一直處於死循環中且卡在q.get()這一步。解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就可以break出死循環。

  例子2:

from multiprocessing import Process,Queue
import time,random,os

def procducer(q):
    for i in range(10):
        res=包子%s %i
        time.sleep(0.5)
        q.put(res)
        print(%s 生產了 %s %(os.getpid(),res))

def consumer(q):
    while True:
        res=q.get()
        if res is None:
            break
        print(%s 吃 %s %(os.getpid(),res))
        time.sleep(random.randint(2,3))


if __name__ == __main__:
    q=Queue()
    p=Process(target=procducer,args=(q,))
    c=Process(target=consumer,args=(q,))

    p.start()
    c.start()

    p.join()
    q.put(None)
    print()

註意:以上發送可以放在生產函數中循環完進行發送,當然也可以如上放在主進程中進行發送,但是前提是必須等生產子進程結束才可以。

多進程(了解),守護進程,互斥鎖,信號量,進程Queue與線程queue