1. 程式人生 > >Python學習筆記整理併發程式設計

Python學習筆記整理併發程式設計

多併發程式設計之程序

0x0程序的概念

0x0什麼是程序?

正在進行的過程或任務,而執行該任務的是CPU。

0x1程序與程式的區別?

程式只是單純的程式碼集,而程式一旦執行則該程式執行的過程則一個程序,在同一個程式執行多次的情況下出現的是多個程序,而不是一個程序,由此得出結論,每執行一次程式就會產生一個程序。

0x2 併發與並行

無論是併發還是並行,在使用者看來都是在(同時)執行的,不管是程序還是執行緒都只是一個任務,真正執行的是CPU,而一個CPU同一時間只能執行一個任務。

併發:併發可以看做是偽並行,併發並不是真正意義上的同時執行,而是看上去是同時執行。實現方式:單個CPU+多道技術

並行:真正意義上的同時執行。實現方式:多個CPU

0x4 程序的建立

只要是硬體就需要作業系統來管理,然而又作業系統就必定有程序的概念,就需要有建立程序的方式。

對於通用的作業系統,需要有系統在執行過程中(作業系統本身上也是一個程序)有建立或撤銷程序的能力,主要有4中建立程序的形式:

  1. 系統初始化(檢視程序Linux中使用ps命令,Windows則可以直接使用工作管理員檢視)
  2. 在一個程序執行過程中開啟一個子程序(如Nginx開啟多程序,os.fork,subprocess.Popen)
  3. 使用者以互動式的請求建立程序(如開啟一個應用程式)
  4. 批處理作業的初始化(只在大型機的批處理系統中應用)

 

無論哪一種,新程序的建立都是由一個已經純在的程序執行了一個用於建立程序的系統呼叫而建立的:

  • 在Unix中該系統呼叫的是:fork,fork會建立一個與父程序一模一樣的副本,二者有相同的儲存映像,同樣的環境字串和同樣的開啟檔案(在shell直譯器程序中,執行一個命令就會建立一個子程序)
  • 在Windows中該系統呼叫的是:CreateProcess,CreateProcess即負責程序的建立,也負責把正確的程式裝入新的程序

關於建立子程序,Unix與Windows的區別:

  • 相同的是:程序建立後,父程序與子程序有各自不同的地址空間(多道技術要求物理層面實現程序與程序間的記憶體隔離),任何一個程序的在其他地址空間中修改都不會影響到另一個程序
  • 不同的是:在Unix中,子程序的初始地址空間是父程序的一個副本,子程序和父程序是可以有隻讀的記憶體共享區的。但對Windows而言,從一開始父程序與子程序的地址空間就是不同的

0x5 程序的結束

  • 正常退出(使用者正常關閉該程式)
  • 出錯退出(開啟一個不存在的程式)
  • 驗證錯誤(執行非法指令,如引用不存在的記憶體,但可以通過異常捕獲來處理)
  • 被其他程序殺死(如Linux中的命令:kill -9 )

0x6程序的層次結構

無論是Unix還是Windows,程序只有一個父程序,區別是:

  • 在Unix中的所有程序都是以init程序為根(頂級)組成樹形結構。父子程序共同組成一個程序組
  • 在Windows中,沒有程序層次的概念,所有程序的地位都是相同的,唯一類似於程序層次的暗示,就是在建立程序時,父程序會得到一個控制代碼,該控制代碼可以用來控制子程序,但父程序有許可權把該控制代碼交給其他程序

 

0x7 程序的狀態

程序的三種狀態:

  • 執行態
  • 阻塞態
  • 就緒態

程序間的狀態關係:

  • 程序阻塞
  • 排程程式選擇其他程序
  • 程式再次選擇該程序
  • 該程序執行

 

 0x8 程序併發的實現

程序的併發實現在於,硬體中斷一個正在執行的程序,把該程序的執行狀態儲存下來,因此作業系統維護一張表格:即程序表(Process table),每一個程序佔用一個程序表項,該表項也被稱為程序控制塊

 

 該表存放了程序的狀態資訊,因此該程序再次啟動時,就像該程序沒有被中斷過一樣》

 

0x1 開啟程序

0x0 multiprocessing模組

python提供了multiprocessing。 multiprocessing模組用來開啟子程序,並在子程序中執行我們定製的任務(比如函式),該模組與多執行緒模組threading的程式設計介面類似。multiprocessing模組的功能眾多:支援子程序、通訊和共享資料、執行不同形式的同步,>提供了Process、Queue、Pipe、Lock等元件。

需要注意的是:程序沒有共享狀態,程序修改的資料,改動僅限制於該程序內

0x1 Process類

建立程序的類:

Process(target=task,args=("子程序1",))

引數:

group:未設定時,預設為None
target::呼叫者物件,即該程序要執行的程式碼塊可以是函式形式
args:呼叫物件的引數,以元組形式傳入。位置引數
kwargs:呼叫物件的引數,以字典傳入
name:該程序的名稱

  

 方法:

p.start():啟動程序
p.run():程序啟動時執行的方法
p.terminate():強行終止程序
p.is_alive():檢視該程序是否存活 ,存活返回Ture
p.join():主執行緒等待該程序結束

  

屬性:

p.daemon:預設為False,設定為True則表示該程序是守護程序
p.name:檢視程序的名字
p.pid:檢視程序pid

  

0x2 Process類的使用(開啟程序)

在Windows中Process()必須放到 if __name__ == '__main__'下

開啟子程序的兩種方式:

! -*- coding:utf-8 -*-
方式一

from multiprocessing import Process
import time
def task(name):
    print("%s in running " %name)
    time.sleep(3)
    print("%s is done" %name)

# windos 必須這種形式
if __name__ == '__main__':
    p1 = Process(target=task,args=("子程序1",))
    p1.start()   # 給作業系統傳送訊號
    p2  = Process(target=task,kwargs={'name':"子程序2"})
    p2.start()
    print("主程序")
方式一
# 方式二

from multiprocessing import Process
import time
class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name

    def run(self):
        print("%s is  running " %self.name)
        time.sleep(3)
        print("%s is done" %self.name)

if __name__ == '__main__':
    p = MyProcess("子程序1")
    p.start()
    print("主程序")
方式二

 

0x2互斥鎖

 互斥鎖的作用:解決程序之的資料錯亂

互斥鎖的原理:把併發改成序列,降低效率確保資料的安全性

0X0 lOCK物件的使用

mutex = lock():示例化物件
mutex.acquire():加鎖
mutex.release():釋放鎖 

0x1互斥鎖的使用

from multiprocessing import Process,Lock
import time
def task(name,mutex):
    mutex.acquire()
    print("%s 1"%name)
    time.sleep(1)
    print("%s 2"%name)
    time.sleep(1)
    print("%s 3"%name)
    mutex.release()
if __name__ == '__main__':
    mutex = Lock()
    for i in range(3):
        p = Process(target=task,args=("程序%s"%i,mutex))
        p.start()
互斥鎖

0x2互斥鎖的小案例:

模擬搶票

#! -*- coding:utf-8 -*-

import json,time
from multiprocessing import Process,Lock
def search(name):
    """查詢餘票"""
    time.sleep(1)
    dic = json.load(open('db','r',encoding="utf-8"))
    print('<%s> 查詢到餘票[%s]張' %(name,dic['count']))

def get(name):
    """購買"""
    time.sleep(1)
    dic = json.load(open("db", 'r', encoding="utf-8"))
    if dic['count'] > 0:
        dic['count'] -= 1
        time.sleep(3)
        json.dump(dic,open('db','w',encoding='utf-8'))
        print("%s 購買成功" %name)
    else:
        print("%s 購買失敗" %name)

def task(name,mutex):
    search(name)
    # mutex.acquire()
    # get(name)
    # mutex.release()
    with mutex:
        get(name)
if __name__ == '__main__':
    mutex = Lock()
    for i in range(1,11):
        p = Process(target=task,args=("路人%s" %i,mutex))
        p.start()
互斥鎖案例
<路人1> 查詢到餘票[1]張
<路人2> 查詢到餘票[1]張
<路人4> 查詢到餘票[1]張
<路人3> 查詢到餘票[1]張
<路人5> 查詢到餘票[1]張
<路人7> 查詢到餘票[1]張
<路人6> 查詢到餘票[1]張
<路人8> 查詢到餘票[1]張
<路人9> 查詢到餘票[1]張
<路人10> 查詢到餘票[1]張
路人1 購買成功
路人2 購買失敗
路人4 購買失敗
路人3 購買失敗
路人5 購買失敗
路人7 購買失敗
路人6 購買失敗
路人8 購買失敗
路人9 購買失敗
路人10 購買失敗

0X3互斥鎖與Join的區別

雖然互斥鎖都是把並行變成序列但唯一不同的是join是把該程序下的所有功能都變成序列,而互斥鎖是吧區域性的功能變成序列,這樣既保證了資料的安全性,又保證了程式的執行效率,相對join而言

 

0x3佇列

0x0 佇列的概念:

由於程序之間是相互隔離的,要實現程序間的通訊,必須要有一種解決方案,而multiprocess模組提供的兩種形式的解決方法即:佇列,與管道這兩種方法都是進行訊息傳遞的.而不同的是佇列自動解決了鎖的問題

0x1 佇列類的使用

q = Queue(3) 

引數:

maxsize:佇列中最大數,不設定則無大小

方法:

q.put():插入資料 
q.get():取出資料,並刪除佇列中的該資料
q.full():該佇列是否已存放最大限制,

佇列的使用

#! -*- coding:utf-8 -*-
from multiprocessing import Queue
q = Queue(3)
q.put('helo')
q.put({'a':1})
q.put([3,3,3])
print(q.full())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
print(q.get())
# q.put(1)
佇列的使用

0x4生產者消費者模型

0X0 為什麼要使用生產者消費者模型

生產者就是產生資料的任務,消費者是處理資料的任務,在併發程式設計中,如果生產者的處理熟讀很快,而消費者處理速度很慢,那麼生產者就必需等待消費者處理完畢才能繼續生產資料.。同理如果消費者的能力大於生產者就必需等待生產者,因此未來解決該問題需要使用生產者消費者模式來解決該問題.

0x1 什麼是生產者消費者模式

生產者消費者模式是通過一個容器來解決生產者與消費者的強耦合為題的。生產者與消費者並不直接通訊,而是需要通過阻塞佇列來進行通訊,因此生產者生產完資料並不需要消費者等待,而是直接扔給阻塞佇列,消費者不找生產者索要資料,而是直接從阻塞佇列裡取,阻塞佇列就相當於一個緩衝區,因此平衡了生產者與消費者的處理能力.

0x2 生產者與消費者模型的實現

#! -*- coding:utf-8 -*-
from multiprocessing import Process,Queue
import time

def producer(q):
    for i in range(10):
        res='包子%s' %i
        time.sleep(0.5)
        print('生產者生產了%s' %res)

        q.put(res)

def consumer(q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(1)
        print('消費者吃了%s' % res)



if __name__ == '__main__':
    #容器
    q=Queue()

    #生產者們
    p1=Process(target=producer,args=(q,))
    p2=Process(target=producer,args=(q,))
    p3=Process(target=producer,args=(q,))

    #消費者們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    print('')
生產者消費者模型

 

0x3 JoinableQueue的使用

引數:

maxsize:佇列的最大存放數 

方法:

q.task_done():使用者傳送訊號,表示q.get()的返回項已被處理。此方法的使用數大於maxsize的值,
會丟擲異常ValueErro q.join():生產者呼叫該方法進行阻塞,直到佇列中所有專案被處理,阻塞持續到佇列中所有專案都呼叫
q.task_done()

 通過JoinableQueue解決上面的問題

#! -*- coding:utf-8 -*-
from multiprocessing import Process,JoinableQueue
import time
def producer(q):
    for i in range(2):
        res = "包子%s" %i
        time.sleep(0.5)
        print("生產者生產了%s" %res)
        q.put(res)
    q.join()

def consumer(q):
    while True:
        res = q.get()
        if res is None:break
        time.sleep(1)
        print("消費者吃了%s" %res)
        q.task_done()

if __name__ == '__main__':
    q = JoinableQueue()

    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()
    print("主執行緒")
JoinableQueue