1. 程式人生 > >Python36 多線程、多進程的使用場景

Python36 多線程、多進程的使用場景

python

多線程與多進程的使用場景

io 操作不占用CPU(從硬盤、從網絡、從內存讀數據都算io)
計算占用CPU(如1+1計算)

python中的線程是假線程,不同線程之間的切換是需要耗費資源的,因為需要存儲線程的上下文,不斷的切換就會耗費資源。。

python多線程適合io操作密集型的任務(如socket server 網絡並發這一類的);
python多線程不適合cpu密集操作型的任務,主要使用cpu來計算,如大量的數學計算。
那麽如果有cpu密集型的任務怎麽辦,可以通過多進程來操作(不是多線程)。
假如CPU有8核,每核CPU都可以用1個進程,每個進程可以用1個線程來進行計算。
進程之間不需要使用gil鎖,因為進程是獨立的,不會共享數據。

進程可以起很多個,但是8核CPU同時只能對8個任務進行操作。

多進程

測試多進程

import multiprocessing
import time

def run(name):
    time.sleep(2)
    print (‘heelo‘,name)

if __name__ == ‘__main__‘:

    for i in range(10): #起了10個進程
        p = multiprocessing.Process(target=run,args=(‘bob%s‘ %i,))
        p.start()

執行結果:
heelo bob1
heelo bob0
heelo bob2
heelo bob3
heelo bob5
heelo bob4
heelo bob6
heelo bob7
heelo bob8
heelo bob9

##2秒左右就執行完成了,有幾核CPU,同時就可以處理幾個進程;當然要考慮你的電腦還開啟了N多個其他應用程序,不過CPU計算比較快。

import multiprocessing
import time,threading

def thread_run():
    print (threading.get_ident()) #get_ident獲取當前線程id

def run(name):
    time.sleep(2)
    print (‘heelo‘,name)
    t = threading.Thread(target=thread_run,)    #在每個進程中又起了1個線程
    t.start()

if __name__ == ‘__main__‘:

    for i in range(10):     #起了10個進程
        p = multiprocessing.Process(target=run,args=(‘bob%s‘ %i,))
        p.start()

執行結果:
heelo bob0
16684
heelo bob1
15052
heelo bob2
15260
heelo bob3
6192
heelo bob4
6748
heelo bob7
13980
heelo bob5
6628
heelo bob6
3904
heelo bob9
2328
heelo bob8
17072
from multiprocessing import Process
import os

def info(title):
    print(title)
    print(‘module name:‘, __name__)
    print(‘parent process:‘, os.getppid())  #獲取父進程的id
    print(‘process id:‘, os.getpid())   #獲取自身的id
    print("\n\n")

def f(name):
    info(‘\033[31;1mfunction f\033[0m‘)
    print(‘hello‘, name)

if __name__ == ‘__main__‘:
    info(‘\033[32;1mmain process line\033[0m‘)  ##直接調用函數
    # p = Process(target=f, args=(‘bob‘,))
    # p.start()
    # p.join()

執行結果:
main process line
module name: __main__
parent process: 1136    #父進程ID,這個父進程就是pycharm
process id: 16724   #這個子進程就是python的代碼程序
##每個進程都會有一個父進程。

from multiprocessing import Process
import os

def info(title):
    print(title)
    print(‘module name:‘, __name__)
    print(‘parent process:‘, os.getppid())  #獲取父進程的id
    print(‘process id:‘, os.getpid())   #獲取自身的id
    print("\n\n")

def f(name):
    info(‘\033[31;1mcalled from child process function f\033[0m‘)
    print(‘hello‘, name)

if __name__ == ‘__main__‘:
    info(‘\033[32;1mmain process line\033[0m‘)
    p = Process(target=f, args=(‘bob‘,))    #設置子進程
    p.start()   #啟動子進程
    # p.join()

執行結果:
main process line
module name: __main__
parent process: 1136    #主進程pycharm
process id: 14684       #子進程python代碼

called from child process function f
module name: __mp_main__
parent process: 14684   #主進程python代碼(1136的子進程)
process id: 15884       #python代碼(主進程14684)中的子進程的子15884
## 每個進程都有主進程(父進程)

hello bob

進程間通訊

默認進程之間數據是不共享的,如果一定要實現互訪可以通過Queue來實現,這個Queue和線程中的Queue使用方法一樣,不過線程中的Queue只能在線程之間使用。

線程

import queue
import threading

def f():
    q.put([42,None,‘heelo‘])

if __name__ == ‘__main__‘:
    q = queue.Queue()     
    p = threading.Thread(target=f,)

    p.start()

    print (q.get())
    p.join()

執行結果:
[42, None, ‘heelo‘]
## 通過子線程put進去數據,然後在主線程get出內容,表明線程之間數據是可以共享的。
進程

import queue
from multiprocessing import Process

def f():
    q.put([42,None,‘heelo‘])    #這裏的q屬於主進程

if __name__ == ‘__main__‘:
    q = queue.Queue()   #主進程起的q
    p = Process(target=f,)
    ## 在主進程中來定義子進程;如果在主進程中啟動了子進程,那麽主進程和子進程之間內存是獨立的。
    ## 因為內存獨立,子進程p是無法訪問主進程def f()中的q的。
    p.start()

    print (q.get())
    p.join()

執行結果:
Process Process-1:
Traceback (most recent call last):
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "E:\python\代碼練習\A3.py", line 7, in f
    q.put([42,None,‘heelo‘])    
NameError: name ‘q‘ is not defined

##可以看到已經報錯,這是因為子進程不能訪問主進程的q

import queue
from multiprocessing import Process

def f(qq):
    qq.put([42,None,‘heelo‘])

if __name__ == ‘__main__‘:
    q = queue.Queue()
    p = Process(target=f,args=(q,)) #將父進程q傳給子進程

    p.start()

    print (q.get())
    p.join()

執行結果:

Traceback (most recent call last):
  File "E:/python/代碼練習/A3.py", line 13, in <module>
    p.start()
  File "D:\python3.6.4\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "D:\python3.6.4\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "D:\python3.6.4\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "D:\python3.6.4\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "D:\python3.6.4\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can‘t pickle _thread.lock objects

## 這是因為我們將線程的q傳給另一個進程,這是不可以的,線程只屬於當前進程,不能傳給其他進程。
## 如果想將q傳給子進程,那麽必須將進程q傳進去,而不是線程q。

from multiprocessing import Process,Queue
##大寫的Queue是進程隊列; queue是線程隊列
##大寫的Queue需要從multiprocessing導入

def f(qq):
    qq.put([42,None,‘heelo‘])

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target=f,args=(q,)) #將父進程q傳給子進程

    p.start()

    print (q.get()) #父進程去get子進程的內容
    p.join()

執行結果:
[42, None, ‘heelo‘]
##父進程可以get子進程put進去的內容了;從表面上看感覺是兩個進程共享了數據,其實不然。

## 現在已經實現了進程間的通訊。父進程將q傳給子進程,其實是克隆了一份q給子進程,此時子進程就多了一個q進程隊列;  但是父進程又為什麽能夠get子進程put進去的數據呢,這是因為當前兩個進程在內存空間依然是獨立的,只不過子進程put的數據 通過pickle序列化放到內存中一個中間的位置,然後父進程從這個中間的位置取到數據(而不是從子進程中取的數據)。  所以進程間的通訊不是共享數據,而是一個數據的傳遞。
進程之間的數據還可以通過管道的方式來通訊

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, ‘hello from child1‘])  #發送數據給parent_conn
    conn.close()    #發完數據需要關閉

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe()
    ## 生成管道。 生成時會產生兩個返回對象,這兩個對象相當於兩端的電話,通過管道線路連接。
    ## 兩個對象分別交給兩個變量。
    p = Process(target=f, args=(child_conn,))   #child_conn需要傳給對端,用於send數據給parent_conn
    p.start()
    print(parent_conn.recv())  #parent_conn在這端,用於recv數據
    p.join()

執行結果:
[42, None, ‘hello from child1‘]

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, ‘hello from child1‘])
    conn.send([42, None, ‘hello from child2‘])  #發送兩次數據
    conn.close()

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  
    p.join()

執行結果:
[42, None, ‘hello from child1‘]
## 可以看到這端只接收到了一次數據
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, ‘hello from child1‘])
    conn.send([42, None, ‘hello from child2‘])  
    conn.close()

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())   #第二次接收數據
    p.join()

執行結果:
[42, None, ‘hello from child1‘]
[42, None, ‘hello from child2‘]
##對端發送幾次,這端就需要接收幾次

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, ‘hello from child1‘])
    conn.send([42, None, ‘hello from child2‘])  #發送兩次數據
    conn.close()

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    print(parent_conn.recv())   #對端發送兩次,本段接收三次
    p.join()

執行結果:
[42, None, ‘hello from child1‘]
[42, None, ‘hello from child2‘]
## 程序卡主了,除非對端在發送一次數據。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, ‘hello from child1‘])
    conn.send([42, None, ‘hello from child2‘])  #發送兩次數據
    print (conn.recv()) #接收數據
    conn.close()

if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    print(parent_conn.recv())
    parent_conn.send("data from parent_conn")   #發送數據
    p.join()

執行結果:
[42, None, ‘hello from child1‘]
[42, None, ‘hello from child2‘]
data from parent_conn

##通過管道實現了相互發送接收數據(實現了數據傳遞)

進程間數據交互及共享

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[1] = ‘1‘  #放入key和value到空字典中
    d[‘2‘] = 2
    d[0.25] = None

    l.append(os.getpid()) #將每個進程的id值放入列表中;每個進程的id值都不同。
    print(l)

if __name__ == ‘__main__‘:
    with Manager() as manager:  #做一個別名,此時manager就相當於Manager()
        d = manager.dict()  #生成一個可在多個進程之間傳遞和共享的字典

        l = manager.list(range(5))  #生成一個可在多個進程之間傳遞和共享的列表;通過range(5)給列表中生成5個數據
        p_list = []
        for i in range(10): #生成10個進程
            p = Process(target=f, args=(d, l))  #將字典和列表傳給每個進程,每個進程可以進行修改
            p.start()
            p_list.append(p)    # 將每個進程放入空列表中
        for res in p_list:
            res.join()

        print(d)    #所有進程都執行完畢後打印字典
        print(l)    #所有進程都執行完畢後打印列表

執行結果:
[0, 1, 2, 3, 4, 15788] 
#列表生成的時候自動加入了0-4這5個數字;然後每個進程都將各自的pid加入到列表。
[0, 1, 2, 3, 4, 15788, 1568]
[0, 1, 2, 3, 4, 15788, 1568, 7196]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368]
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368, 3092] #第10個進程打印的列表中有10個進程的pid
{1: ‘1‘, ‘2‘: 2, 0.25: None}    #最後打印的字典
[0, 1, 2, 3, 4, 15788, 1568, 7196, 6544, 9568, 16952, 15704, 14412, 5368, 3092] #最後打印的列表

from multiprocessing import Process, Manager
import os

def f(d, l):
    d[os.getpid()] = os.getpid()

    l.append(os.getpid())
    print(l)

if __name__ == ‘__main__‘:
    with Manager() as manager:
        d = manager.dict()  #對字典做個調整,也將pid加入到字典中

        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()

        print(d)
        print(l)

執行結果:
[0, 1, 2, 3, 4, 2240]
[0, 1, 2, 3, 4, 2240, 10152]
[0, 1, 2, 3, 4, 2240, 10152, 10408]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976]
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532]
{2240: 2240, 10152: 10152, 10408: 10408, 6312: 6312, 17156: 17156, 6184: 6184, 16168: 16168, 11384: 11384, 15976: 15976, 16532: 16532}
[0, 1, 2, 3, 4, 2240, 10152, 10408, 6312, 17156, 6184, 16168, 11384, 15976, 16532]

##現在我們看到可以實現進程間的數據共享、修改和傳遞。
##Manager()自帶鎖,會控制進程之間同一時間修改數據;
##字典和列表的數據不是一份,而是因為10個進程,所以有10個字典和10個列表。每個進程修改後,都會copy給其他進程,其他進程可以對最新的數據進行修改,所以數據不會被修改亂。

進程同步

在進程裏面也有鎖


from multiprocessing import Process, Lock   #從multiprocessing導入Lock這個鎖

def f(l, i):
    l.acquire()     #獲取修改數據的鎖
    print(‘hello world‘, i)
    l.release()     #釋放鎖

if __name__ == ‘__main__‘:
    lock = Lock()   #實例鎖

    for num in range(10):   #生成10個進程
        Process(target=f, args=(lock, num)).start() #執行子進程並傳入參數給子進程

執行結果:
hello world 1
hello world 4
hello world 0
hello world 3
hello world 2
hello world 5
hello world 6
hello world 8
hello world 7
hello world 9
## 可以看到一共10個進程,並不是連續的,說明執行進程的時候說不準先執行哪個進程。

##進程之間數據是獨立的,這裏我們為什麽又要加鎖呢,這是因為所有進程使用同一個屏幕來輸出數據;比如 我們現在輸出的數據是 hello world x,在輸出的過程中很有可能其中一個進程還沒輸出完(比如只輸出了hello wo),另一個進程就執行輸出了(可能會在屏幕上看到hello wohello world0201的現象)。  所以需要通過鎖來控制同一時間只能有一個進程輸出數據到屏幕。

進程池

執行多進程,子進程會從主進程復制一份完整數據,1個、10個進程可能還沒什麽感覺,但是如果有100或1000,甚至更多個進程的時候開銷就會特別大,就會明顯感覺到多進程執行有卡頓現象。

進程池可以設定同一時間有多少個進程可以在CPU上運行。


from  multiprocessing import Process, Pool
#從multiprocessing導入pool

import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid()) #打印進程id
    return i + 100

def Bar(arg):
    print(‘-->exec done:‘, arg)

if __name__ == ‘__main__‘:  ##這行代碼用途是如果主動執行該代碼的.py文件,則該代碼下面的代碼可以被執行;如果該.py模塊被導入到其他模塊中,從其他模塊執行該.py模塊,則該行下面的代碼不會被執行。  有些時候可以用這種方式用於測試,在該行代碼下面寫一些測試代碼。。
    pool = Pool(5)  #同時只能放入5個進程

    for i in range(10): #創建10個進程,但是因為pool的限制,只有放入進程池中的5個進程才會被執行(),其他的被掛起了,如果進程池中其中有兩個進程執行完了,就會補進2個進程進去。
        # pool.apply_async(func=Foo, args=(i,), callback=Bar)
        pool.apply(func=Foo, args=(i,)) #pool.apply用來將進程放入pool

    print(‘end‘)    #執行完畢
    pool.close()    #允許pool中的進程關閉(close必須在join前面,可以理解close相當於一個開關吧)
    pool.join()  # 進程池中進程執行完畢後再關閉,如果註釋,那麽程序直接關閉。

執行結果:
in process 2240
in process 3828
in process 16396
in process 11848
in process 11636
in process 2240
in process 3828
in process 16396
in process 11848
in process 11636
end

##可以看到通過串行的方式將結果打印出來,這是因為我們使用的是pool.apply。 pool.apply就是通過串行的方式來執行。
from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print(‘-->exec done:‘, arg)

if __name__ == ‘__main__‘:
    pool = Pool(5)

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,))
        ## 使用pool.apply_async就可以並行了

    print(‘end‘)
    pool.close()
    # pool.join()   註釋掉

執行結果:
end
## 只執行了print(‘end‘)代碼,其他進程的結果沒有看到,這是因為其他進程還沒有執行完成,主進程pool.close()就執行完了,close以後所有其他進程也不會在執行了。
## 要想其他進程執行完成後在關閉,必須使用pool.join()
from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print(‘-->exec done:‘, arg)

if __name__ == ‘__main__‘:
    pool = Pool(5)

    for i in range(10):
        pool.apply_async(func=Foo, args=(i,))

    print(‘end‘)
    pool.close()
    pool.join()

執行結果:

end
in process 13272
in process 14472
in process 3724
in process 9072
in process 15068
in process 13272
in process 14472
in process 3724
in process 9072
in process 15068

##從執行結果來看,5個 5個的被打印出來。

回調

from  multiprocessing import Process, Pool
import time,os

def Foo(i):
    time.sleep(2)
    print("in process",os.getpid())
    return i + 100

def Bar(arg):
    print(‘-->exec done:‘, arg,os.getpid())

if __name__ == ‘__main__‘:
    pool = Pool(5)

    print ("主進程:",os.getpid())  #打印主進程id
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
        ##callback叫做回調,就是當執行完了func=Foo後,才會執行callback=Bar(每個進程執行完了後都會執行回調)。
        ## 回調可以用於當執行完代碼後做一些後續操作,比如查看完命令後,通過回調進行備份;或者執行完什麽動作後,做個日誌等。
        ## 備份、寫日誌等在子進程中也可以執行,但是為什麽要用回調呢! 這是因為如果用子進程,有10個子進程就得連接數據庫十次,而使用回調的話是用主進程連接數據庫,所以只連接一次就可以了,這樣寫能大大提高運行效率。
        ##通過主進程建立數據庫的連接的話,因為在同一個進程中只能在數據庫建立一次連接,所以即使是多次被子進程回調,也不會重復建立連接的,因為數據庫會限制同一個進程最大連接數,這都是有數據庫設置的。

    print(‘end‘)
    pool.close()
    pool.join()

執行結果:
主進程: 12776              #主進程是12766
end
in process 7496
-->exec done: 100 12776     #這裏可以看出回調是通過主進程調用的
in process 3324
-->exec done: 101 12776
in process 16812
-->exec done: 102 12776
in process 10876
-->exec done: 103 12776
in process 8200
-->exec done: 104 12776
in process 7496
-->exec done: 105 12776
in process 3324
-->exec done: 106 12776
in process 16812
-->exec done: 107 12776
in process 10876
-->exec done: 108 12776
in process 8200
-->exec done: 109 12776

Python36 多線程、多進程的使用場景