1. 程式人生 > >Python實戰之processing程序(多程序,程序池,lock鎖,程序間的通訊)

Python實戰之processing程序(多程序,程序池,lock鎖,程序間的通訊)

首先須知

什麼是IO?

從硬碟讀塊資料,從網路讀塊資料屬於IO操作(IO操作不佔用cpu)

計算佔用cpu,如:1+1

Python多執行緒其實就是一個執行緒,由於利用CUP的上下文切換看起來就像是併發..上下文切換消耗資源

Python多執行緒 不適合CPU密集操作型的任務,適合IO操作密集型的任務

大量運算佔CPU儘量少用多執行緒,用單程序更快

sockeserver接收多個網路併發的就是IO操作密集型的

如果一定要使用CPU密集操作型的任務呢?Python怎麼解決?

使用多程序來解決CPU密集操作型的任務。

Python的執行緒是呼叫作業系統的原生執行緒,程序也是呼叫作業系統的原生程序,原生程序是由作業系統自己維護的。

Python只是呼叫了C程式碼庫的一個介面啟動程序的,真正的程序管理還是作業系統自己完成的

 

程序processing

1.程式是不能單獨執行的,只能裝載到記憶體,系統為他分配資源進行執行,這種執行程式稱之程序

程序同一時間只能幹一件事,每個程序只能控制CPU一個核,由於多核同時可以多個程序,由於計算機的速度太快,1秒幹了N件事,讓我們看起來就等於同時幹了N件事

2.程序要操作CUP必須建立一個執行緒,程序本身是不可以執行的,通過執行緒才可以進行操作CPU

3.得出結論,程序至少要有一個執行緒,否則就無法執行

多執行緒簡單版

__author__ = "Burgess Zheng"

import multiprocessing
import time
import threading

def thread_run():#測試執行緒用的
    print(threading.get_ident()) #threading.get_ident():顯示執行緒號

def run(name):
    time.sleep(2)
    print("hello",name)
    t = threading.Thread(target=thread_run,)#例項化執行緒
    t.start()#啟動執行緒

#迴圈啟動多個程序併發
if __name__ == '__main__':
    for i in range(10):#迴圈啟動10程序
        p = multiprocessing.Process(target=run,args=('bob%s'%i,))
        p.start()
#和執行緒格式一樣的

執行結果:

 

父程序 主程序 子程序的原理以及獲取程序號的試驗

__author__ = "Burgess Zheng"

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
#win7: os.getppid獲得父程序的PID #如果是主程序呼叫該函式列印結果:父程序是Pycharm(PID)
#linux: os.getpid得父程序的PID  #如果是主程序呼叫該函式列印結果:父程序是Linux根程序(PID1)
#得出結論:每個主程序都有一個固定的父程序:win7:pycharm(PIDx)   Linux:根程序(PID1)
#os.getppid獲得父程序的PID #如果是子程序呼叫該函式列印結果:的父程序就是啟動他的程序
#得出結論:每個字程序的父程序是啟動該子程序的程序

    print('process id:', os.getpid())
#os.getpid 呼叫者自己的程序PID
#如果是主程序呼叫就是主程序的PID
#如果是子程序呼叫就是子程序的PID
    print("\n\n")


def f(name):
    info('\033[31;1mcalled from child process function f\033[0m')
    #子程序呼叫info函式 #該子程序的父程序是主程序
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')#主程序呼叫info函式
    p = Process(target=f, args=('bob',))#例項化一個子程序
    p.start()#啟動該子程序
    # p.join()
"get_ProcessID.py" [New] 33L, 1301C written

執行結果:

程序lock

Without using the lock output from the different processes is liable to get all mixed up.

如果不使用來自不同程序的鎖輸出,則很容易混淆。

lock實戰

[[email protected] Part_ten]# vim lock_process.py 
__author__ = "Burgess Zheng"

from multiprocessing import Process, Lock


def f(l, i):#第一個形參接收的是鎖,第二個接收的是值
    l.acquire()
    print('hello world', i)
    l.release()
#不是說過程序不需要鎖嗎?現在為什麼要鎖?
#因為我們共享一個螢幕(win7試不出來)
#如果是linux 如果不加鎖,由於共享一個螢幕:出現以下情況
# 可能看到的資料是:某個程序列印hello world 還沒列印顯示完該字串,
# 另外個程序可能比較快,就造成了字串顯示混亂
#但是其實沒多大影響,知識觀看效果有影響而已
#Linux python2才會   Python3不會

if __name__ == '__main__':
    lock = Lock()#生產一個鎖

    for num in range(10):
        Process(target=f, args=(lock, num)).start()#例項子程序,鎖作為實參

執行結果:

 

程序池

程序池內部維護一個程序序列,當使用時,則去程序池中獲取一個程序,如果程序池序列中沒有可供使用的程序,那麼程式就會等待,直到程序池中有可用程序為止。

程序池中有兩個方法:1.apply   2.apply_async

程序池的作用:限定多少個子程序可以同時執行

 和 執行緒的Semaphore(訊號量)允許同時最多幾個執行緒同時執行一樣

 

程序池試驗

__author__ = "Burgess Zheng"

from  multiprocessing import Process, Pool
import time
import os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg,os.getpid())

if __name__ == '__main__':
    #該語句的意思是,如果你主動執行這個指令碼,那麼裡面的程式碼就執行
    #如果其他的Python檔案通過模組呼叫執行該指令碼,不會執行裡面的程式碼
    #注意:在window系統啟動多執行緒就需要增加該語句,否則報錯,Linux不用

    pool = Pool(processes=5) #允許程序池同時放入5個程序。簡寫:(5)
    print("主程序",os.getpid())
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)#例項化pool子程序
            #callback=回撥 .. 子程序執行完Foo函式後,進行回撥執行Bar函式(但是主程序執行Bar函式)
            #callback=回撥作用:假如主程序可以調取資料庫,那麼子程序就無需連線資料庫 但也可以調取資料庫
            #不然有1000個子程序,個個都需要連線資料庫,效率就很低
        
        #pool.apply(func=Foo, args=(i,)) #例項化pool子程序 #apply:同步執行、序列
        #pool.apply_async(func=Foo, args=(i,))#例項化pool子程序 #apply_async:非同步執行、並行
    print('end')
    pool.close() #一般正常是join在close  但是程序池有點特別需要,需要close在join,否則報錯
    pool.join() #如果不加join是不會執行程序池例項化的子程序

執行結果:

程序間的通訊(QUEUES,PIPE,MANAGER)

Queues 實戰

__author__ = "Burgess Zheng"

from multiprocessing import Process, Queue
import threading
import queue

'''
def f():
    q.put([42, None, 'hello'])
    #執行緒與執行緒之間可以共享資料
    #所以可以直接追加到主執行緒生成的執行緒佇列容器
    
if __name__ == '__main__':
    q = queue.Queue()#生成執行緒佇列容器
    p = threading.Thread(target=f,)#例項化執行緒
    p.start()#啟動執行緒
    print(q.get()) #調取佇列容器的資料 結果顯示:"[42, None, 'hello']"
'''


#程序本身是不可以共享資料的,也不可以互相訪問,但是可以通過程序Queue實現共享資料
def f(qq):
    print("in child:",qq.qsize())
    #主程序啟動了子程序後開始執行  qq == q = Queue()
    #qq.qsize == q.qsize  該程序Queue佇列容器是沒有資料,所以列印結果是空
    qq.put([42, None, 'hello'])
    #q.put()追加一個列表資料進入程序Queue佇列容器
    #這樣的話主程序就可以取該資料了
    #我們都做到程序和程序之間是獨立的不可通訊的,
    #但是可以通過程序Queue()實現程序之間的通訊
if __name__ == '__main__':
    q = Queue()#生產程序Queue佇列容器:可傳遞給子程序達到程序之間的共享資料
    p = Process(target=f, args=(q,))#例項化子程序,並且把程序Q當做實參傳遞過去
    p.start()#啟動子程序
    print(q.get())  # 調取程序Queue資料 結果顯示:"[42, None, 'hello']"
    p.join()

     # prints "[42, None, 'hello']"
    #print(q.get())  # prints "[42, None, 'hello']"

#原理:兩個程序兩個程序Queue()佇列容器(子程序複製了主程序的程序Queue()佇列容器 )
#如果子程序p追加了資料到自身程序Queue佇列容器裡面
#主程序要獲取獲取到子程序p的程序Queue佇列容器的資料
#獲取過程:
#程序Queue本身就封裝了序列化和反序列化的兩個過程
#Queue先把子程序p的程序Queue佇列容器裡的資料進行pickle序列化後,
#Queue在進行pickle反序列化到主程序的程序Queue佇列容器裡
#實際不是共享一個Queue,是兩個Queue所以才需要pickle
#實現了資料的傳遞,而不是修改同一份資料

執行結果:

Pipes實戰

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

()函式返回一個對在管道連線的連線物件,預設情況下是雙工(雙向)。例如

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time

兩個連線管()返回的物件代表的兩端管道。每個連線物件傳送()recv()方法(以及其他)。注意,管道中的資料可能會損壞如果兩個程序(或執行緒)試圖從磁碟讀取或寫入相同的管同時結束。當然沒有腐敗的風險過程使用不同的管道在同一時間

__author__ = "Burgess Zheng"

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello from child'])#傳送資料給(父)管道另外一邊
    print(conn.recv())#接收來自(父)管道資料
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()#生成管道,管道有兩邊
    p = Process(target=f, args=(child_conn,))#例項化一個子程序,把child_conn邊當做實參
    p.start()#啟動子程序
    print(parent_conn.recv())#接收來自(子)管道的另外一邊
    #print(parent_conn.recv())  # 接收來自(子)管道的另外一邊#如果該管道沒有資料就一直卡住
    parent_conn.send('xiaoming')  # 傳送資料給(子)管道另外一邊
    p.join()

執行結果:

 

Managers實戰:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array.

一個manager模組物件返回值是由Manager ()控制伺服器程序擁有的Python物件,並允許其他程序使用代理來操縱它們。
一個manager返回由Manager()支援的型別:如list( 列表),dict(字典),Naespace(名稱空間),lock(),RLock(遞迴鎖),訊號量,BoundedSemaphore(類名),condition(條件),event(事件),barrier(障礙),queue(佇列),value()array(陣列)。例如,

[[email protected] Part_ten]# vim manager_process.py
__author__ = "Burgess Zheng"

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()#追加每個子程序的PID 到d字典
    l.append(os.getpid())#追加每個子程序的PID到l列表裡面的
    print(l,d)#列印顯示l列表和列印d字典


if __name__ == '__main__':
    with Manager() as manager: #Manager() = manager 這種格式也行
        d = manager.dict() #生產一個可再多個程序之間進行傳遞共享的字典

        l = manager.list(range(5))#生產一個可再多個程序之間進行傳遞共享的列表(預設該列表生產5個數字>
)
        p_list = []#子程序多且主程序需要等子程序執行完畢
                   #就添加個空列表.方便join
        for i in range(10):#迴圈10次等於生成10程序
            p = Process(target=f, args=(d, l))
#例項化子程序 把manager生產列表和字典作為引數傳遞給該子程序
            p.start()#啟動子程序
            p_list.append(p)#所有的子程序追加到該空列表
        for res in p_list:#所有子程序列表匯入
            res.join() #所有子程序等待執行完畢主程序才可以繼續執行

        l.append("from parent")#列表追加from parent字串
        print(d)#列印顯示字典
        print(l)#列印顯示列表

執行結果: