1. 程式人生 > >多執行緒多程序使用場景

多執行緒多程序使用場景

Python36 多執行緒、多程序的使用場景

多執行緒與多程序的使用場景

io 操作不佔用CPU(從硬碟、從網路、從記憶體讀資料都算io)
計算佔用CPU(如1+1計算)

python中的執行緒是假執行緒,不同執行緒之間的切換是需要耗費資源的,因為需要儲存執行緒的上下文,不斷的切換就會耗費資源。。

python多執行緒適合io操作密集型的任務(如socket server 網路併發這一類的);
python多執行緒不適合cpu密集操作型的任務,主要使用cpu來計算,如大量的數學計算。
那麼如果有cpu密集型的任務怎麼辦,可以通過多程序來操作(不是多執行緒)。
假如CPU有8核,每核CPU都可以用1個程序,每個程序可以用1個執行緒來進行計算。
程序之間不需要使用gil鎖,因為程序是獨立的,不會共享資料。
程序可以起很多個,但是8核CPU同時只能對8個任務進行操作。

多程序

測試多程序

import multiprocessing
import time

def run(name):
    time.sleep(2)
    print ('heelo',name)

if __name__ == '__main__':

    for i in range(10): #起了10個程序
        p = multiprocessing.Process(target=run,args=('bob%s' %i,))
        p.start()

執行結果:
heelo bob1
heelo bob0
heelo bob2
heelo bob3
heelo bob5
heelo bob4
heelo bob6
heelo bob7
heelo bob8
heelo bob9

##2秒左右就執行完成了,有幾核CPU,同時就可以處理幾個程序;當然要考慮你的電腦還開啟了N多個其他應用程式,不過CPU計算比較快。

import multiprocessing
import time,threading

def thread_run():
    print (threading.get_ident()) #get_ident獲取當前執行緒id

def run(name):
    time.sleep(2)
    print ('heelo',name)
    t = threading.Thread(target=thread_run,)    #在每個程序中又起了1個執行緒
    t.start()

if __name__ == '__main__':

    for i in
range(10): #起了10個程序 p = multiprocessing.Process(target=run,args=('bob%s' %i,)) p.start() 執行結果: heelo bob0 16684 heelo bob1 15052 heelo bob2 15260 heelo bob3 6192 heelo bob4 6748 heelo bob7 13980 heelo bob5 6628 heelo bob6 3904 heelo bob9 2328 heelo bob8 17072

import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #獲取父程序的id
    print('process id:', os.getpid())   #獲取自身的id
    print("\n\n")

def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')  ##直接呼叫函式
    # p = Process(target=f, args=('bob',))
    # p.start()
    # p.join()

執行結果:
main process line
module name: __main__
parent process: 1136    #父程序ID,這個父程序就是pycharm
process id: 16724   #這個子程序就是python的程式碼程式
##每個程序都會有一個父程序。

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #獲取父程序的id
    print('process id:', os.getpid())   #獲取自身的id
    print("\n\n")

def f(name):
    info('\033[31;1mcalled from child process function f\033[0m')
    print('hello', name)

if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))    #設定子程序
    p.start()   #啟動子程序
    # p.join()

執行結果:
main process line
module name: __main__
parent process: 1136    #主程序pycharm
process id: 14684       #子程序python程式碼

called from child process function f
module name: __mp_main__
parent process: 14684   #主程序python程式碼(1136的子程序)
process id: 15884       #python程式碼(主程序14684)中的子程序的子15884
## 每個程序都有主程序(父程序)

hello bob

程序間通訊

預設程序之間資料是不共享的,如果一定要實現互訪可以通過Queue來實現,這個Queue和執行緒中的Queue使用方法一樣,不過執行緒中的Queue只能在執行緒之間使用。

執行緒

import queue
import threading

def f():
    q.put([42,None,'heelo'])

if __name__ == '__main__':
    q = queue.Queue()     
    p = threading.Thread(target=f,)

    p.start()

    print (q.get())
    p.join()

執行結果:
[42, None, 'heelo']
## 通過子執行緒put進去資料,然後在主執行緒get出內容,表明執行緒之間資料是可以共享的。
程序

import queue
from multiprocessing import Process

def f():
    q.put([42,None,'heelo'])    #這裡的q屬於主程序

if __name__ == '__main__':
    q = queue.Queue()   #主程序起的q
    p = Process(target=f,)
    ## 在主程序中來定義子程序;如果在主程序中啟動了子程序,那麼主程序和子程序之間記憶體是獨立的。
    ## 因為記憶體獨立,子程序p是無法訪問主程序def f()中的q的。
    p.start()

    print (q.get())
    p.join()

執行結果:
Process Process-1:
Traceback (most recent call last):
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "E:\python\程式碼練習\A3.py", line 7, in f
    q.put([42,None,'heelo'])    
NameError: name 'q' is not defined

##可以看到已經報錯,這是因為子程序不能訪問主程序的q

import queue
from multiprocessing import Process

def f(qq):
    qq.put([42,None,'heelo'])

if __name__ == '__main__':
    q = queue.Queue()
    p = Process(target=f,args=(q,)) #將父程序q傳給子程序

    p.start()

    print (q.get())
    p.join()

執行結果:

Traceback (most recent call last):
  File "E:/python/程式碼練習/A3.py", line 13, in <module>
    p.start()
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "D:\python3.6.4\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "D:\python3.6.4\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "D:\python3.6.4\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "D:\python3.6.4\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

## 這是因為我們將執行緒的q傳給另一個程序,這是不可以的,執行緒只屬於當前程序,不能傳給其他程序。
## 如果想將q傳給子程序,那麼必須將程序q傳進去,而不是執行緒q。

from multiprocessing import Process,Queue
##大寫的Queue是程序佇列; queue是執行緒佇列
##大寫的Queue需要從multiprocessing匯入

def f(qq):
    qq.put([42,None,'heelo'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f,args=(q,)) #將父程序q傳給子程序

    p.start()

    print (q.get()) #父程序去get子程序的內容
    p.join()

執行結果:
[42, None, 'heelo']
##父程序可以get子程序put進去的內容了;從表面上看感覺是兩個程序共享了資料,其實不然。

## 現在已經實現了程序間的通訊。父程序將q傳給子程序,其實是克隆了一份q給子程序,此時子程序就多了一個q程序佇列;  但是父程序又為什麼能夠get子程序put進去的資料呢,這是因為當前兩個程序在記憶體空間依然是獨立的,只不過子程序put的資料 通過pickle序列化放到記憶體中一箇中間的位置,然後父程序從這個中間的位置取到資料(而不是從子程序中取的資料)。  所以程序間的通訊不是共享資料,而是一個數據的傳遞。
程序之間的資料還可以通過管道的方式來通訊

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])  #傳送資料給parent_conn
    conn.close()    #發完資料需要關閉

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    ## 生成管道。 生成時會產生兩個返回物件,這兩個物件相當於兩端的電話,通過管道線路連線。
    ## 兩個物件分別交給兩個變數。
    p = Process(target=f, args=(child_conn,))   #child_conn需要傳給對端,用於send資料給parent_conn
    p.start()
    print(parent_conn.recv())  #parent_conn在這端,用於recv資料
    p.join()

執行結果:
[42, None, 'hello from child1']

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])  #傳送兩次資料
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  
    p.join()

執行結果:
[42, None, 'hello from child1']
## 可以看到這端只接收到了一次資料
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])  
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())   #第二次接收資料
    p.join()

執行結果:
[42, None, 'hello from child1']
[42, None, 'hello from child2']
##對端傳送幾次,這端就需要接收幾次

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])  #傳送兩次資料
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    print(parent_conn.recv())   #對端傳送兩次,本段接收三次
    p.join()

執行結果:
[42, None, 'hello from child1']
[42, None, 'hello from child2']
## 程式卡主了,除非對端在傳送一次資料。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello from child1'])
    conn.send([42, None, 'hello from child2'])  #傳送兩次資料
    print (conn.recv()) #接收資料
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    parent_conn.send("data from parent_conn")   #傳送資料
    p.join()

執行結果:
[42, None, 'hello from child1']
[42, None, 'hello from child2']
data from parent_conn

##通過管道實現了相互發送接收資料(實現了資料傳遞)

程序間資料互動及共享

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[1] = '1'  #放入key和value到空字典中
    d['2'] = 2
    d[0.25] = None

    l.append(os.getpid()) #將每個程序的id值放入列表中;每個程序的id值都不同。
    print(l)

if __name__ == '__main__':
    with Manager() as manager:  #做一個別名,此時manager就相當於Manager()
        d = manager.dict()  #生成一個可在多個程序之間傳遞和共享的字典

        l = manager.list(range(5))  #生成一個可在多個程序之間傳遞和共享的列表;通過range(5)給列表中生成5個數據
        p_list = []
        for i in range(10): #生成10個程序
            p = Process(target=f, args=(d, l))  #將字典和列表傳給每個程序,每個程序可以進行修改
            p.start()
            p_list.append(p)    # 將每個程序放入空列表中
        for res in p_list:
            res.join()

        print(d)    #所有程序都執行完畢後列印字典
        print(l)    #所有程序都執行完畢後列印列表

執行結果:
[0, 1, 2, 3, 4, 15788] 
#列表生成的時候自動加入了0-4這5個數字;然後每個程序都將各自的pid加入到列表。
[0, 1, 2, 3, 4, 15788, 1568]
[0, 1, 2, 3, 4, 15788, 1568, 7196]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368, 3092] #第10個程序列印的列表中有10個程序的pid
{1: '1', '2': 2, 0.25: None}    #最後列印的字典
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368, 3092] #最後列印的列表

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()

    l.append(os.getpid())
    print(l)

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()  #對字典做個調整,也將pid加入到字典中

        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)

執行結果:
[0, 1, 2, 3, 4, 2240]
[0, 1, 2, 3, 4, 2240, 10152]
[0, 1, 2, 3, 4, 2240, 10152, 10408]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532]
{2240: 2240, 10152: 10152, 10408: 10408, 6312: 6312, 17156: 17156, 6184: 6184, 16168: 16168, 11384: 11384, 15976: 15976, 16532: 16532}
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532]

##現在我們看到可以實現程序間的資料共享、修改和傳遞。
##Manager()自帶鎖,會控制程序之間同一時間修改資料;
##字典和列表的資料不是一份,而是因為10個程序,所以有10個字典和10個列表。每個程序修改後,都會copy給其他程序,其他程序可以對最新的資料進行修改,所以資料不會被修改亂。

程序同步

在程序裡面也有鎖


from multiprocessing import Process, Lock   #從multiprocessing匯入Lock這個鎖

def f(l, i):
    l.acquire()     #獲取修改資料的鎖
    print('hello world', i)
    l.release()     #釋放鎖

if __name__ == '__main__':
    lock = Lock()   #例項鎖

    for num in range(10):   #生成10個程序
        Process(target=f, args=(lock, num)).start() #執行子程序並傳入引數給子程序

執行結果:
hello world 1
hello world 4
hello world 0
hello world 3
hello world 2
hello world 5
hello world 6
hello world 8
hello world 7
hello world 9
## 可以看到一共10個程序,並不是連續的,說明執行程序的時候說不準先執行哪個程序。

##程序之間資料是獨立的,這裡我們為什麼又要加鎖呢,這是因為所有程序使用同一個螢幕來輸出資料;比如 我們現在輸出的資料是 hello world x,在輸出的過程中很有可能其中一個程序還沒輸出完(比如只輸出了hello wo),另一個程序就執行輸出了(可能會在螢幕上看到hello wohello world0201的現象)。  所以需要通過鎖來控制同一時間只能有一個程序輸出資料到螢幕。

程序池

執行多程序,子程序會從主程序複製一份完整資料,1個、10個程序可能還沒什麼感覺,但是如果有100或1000,甚至更多個程序的時候開銷就會特別大,就會明顯感覺到多程序執行有卡頓現象。

程序池可以設定同一時間有多少個程序可以在CPU上執行。


from  multiprocessing import Process, Pool
#從multiprocessing匯入pool

import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid()) #列印程序id
    return i + 100

def Bar(arg):
    print('-->exec done:', arg)

if __name__ == '__main__':  ##這行程式碼用途是如果主動執行該程式碼的.py檔案,則該程式碼下面的程式碼可以被執行;如果該.py模組被匯入到其他模組中,從其他模組執行該.py模組,則該行下面的程式碼不會被執行。  有些時候可以用這種方式用於測試,在該行程式碼下面寫一些測試程式碼。。
    pool = Pool(5)  #同時只能放入5個程序

    for i in range(10): #建立10個程序,但是因為pool的限制,只有放入程序池中的5個程序才會被執行(),其他的被掛起了,如果程序池中其中有兩個程序執行完了,就會補進2個程序進去。
        # pool.apply_async(func=Foo, args=(i,), callback=Bar)
        pool.apply(func=Foo, args=(i,)) #pool.apply用來將程序放入pool

    print('end')    #執行完畢
    pool.close()    #允許pool中的程序關閉(close必須在join前面,可以理解close相當於一個開關吧)
    pool.join()  # 程序池中程序執行完畢後再關閉,如果註釋,那麼程式直接關閉。

執行結果:
in process 2240
in process 3828
in process 16396
in process 11848
in process 11636
in process 2240
in process 3828
in process 16396
in process 11848
in process 11636
end

##可以看到通過序列的方式將結果打印出來,這是因為我們使用的是pool.apply。 pool.apply就是通過序列的方式來執行。
from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg)

if __name__ == '__main__':
    pool = Pool(5)

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,))
        ## 使用pool.apply_async就可以並行了

    print('end')
    pool.close()
    # pool.join()   註釋掉

執行結果:
end
## 只執行了print('end')程式碼,其他程序的結果沒有看到,這是因為其他程序還沒有執行完成,主程序pool.close()就執行完了,close以後所有其他程序也不會在執行了。
## 要想其他程序執行完成後在關閉,必須使用pool.join()
from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg)

if __name__ == '__main__':
    pool = Pool(5)

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,))

    print('end')
    pool.close()
    pool.join()

執行結果:

end
in process 13272
in process 14472
in process 3724
in process 9072
in process 15068
in process 13272
in process 14472
in process 3724
in process 9072
in process 15068

##從執行結果來看,5個 5個的被打印出來。

回撥

from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print('-->exec done:', arg,os.getpid())

if __name__ == '__main__':
    pool = Pool(5)

    print ("主程序:",os.getpid())  #列印主程序id
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
        ##callback叫做回撥,就是當執行完了func=Foo後,才會執行callback=Bar(每個程序執行完了後都會執行回撥)。
        ## 回撥可以用於當執行完程式碼後做一些後續操作,比如檢視完命令後,通過回撥進行備份;或者執行完什麼動作後,做個日誌等。
        ## 備份、寫日誌等在子程序中也可以執行,但是為什麼要用回撥呢! 這是因為如果用子程序,有10個子程序就得連線資料庫十次,而使用回撥的話是用主程序連線資料庫,所以只連線一次就可以了,這樣寫能大大提高執行效率。
        ##通過主程序建立資料庫的連線的話,因為在同一個程序中只能在資料庫建立一次連線,所以即使是多次被子程序回撥,也不會重複建立連線的,因為資料庫會限制同一個程序最大連線數,這都是有資料庫設定的。

    print('end')
    pool.close()
    pool.join()

執行結果:
主程序: 12776              #主程序是12766
end
in process 7496
-->exec done: 100 12776     #這裡可以看出回撥是通過主程序呼叫的
in process 3324
-->exec done: 101 12776
in process 16812
-->exec done: 102 12776
in process 10876
-->exec done: 103 12776
in process 8200
-->exec done: 104 12776
in process 7496
-->exec done: 105 12776
in process 3324
-->exec done: 106 12776
in process 16812
-->exec done: 107 12776
in process 10876
-->exec done: 108 12776
in process 8200
-->exec done: 109 12776