1. 程式人生 > >進程(並發,並行)

進程(並發,並行)

stream 切換 了解 windows系統 分享圖片 聊天 字符 main lob

一、背景知識

  顧名思義,進程即正在執行的一個過程。進程是對正在運行程序的一個抽象。進程的概念起源於操作系統,是操作系統最核心的概念,也是操作系統提供的最古老也是最重要的抽象概念之一。操作系統的其他所有內容都是圍繞進程的概念展開的。

  PS:即使可以利用的cpu只有一個(早期的計算機確實如此),也能保證支持(偽)並發的能力。將一個單獨的cpu變成多個虛擬的cpu(多道技術:時間多路復用和空間多路復用+硬件上支持隔離),沒有進程的抽象,現代計算機將不復存在。

必備理論

#一 操作系統的作用:
    1:隱藏醜陋復雜的硬件接口,提供良好的抽象接口
    2:管理、調度進程,並且將多個進程對硬件的競爭變得有序

#二 多道技術: 1.產生背景:針對單核,實現並發 ps: 現在的主機一般是多核,那麽每個核都會利用多道技術 有4個cpu,運行於cpu1的某個程序遇到io阻塞,會等到io結束再重新調度,會被調度到4個 cpu中的任意一個,具體由操作系統調度算法決定。 核心 2.空間上的復用:如內存中同時有多道程序 3.時間上的復用:復用一個cpu的時間片 強調:遇到io切,占用cpu時間過長也切,核心在於切之前將進程的狀態保存下來,這樣 才能保證下次切換回來時,能基於上次切走的位置繼續運行

二、多進程概念

1、進程:正在進行的一個過程或者說一個任務。而負責執行任務則是cpu。進程有如下三個狀態:

技術分享圖片

  其實在兩種情況下會導致一個進程在邏輯上不能運行,

  1. 進程掛起是自身原因,遇到I/O阻塞,便要讓出CPU讓其他進程去執行,這樣保證CPU一直在工作

  2. 與進程無關,是操作系統層面,可能會因為一個進程占用時間過多,或者優先級等原因,而調用其他的進程去使用CPU。

2、進程與程序的區別:程序僅僅只是一堆代碼而已,而進程指的是程序的運行過程。

 強調:同一個程序執行兩次,那也是兩個進程,比如打開網易雲音樂,雖然都是同一個軟件,但是一個可以播放音樂,一個可以播放mv。

舉例(單核+多道,實現多個進程的並發執行):
    egon在一個時間段內有很多任務要做:python備課的任務,寫書的任務,交女朋友的任務,王者榮耀上分的任務,  
    但egon同一時刻只能做一個任務(cpu同一時間只能幹一個活),如何才能玩出多個任務並發執行的效果?
    egon備一會課,再去跟李傑的女朋友聊聊天,再去打一會王者榮耀....這就保證了每個任務都在進行中

3、並發與並行:

無論是並行還是並發,在用戶看來都是‘同時‘運行的,不管是進程還是線程,都只是一個任務而已,真是幹活的是cpu,cpu來做這些任務,而一個cpu同一時刻只能執行一個任務

  並發:針對只有一個cpu執行多個進程的情況。是偽並行,即看起來是同時運行。單個cpu+多道技術就可以實現並發。

  並行:針對多個cpu執行多個進程的情況,並行也屬於並發。

  單核下,可以利用多道技術,多個核,每個核也都可以利用多道技術(多道技術是針對單核而言的

有四個核,六個任務,這樣同一時間有四個任務被執行,假設分別被分配給了cpu1,cpu2,cpu3,cpu4,

一旦任務1遇到I/O就被迫中斷執行,此時任務5就拿到cpu1的時間片去執行,這就是單核下的多道技術

而一旦任務1的I/O結束了,操作系統會重新調用它(需知進程的調度、分配給哪個cpu運行,由操作系統說了算),可能被分配給四個cpu中的任意一個去執行

技術分享圖片

三、同步\異步and阻塞\非阻塞

同步:

#所謂同步,就是在發出一個功能調用時,在沒有得到結果之前,該調用就不會返回。按照這個定義,其實絕大多數函數都是同步調用。但是一般而言,我們在說同步、異步的時候,特指那些需要其他部件協作或者需要一定時間完成的任務。
#舉例:
#1. multiprocessing.Pool下的apply #發起同步調用後,就在原地等著任務結束,根本不考慮任務是在計算還是在io阻塞,總之就是一股腦地等任務結束
#2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()
#3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()

異步:

#異步的概念和同步相對。當一個異步功能調用發出後,調用者不能立刻得到結果。當該異步功能完成後,通過狀態、通知或回調來通知調用者。如果異步功能用狀態來通知,那麽調用者就需要每隔一定時間檢查一次,效率就很低(有些初學多線程編程的人,總喜歡用一個循環去檢查某個變量的值,這其實是一 種很嚴重的錯誤)。如果是使用通知的方式,效率則很高,因為異步功能幾乎不需要做額外的操作。至於回調函數,其實和通知沒太多區別。
#舉例:
#1. multiprocessing.Pool().apply_async() #發起異步調用後,並不會等待任務結束才返回,相反,會立即獲取一個臨時結果(並不是最終的結果,可能是封裝好的一個對象)。
#2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)
#3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)

阻塞:

#阻塞調用是指調用結果返回之前,當前線程會被掛起(如遇到io操作)。函數只有在得到結果之後才會將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不同的。對於同步調用來說,很多時候當前線程還是激活的,只是從邏輯上當前函數沒有返回而已。
#舉例:
#1. 同步調用:apply一個累計1億次的任務,該調用會一直等待,直到任務返回結果為止,但並未阻塞住(即便是被搶走cpu的執行權限,那也是處於就緒態);
#2. 阻塞調用:當socket工作在阻塞模式的時候,如果沒有數據的情況下調用recv函數,則當前線程就會被掛起,直到有數據為止。

非阻塞:

#非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前也會立刻返回,同時該函數不會阻塞當前線程。

小結:

#1. 同步與異步針對的是函數/任務的調用方式:同步就是當一個進程發起一個函數(任務)調用的時候,一直等到函數(任務)完成,而進程繼續處於激活狀態。而異步情況下是當一個進程發起一個函數(任務)調用的時候,不會等函數返回,而是繼續往下執行當,函數返回的時候通過狀態、通知、事件等方式通知進程任務完成。

#2. 阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能滿足的時候就將進程掛起,而非阻塞則不會阻塞當前進程

四、多進程實現

1、multiprocessing模塊介紹

  Python提供了multiprocessing。multiprocessing模塊用來開啟子進程,並在子進程中執行我們定制的任務(比如函數),該模塊與多線程模塊threading的編程接口類似。 multiprocessing模塊的功能眾多:支持子進程、通信和共享數據、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件。 需要再次強調的一點是:與線程不同,進程沒有任何共享狀態,進程修改的數據,改動僅限於該進程內。

2、Process類介紹:為創建進程的類

主要方法:

#p為子進程

p.start():啟動進程,並調用該子進程中的p.run() 
p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法  
p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那麽也將不會被釋放,進而導致死鎖
p.is_alive():如果p仍然運行,返回True
p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程

主要屬性:

#p為子進程
p.daemon:默認值為False,如果設為True,代表p為後臺運行的守護進程,當p的父進程終止時,p也隨之終止,並且設定為True後,p不能創建自己的新進程,必須在p.start()之前設置
p.name:進程的名稱
p.pid:進程的pid
p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可)
p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)

3、創建並開啟子進程

  方式一:

from multiprocessing import Process
import time
def work(name):
    print(%s is piaoing %name)
    time.sleep(5)
    print(%s piao end %name)
if __name__==__main__:                 #windows系統必須加
    p=Process(target=work,args=(egon,))#args傳參必須是個元組,也可以以字典形式傳:kwargs={‘name‘:‘egon‘}
    p.start()
    print(主進程)

方式二:

from multiprocessing import Process
import time
class work(Process):
    def __init__(self,name):
        super(work, self).__init__()
        self.name=name
    def run(self):                            #方法名run()不可以更換
        print(%s is piaoing %self.name)
        time.sleep(5)
        print(% piao end %self.name)
if __name__==__main__:                      #windows系統必須加
    p=work(egon)
    p.start()
    print(主進程)

註意:進程的直接內存空間是彼此隔離的,如下例:

from multiprocessing import Process
n=100                 
#在windows系統中應該把全局變量定義在if __name__ == ‘__main__‘之上 def work(): global n n=0 print(子進程內: ,n) if __name__ == __main__: p=Process(target=work) p.start() #結果總為:0 print(主進程內: ,n) #結果總為:100

4、socket並發編程實例

  可以實現多個客戶端與服務端進行交流。

服務端:

from socket import *
from multiprocessing import Process

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind((127.0.0.1,8080))
server.listen(5)

def talk(conn):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break

if __name__ == __main__:
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,))
        p.start()

客戶端:

#多個客戶端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8080))
while True:
    msg=input(>>: ).strip()
    if not msg:continue

    client.send(msg.encode(utf-8))
    msg=client.recv(1024)
    print(msg.decode(utf-8))

5、jion()方法詳解

  join方法的主要作用是等待子進程結束後執行主進程。

並行效果:

from multiprocessing import Process
import time
def piao(name):
    print(%s is piaoing %name)
    time.sleep(3)
    print(%s is piao end %name)
if __name__==__main__:
    p1=Process(target=piao,args=(egon,))
    p2=Process(target=piao,args=(alex,))
    p3=Process(target=piao,args=(yuanhao,))
    p4=Process(target=piao,args=(wupeiqi,))
    start_time=time.time()
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    end_time=time.time()
    print(end_time-start_time)                   #結果為:3.多
    print(主線程)                               #最後才被打印

  解釋:p.join()是讓主線程等待p的結束,卡住的是主線程而絕非進程p,進程只要start就會在開始運行了,所以p1-p4.start()時,系統中已經有四個並發的進程了而我們p1.join()是在等p1結束,沒錯p1只要不結束主線程就會一直卡在原地,這也是問題的關鍵join是讓主線程等,而p1-p4仍然是並發執行的,p1.join的時候,其余p2,p3,p4仍然在運行,等p1.join結束,可能p2,p3,p4早已經結束了,這樣p2.join,p3.join.p4.join直接通過檢測,無需等待所以4個join花費的總時間仍然是耗費時間最長的那個進程運行的時間。

串行效果:

from multiprocessing import Process
import time
def piao(name):
    print(%s is piaoing %name)
    time.sleep(3)
    print(%s is piao end %name)
if __name__==__main__:
    p1=Process(target=piao,args=(egon,))
    p2=Process(target=piao,args=(alex,))
    p3=Process(target=piao,args=(yuanhao,))
    p4=Process(target=piao,args=(wupeiqi,))
    start_time=time.time()
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    p3.start()
    p3.join()
    p4.start()
    p4.join()
    end_time=time.time()
    print(end_time-start_time)  #結果為:12.多
    print(主線程)

解釋:以上p1,p2,p3,p4進程是上一個子進程執行完才逐一被啟動,形成串行效果。

進程(並發,並行)