1. 程式人生 > >併發程式設計之多程序

併發程式設計之多程序

一、什麼是程序

一個正在執行的程式稱之為程序 是一種抽象概念 表示一個執行某件事情的過程程序的概念 起源於作業系統

第一代計算機 程式是固定 無法修改 某種計算機只能幹某種活

第二代批處理系統 需要人工參與 將程式攢成一批 統一執行序列執行 提高計算機的的利用率 但是除錯麻煩

第三代計算機 為了更好利用計算機資源,產生了

多道技術: (重點) 1.空間複用 記憶體分割為多個區域 每個區域儲存不同的應用程式

2.時間的複用 1.當一個程式遇到了I/O操作時 會切換到其他程式 (切換前需要儲存當前執行狀態 以便恢復執行)

提高效率

2.當你的應用程式執行時間過長 作業系統會強行切走 以保證其他程式也能正常執行 當然因為cpu速度賊快 使用者感覺不到,降低效率

3.有一個優先順序更高的任務需要處理 此時也會切走降低了效率

我們編寫程式時 只能儘量減少I/O操作

總的來說 有了多道技術之後 作業系統可以同時執行多個程式吧 這種情形稱之為併發 但是本質好 這些程式還是一個一個排隊執行。

#一 作業系統的作用:
    1:隱藏醜陋複雜的硬體介面,提供良好的抽象介面
    2:管理、排程程序,並且將多個程序對硬體的競爭變得有序

#二 多道技術:
    1.產生背景:針對單核,實現併發
    ps:
    現在的主機一般是多核,那麼每個核都會利用多道技術
    有4個cpu,運行於cpu1的某個程式遇到io阻塞,會等到io結束再重新排程,會被排程到4個
    cpu中的任意一個,具體由作業系統排程演算法決定。
    
    
2.空間上的複用:如記憶體中同時有多道程式 3.時間上的複用:複用一個cpu的時間片 強調:遇到io切,佔用cpu時間過長也切,核心在於切之前將程序的狀態儲存下來,這樣 才能保證下次切換回來時,能基於上次切走的位置繼續執行

程序的並行與併發

併發 看起來像是同時執行中 本質是不停切換執行 多個程序隨機執行並行 同一時刻 多個程序 同時進行 只有多核處理器才有真正的並行

區別:

並行是從微觀上,也就是在一個精確的時間片刻,有不同的程式在執行,這就要求必須有多個處理器。併發是從巨集觀上,在一個時間段上可以看出是同時執行的,比如一個伺服器同時處理多個session。

序列 一個一個 依次執行排隊 阻塞 遇到了I/O操作 看起來就是程式碼卡住了非阻塞 不會卡住程式碼的執行 阻塞 和 非阻塞 說的是同一個程序的情況下

同步 一個呼叫必須獲得返回結果才能繼續執行 非同步 一個呼叫發起後 發起方不需要等待它的返回結果

同步和非同步 必須存在多個程序(執行緒) 無論是程序還是執行緒都是兩條獨立的執行路徑

多程序的執行順序 主程序必然先執行子程序應該在主程序執行後執行一旦子程序啟動了 後續的順序就無法控制了

python如何使用多程序 1.直接建立Process物件 同時傳入要做的事情就是一個函式

p = Process(taget=一個函式,args=(函式的引數))p.start() 讓作業系統啟動這個程序 2.建立一個類 繼承自Process 把要做的任務放在run方法中

常用屬性 start 開啟程序 join 父程序等待子程序 name 程序名稱 is_alive是否存活 terminate 終止程序 pid 獲取程序id

啟動程序的方式 1.系統初始化 會產生一個根程序 2.使用者的互動請求 滑鼠雙擊某個程式 3.在一個程序中 發起了系統呼叫啟動了另一個程序 4.批處理作業開始 某些專用計算機可能還在使用

不同作業系統建立程序的方式不同 unix < centos mac linux 完全拷貝父程序的所有資料 子程序可以訪問父程序的資料嗎?不可以 但是可以訪問拷貝過來資料副本 windows 建立子程序 載入父程序中所有可執行的檔案

二、在python程式中的程序操作

一 multiprocessing模組介紹

python中的多執行緒無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu\_count\(\)檢視),在python中大部分情況需要使用多程序。

Python提供了multiprocessing。 multiprocessing模組用來開啟子程序,並在子程序中執行我們定製的任務(比如函式),該模組與多執行緒模組threading的程式設計介面類似。multiprocessing模組的功能眾多:支援子程序、通訊和共享資料、執行不同形式的同步,>提供了Process、Queue、Pipe、Lock等元件。

需要再次強調的一點是:與執行緒不同,程序沒有任何共享狀態,程序修改的資料,改動僅限於該程序內

二 Process類的介紹

Process([group [, target [, name [, args [, kwargs]]]]]),由該類例項化得到的物件,可用來開啟一個子程序

強調:
1. 需要使用關鍵字的方式來指定引數
2. args指定的為傳給target函式的位置引數,是一個元組形式,必須有逗號

引數介紹:

group引數未使用,值始終為None

target表示呼叫物件,即子程序要執行的任務

args表示呼叫物件的位置引數元組,args=(1,2,'egon',)

kwargs表示呼叫物件的字典,kwargs={'name':'egon','age':18}

name為子程序的名稱

方法介紹:

p.start():啟動程序,並呼叫該子程序中的p.run() 
p.run():程序啟動時執行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實現該方法  
p.terminate():強制終止程序p,不會進行任何清理操作,如果p建立了子程序,該子程序就成了殭屍程序,使用該方法需要特別小心這種情況。如果p還儲存了一個鎖那麼也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然執行,返回True
p.join([timeout]):主執行緒等待p終止(強調:是主執行緒處於等的狀態,而p是處於執行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的程序,而不能join住run開啟的程序

屬性介紹:

p.daemon:預設值為False,如果設為True,代表p為後臺執行的守護程序,當p的父程序終止時,p也隨之終止,並且設定為True後,p不能建立自己的新程序,必須在p.start()之前設定

p.name:程序的名稱

p.pid:程序的pid

三 Process類的使用

注意:在windows中Process()必須放到# if __name__ == '__main__':下

建立並開啟子程序的方式

import time
import random
from multiprocessing import Process

def piao(name):
    print('%s piaoing' %name)
    time.sleep(random.randrange(1,5))
    print('%s piao end' %name)

if __name__ == '__main__':
    #例項化得到四個物件
    p1=Process(target=piao,args=('egon',)) #必須加,號
    p2=Process(target=piao,args=('alex',))
    p3=Process(target=piao,args=('wupeqi',))
    p4=Process(target=piao,args=('yuanhao',))

    #呼叫物件下的方法,開啟四個程序
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print('')
方式一
import time
import random
from multiprocessing import Process

class Piao(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s piaoing' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s piao end' %self.name)

if __name__ == '__main__':
    #例項化得到四個物件
    p1=Piao('egon')
    p2=Piao('alex')
    p3=Piao('wupeiqi')
    p4=Piao('yuanhao')

    #呼叫物件下的方法,開啟四個程序
    p1.start() #start會自動呼叫run
    p2.start()
    p3.start()
    p4.start()
    print('')
方式二

四、Process物件的join方法

在主程序執行過程中如果想併發地執行其他的任務,我們可以開啟子程序,此時主程序的任務與子程序的任務分兩種情況

情況一:在主程序的任務與子程序的任務彼此獨立的情況下,主程序的任務先執行完畢後,主程序還需要等待子程序執行完畢,然後統一回收資源。

情況二:如果主程序的任務在執行到某一個階段時,需要等待子程序執行完畢後才能繼續執行,就需要有一種機制能夠讓主程序檢測子程序是否執行完畢,在子程序執行完畢後才繼續執行,否則一直在原地阻塞,這就是join方法的作用

import time
from multiprocessing import Process


def f(name):
    print('hello', name)
    time.sleep(1)


if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
        p.join()
    # [p.join() for p in p_lst]
    print('父程序在執行')
多個程序同時執行,再談join方法(1)
import time
from multiprocessing import Process

def f(name):
    print('hello', name)
    time.sleep(1)

if __name__ == '__main__':
    p_lst = []
    for i in range(5):
        p = Process(target=f, args=('bob',))
        p.start()
        p_lst.append(p)
    # [p.join() for p in p_lst]
    print('父程序在執行')
多個程序同時執行,再談join方法(2)

除了上面這些開啟程序的方法,還有一種以繼承Process類的形式開啟程序的方式

import os
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print(os.getpid())
        print('%s 正在和女主播聊天' %self.name)

p1=MyProcess('wupeiqi')
p2=MyProcess('yuanhao')
p3=MyProcess('nezha')

p1.start() #start會自動呼叫run
p2.start()
# p2.run()
p3.start()


p1.join()
p2.join()
p3.join()

print('主執行緒')
通過繼承Process類開啟程序

程序之間的資料隔離問題

from multiprocessing import Process

def work():
    global n
    n=0
    print('子程序內: ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主程序內: ',n)
程序之間的資料隔離問題

三、守護程序

主程序建立子程序,然後將該程序設定成守護自己的程序,守護程序就好比崇禎皇帝身邊的老太監,崇禎皇帝已死老太監就跟著殉葬了。

關於守護程序需要強調兩點:

其一:守護程序會在主程序程式碼執行結束後就終止

其二:守護程序內無法再開啟子程序,否則丟擲異常:AssertionError: daemonic processes are not allowed to have children

如果我們有兩個任務需要併發執行,那麼開一個主程序和一個子程序分別去執行就ok了,如果子程序的任務在主程序任務結束後就沒有存在的必要了,那麼該子程序應該在開啟前就被設定成守護程序。主程序程式碼執行結束,守護程序隨即終止

from multiprocessing import Process
import time
import random

def task(name):
    print('%s is piaoing' %name)
    time.sleep(random.randrange(1,3))
    print('%s is piao end' %name)


if __name__ == '__main__':
    p=Process(target=task,args=('egon',))
    p.daemon=True #一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,並且父程序程式碼執行結束,p即終止執行
    p.start()
    print('') #只要終端打印出這一行內容,那麼守護程序p也就跟著結束掉了
守護程序

守護 就是看著 陪著  在程式碼中 程序只能由程序類守護 一個程序守護者另一個程序 指的是兩個程序之間的關聯關係 特點:守護程序(妃子) 在被守護程序(皇帝)死亡時 會跟隨被守護程序死亡

什麼時候需要使用守護程序?例如: qq中有個下載視訊 應該用子程序去做 但是 下載的過程中 qq退出 那麼下載也沒必要繼續了

四、互斥鎖

from multiprocessing import Process,Lock

# 程序間 記憶體空間是相互獨立的
def task1(lock):
    lock.acquire()
    for i in range(10000):
        print("===")
    lock.release()

def task2(lock):
    lock.acquire()
    for i in range(10000):
        print("===============")
    lock.release()

def task3(lock):
    lock.acquire()
    for i in range(10000):
        print("======================================")
    lock.release()

if __name__ == '__main__':
    # 買了一把鎖
    mutex = Lock()

    # for i in range(10):
    #     p = Process(target=)
    p1 = Process(target=task1,args=(mutex,))
    p2 = Process(target=task2,args=(mutex,))
    p3 = Process(target=task3,args=(mutex,))

    # p1.start()
    # p1.join()
    # p2.start()
    # p2.join()
    # p3.start()
    # p3.join()

    p1.start()
    p2.start()
    p3.start()

    print("over!")
互斥鎖案例
     # 什麼時候用鎖?
    # 當多個程序 同時讀寫同一份資料 資料很可能就被搞壞了
    # 第一個程序寫了一箇中文字元的一個位元組 cpu被切到另一個程序
    # 另一個程序也寫了一箇中文字元的一個位元組
    # 最後檔案解碼失敗
    # 問題之所以出現 是因為併發 無法控住順序
    # 目前可以使用join來將所有程序併發改為序列

    # 與join的區別?
    # 多個程序併發的訪問了同一個資源  將導致資源競爭(同時讀取不會產生問題 同時修改才會出問題)
    # 第一個方案 加上join  但是這樣就導致了 不公平  相當於 上廁所得按照顏值來
    # 第二個方案 加鎖  誰先搶到資源誰先處理[
    # 相同點: 都變成了序列
    # 不同點:
    # 1.join順序固定 鎖順序不固定!
    # 2.join使整個程序的任務全部序列  而鎖可以指定哪些程式碼要序列

    # 鎖使是什麼?
    # 鎖本質上就是一個bool型別的識別符號  大家(多個程序) 在執行任務之前先判斷識別符號
    # 互斥鎖 兩個程序相互排斥

    # 注意 要想鎖住資源必須保證 大家拿到鎖是同一把

    # 怎麼使用?
    # 在需要加鎖的地方 lock.acquire() 表示鎖定
    # 在程式碼執行完後 一定要lock.release() 表示釋放鎖
    # lock.acquire()
    # 放需要競爭資源的程式碼 (同時寫入資料)
    # lock.release()

程序之間資料不共享,但是共享同一套檔案系統,所以訪問同一個檔案,或者列印終端是沒有問題的,但是帶來的是競爭,競爭帶來的結果是錯亂,如下:多個程序模擬多個人執行搶票任務

#檔案db.txt的內容為:{"count":1}
#注意一定要用雙引號,不然json無法識別
from multiprocessing import Process
import time,json

def search(name):
    dic=json.load(open('db.txt'))
    time.sleep(1)
    print('\033[43m%s 查到剩餘票數%s\033[0m' %(name,dic['count']))

def get(name):
    dic=json.load(open('db.txt'))
    time.sleep(1) #模擬讀資料的網路延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(1) #模擬寫資料的網路延遲
        json.dump(dic,open('db.txt','w'))
        print('\033[46m%s 購票成功\033[0m' %name)

def task(name):
    search(name)
    get(name)

if __name__ == '__main__':
    for i in range(10): #模擬併發10個客戶端搶票
        name='<路人%s>' %i
        p=Process(target=task,args=(name,))
        p.start()

併發執行,效率高,但競爭寫同一檔案,資料寫入錯亂,只有一張票,賣成功給了10個人

<路人0> 查到剩餘票數1
<路人1> 查到剩餘票數1
<路人2> 查到剩餘票數1
<路人3> 查到剩餘票數1
<路人4> 查到剩餘票數1
<路人5> 查到剩餘票數1
<路人6> 查到剩餘票數1
<路人7> 查到剩餘票數1
<路人8> 查到剩餘票數1
<路人9> 查到剩餘票數1
<路人0> 購票成功
<路人4> 購票成功
<路人1> 購票成功
<路人5> 購票成功
<路人3> 購票成功
<路人7> 購票成功
<路人2> 購票成功
<路人6> 購票成功
<路人8> 購票成功
<路人9> 購票成功
執行結果

加鎖處理:購票行為由併發變成了序列,犧牲了執行效率,但保證了資料安全

#把檔案db.txt的內容重置為:{"count":1}
from multiprocessing import Process,Lock
import time,json

def search(name):
    dic=json.load(open('db.txt'))
    time.sleep(1)
    print('\033[43m%s 查到剩餘票數%s\033[0m' %(name,dic['count']))

def get(name):
    dic=json.load(open('db.txt'))
    time.sleep(1) #模擬讀資料的網路延遲
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(1) #模擬寫資料的網路延遲
        json.dump(dic,open('db.txt','w'))
        print('\033[46m%s 購票成功\033[0m' %name)

def task(name,lock):
    search(name)
    with lock: #相當於lock.acquire(),執行完自程式碼塊自動執行lock.release()
        get(name)

if __name__ == '__main__':
    lock=Lock()
    for i in range(10): #模擬併發10個客戶端搶票
        name='<路人%s>' %i
        p=Process(target=task,args=(name,lock))
        p.start()
<路人0> 查到剩餘票數1
<路人1> 查到剩餘票數1
<路人2> 查到剩餘票數1
<路人3> 查到剩餘票數1
<路人4> 查到剩餘票數1
<路人5> 查到剩餘票數1
<路人6> 查到剩餘票數1
<路人7> 查到剩餘票數1
<路人8> 查到剩餘票數1
<路人9> 查到剩餘票數1
<路人0> 購票成功
執行結果

加鎖可以保證多個程序修改同一塊資料時,同一時間只能有一個任務可以進行修改,即序列地修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全。

雖然可以用檔案共享資料實現程序間通訊,但問題是:

1、效率低(共享資料基於檔案,而檔案是硬碟上的資料)

2、需要自己加鎖處理

因此我們最好找尋一種解決方案能夠兼顧:

1、效率高(多個程序共享一塊記憶體的資料)

2、幫我們處理好鎖問題。

這就是mutiprocessing模組為我們提供的基於訊息的IPC通訊機制:佇列和管道。

佇列和管道都是將資料存放於記憶體中,而佇列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,因而佇列才是程序間通訊的最佳選擇。

我們應該儘量避免使用共享資料,儘可能使用訊息傳遞和佇列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往往可以獲得更好的可獲展性。

五、程序間的通訊

IPC 指的是程序間通訊 之所以開啟子程序 肯定需要它幫我們完成任務 很多情況下 需要將資料返回給父程序 然而 程序記憶體是物理隔離的 解決方案: 1.將共享資料放到檔案中 就是慢 2.管道 subprocess中的那個 管道只能單向通訊 必須存在父子關係 3.共享一塊記憶體區域 得作業系統幫你分配 速度快

from multiprocessing import  Process,Manager
import time

def task(dic):
    print("子程序xxxxx")
    # li[0] = 1
    # print(li[0])
    dic["name"] = "xx"

if __name__ == '__main__':
    m = Manager()
    # li = m.list([100])
    dic = m.dict({})
    # 開啟子程序
    p = Process(target=task,args=(dic,))
    p.start()
    time.sleep(3)
    print(dic)

六、殭屍程序和孤兒程序

一個程序任務執行完就死亡了 但是作業系統不會立即將其清理 為的是 開啟這個子程序的父程序可以訪問到這個子程序的資訊這樣的 任務完成的 但是沒有被作業系統清理的程序稱為殭屍程序 越少越好

孤兒程序 無害!  沒有爹的稱為孤兒 一個父程序已經死亡 然而他的子孫程序 還在執行著 這時候 作業系統會接管這些孤兒程序

七、佇列介紹

程序彼此之間互相隔離,要實現程序間通訊(IPC),multiprocessing模組支援兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的

建立佇列的類(底層就是以管道和鎖定的方式實現)

Queue([maxsize]):建立共享的程序佇列,Queue是多程序安全的佇列,可以使用Queue實現多程序之間的資料傳遞。

引數介紹:

maxsize是佇列中允許最大項數,省略則無大小限制。
但需要明確:
    1、佇列記憶體放的是訊息而非大資料
    2、佇列佔用的是記憶體空間,因而maxsize即便是無大小限制也受限於記憶體大小

主要方法介紹:

q.put方法用以插入資料到佇列中。
q.get方法可以從佇列讀取並且刪除一個元素。

佇列使用:

from multiprocessing import Process,Queue

q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #滿了
# q.put(4) #再放就阻塞住了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了
"""
    程序間通訊的另一種方式 使用queue
    queue  佇列
    佇列的特點:
        先進的先出
        後進後出
        就像扶梯
"""
from multiprocessing import Process,Queue


# 基礎操作 必須要掌握的
# 建立一個佇列
# q = Queue()
# # 存入資料
# q.put("hello")
# q.put(["1","2","3"])
# q.put(1)
# # 取出資料
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())

# 阻塞操作 必須掌握
# q = Queue(3)
# # # 存入資料
# q.put("hello",block=False)
# q.put(["1","2","3"],block=False)
# q.put(1,block=False)
# 當容量滿的時候 再執行put 預設會阻塞直到執行力了get為止
# 如果修改block=False 直接報錯 因為沒地方放了
# q.put({},block=False)
#
#  取出資料
# print(q.get(block=False))
# print(q.get(block=False))
# print(q.get(block=False))
# # 對於get   當佇列中中沒有資料時預設是阻塞的  直達執行了put
# # 如果修改block=False 直接報錯 因為沒資料可取了
# print(q.get(block=False))

八、生產消費者模型:

1.生產者消費者模型 模型 設計模式 三層結構 等等表示的都是一種程式設計套路 生產者指的是能夠產生資料的一類任務 消費者指的是處理資料的一類任務

需求: 資料夾裡有十個文字文件 要求你找出檔案中包含習大大關鍵字的檔案開啟並讀取檔案資料就是生產者查詢關鍵字的過程就是消費者

生產者消費者模型為什麼出現? 生產者的處理能力與消費者的處理能力 不匹配不平衡 導致了一方等待另一方 浪費時間 目前我們通過多程序將生產 和 消費 分開處理 然後將生產者生產的資料通過佇列交給消費者

總結一下在生產者消費者模型中 不僅需要生產者消費者 還需要一個共享資料區域 1.將生產方和消費方耦合度降低 2.平衡雙方的能力 提高整體效率

上程式碼 : 搞兩個程序 一個負責生產 一個負責消費

from multiprocessing import Process,Queue
# 製作熱狗
def make_hotdog(queue,name):
    for i in range(3):
        time.sleep(random.randint(1,2))
        print("%s 製作了一個熱狗 %s" % (name,i))
        # 生產得到的資料
        data = "%s生產的熱狗%s" % (name,i)
        # 存到佇列中
        queue.put(data)
    # 裝入一個特別的資料 告訴消費方 沒有了
    #queue.put(None)


# 吃熱狗
def eat_hotdog(queue,name):
    while True:
        data = queue.get()
        if not data:break
        time.sleep(random.randint(1, 2))
        print("%s 吃了%s" % (name,data))

if __name__ ==