1. 程式人生 > >搞定python多線程和多進程

搞定python多線程和多進程

觸發 不一定 並不會 守護線程 執行 comm mes 權限 pipe

1.1 線程

1.1.1 什麽是線程

線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務。一個線程是一個execution context(執行上下文),即一個cpu執行時所需要的一串指令。

1.1.2 線程的工作方式

假設你正在讀一本書,沒有讀完,你想休息一下,但是你想在回來時恢復到當時讀的具體進度。有一個方法就是記下頁數、行數與字數這三個數值,這些數值就是execution context。如果你的室友在你休息的時候,使用相同的方法讀這本書。你和她只需要這三個數字記下來就可以在交替的時間共同閱讀這本書了。

線程的工作方式與此類似。CPU會給你一個在同一時間能夠做多個運算的幻覺,實際上它在每個運算上只花了極少的時間,本質上CPU同一時刻只幹了一件事。它能這樣做就是因為它有每個運算的execution context。就像你能夠和你朋友共享同一本書一樣,多任務也能共享同一塊CPU。

1.2 進程

一個程序的執行實例就是一個進程。每一個進程提供執行程序所需的所有資源。(進程本質上是資源的集合)

一個進程有一個虛擬的地址空間、可執行的代碼、操作系統的接口、安全的上下文(記錄啟動該進程的用戶和權限等等)、唯一的進程ID、環境變量、優先級類、最小和最大的工作空間(內存空間),還要有至少一個線程。

每一個進程啟動時都會最先產生一個線程,即主線程。然後主線程會再創建其他的子線程。

與進程相關的資源包括:

  • 內存頁(同一個進程中的所有線程共享同一個內存空間
  • 文件描述符(e.g. open sockets)
  • 安全憑證(e.g.啟動該進程的用戶ID)

1.3 進程與線程區別

1.同一個進程中的線程共享同一內存空間,但是進程之間是獨立的。
2.同一個進程中的所有線程的數據是共享的(進程通訊),進程之間的數據是獨立的。
3.對主線程的修改可能會影響其他線程的行為,但是父進程的修改(除了刪除以外)不會影響其他子進程。
4.線程是一個上下文的執行指令,而進程則是與運算相關的一簇資源。
5.同一個進程的線程之間可以直接通信,但是進程之間的交流需要借助中間代理來實現。
6.創建新的線程很容易,但是創建新的進程需要對父進程做一次復制。
7.一個線程可以操作同一進程的其他線程,但是進程只能操作其子進程。
8.線程啟動速度快,進程啟動速度慢(但是兩者運行速度沒有可比性)。

2 多線程

2.1 線程常用方法

方法註釋
start() 線程準備就緒,等待CPU調度
setName() 為線程設置名稱
getName() 獲取線程名稱
setDaemon(True) 設置為守護線程
join() 逐個執行每個線程,執行完畢後繼續往下執行
run() 線程被cpu調度後自動執行線程對象的run方法,如果想自定義線程類,直接重寫run方法就行了
2.1.1 Thread類

1.普通創建方式

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print(‘2s‘)
    time.sleep(1)
    print(‘1s‘)
    time.sleep(1)
    print(‘0s‘)
    time.sleep(1)

t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()

"""
task t1
task t2
2s
2s
1s
1s
0s
0s
"""

2.繼承threading.Thread來自定義線程類
其本質是重構Thread類中的run方法

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重構run函數必須要寫
        self.n = n

    def run(self):
        print("task", self.n)
        time.sleep(1)
        print(‘2s‘)
        time.sleep(1)
        print(‘1s‘)
        time.sleep(1)
        print(‘0s‘)
        time.sleep(1)


if __name__ == "__main__":
    t1 = MyThread("t1")
    t2 = MyThread("t2")

    t1.start()
    t2.start()
2.1.2 計算子線程執行的時間

註:sleep的時候是不會占用cpu的,在sleep的時候操作系統會把線程暫時掛起。

join()  #等此線程執行完後,再執行其他線程或主線程
threading.current_thread()      #輸出當前線程
import threading
import time

def run(n):
    print("task", n,threading.current_thread())    #輸出當前的線程
    time.sleep(1)
    print(‘3s‘)
    time.sleep(1)
    print(‘2s‘)
    time.sleep(1)
    print(‘1s‘)

strat_time = time.time()

t_obj = []   #定義列表用於存放子線程實例

for i in range(3):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_obj.append(t)
    
"""
由主線程生成的三個子線程
task t-0 <Thread(Thread-1, started 44828)>
task t-1 <Thread(Thread-2, started 42804)>
task t-2 <Thread(Thread-3, started 41384)>
"""

for tmp in t_obj:
    t.join()            #為每個子線程添加join之後,主線程就會等這些子線程執行完之後再執行。

print("cost:", time.time() - strat_time) #主線程

print(threading.current_thread())       #輸出當前線程
"""
<_MainThread(MainThread, started 43740)>
"""

2.1.3 統計當前活躍的線程數

由於主線程比子線程快很多,當主線程執行active_count()時,其他子線程都還沒執行完畢,因此利用主線程統計的活躍的線程數num = sub_num(子線程數量)+1(主線程本身)

import threading
import time

def run(n):
    print("task", n)    
    time.sleep(1)       #此時子線程停1s

for i in range(3):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()

time.sleep(0.5)     #主線程停0.5秒
print(threading.active_count()) #輸出當前活躍的線程數

"""
task t-0
task t-1
task t-2
4
"""

由於主線程比子線程慢很多,當主線程執行active_count()時,其他子線程都已經執行完畢,因此利用主線程統計的活躍的線程數num = 1(主線程本身)

import threading
import time


def run(n):
    print("task", n)
    time.sleep(0.5)       #此時子線程停0.5s


for i in range(3):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()

time.sleep(1)     #主線程停1秒
print(threading.active_count()) #輸出活躍的線程數
"""
task t-0
task t-1
task t-2
1
"""

此外我們還能發現在python內部默認會等待最後一個進程執行完後再執行exit(),或者說python內部在此時有一個隱藏的join()。

2.2 守護進程

我們看下面這個例子,這裏使用setDaemon(True)把所有的子線程都變成了主線程的守護線程,因此當主進程結束後,子線程也會隨之結束。所以當主線程結束後,整個程序就退出了。

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子線程停1s
    print(‘3‘)
    time.sleep(1)
    print(‘2‘)
    time.sleep(1)
    print(‘1‘)

for i in range(3):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.setDaemon(True)   #把子進程設置為守護線程,必須在start()之前設置
    t.start()

time.sleep(0.5)     #主線程停0.5秒
print(threading.active_count()) #輸出活躍的線程數
"""
task t-0
task t-1
task t-2
4

Process finished with exit code 0
"""

2.3 GIL

在非python環境中,單核情況下,同時只能有一個任務執行。多核時可以支持多個線程同時執行。但是在python中,無論有多少核,同時只能執行一個線程。究其原因,這就是由於GIL的存在導致的。

GIL的全稱是Global Interpreter Lock(全局解釋器鎖),來源是python設計之初的考慮,為了數據安全所做的決定。某個線程想要執行,必須先拿到GIL,我們可以把GIL看作是“通行證”,並且在一個python進程中,GIL只有一個。拿不到通行證的線程,就不允許進入CPU執行。GIL只在cpython中才有,因為cpython調用的是c語言的原生線程,所以他不能直接操作cpu,只能利用GIL保證同一時間只能有一個線程拿到數據。而在pypy和jpython中是沒有GIL的。

Python多線程的工作過程:
python在使用多線程的時候,調用的是c語言的原生線程。

  1. 拿到公共數據
  2. 申請gil
  3. python解釋器調用os原生線程
  4. os操作cpu執行運算
  5. 當該線程執行時間到後,無論運算是否已經執行完,gil都被要求釋放
  6. 進而由其他進程重復上面的過程
  7. 等其他進程執行完後,又會切換到之前的線程(從他記錄的上下文繼續執行)
    整個過程是每個線程執行自己的運算,當執行時間到就進行切換(context switch)。
  • python針對不同類型的代碼執行效率也是不同的:

    1、CPU密集型代碼(各種循環處理、計算等等),在這種情況下,由於計算工作多,ticks計數很快就會達到閾值,然後觸發GIL的釋放與再競爭(多個線程來回切換當然是需要消耗資源的),所以python下的多線程對CPU密集型代碼並不友好。
    2、IO密集型代碼(文件處理、網絡爬蟲等涉及文件讀寫的操作),多線程能夠有效提升效率(單線程下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多線程能在線程A等待時,自動切換到線程B,可以不浪費CPU的資源,從而能提升程序執行效率)。所以python的多線程對IO密集型代碼比較友好。

  • 使用建議?

    python下想要充分利用多核CPU,就用多進程。因為每個進程有各自獨立的GIL,互不幹擾,這樣就可以真正意義上的並行執行,在python中,多進程的執行效率優於多線程(僅僅針對多核CPU而言)。

  • GIL在python中的版本差異:

    1、在python2.x裏,GIL的釋放邏輯是當前線程遇見IO操作或者ticks計數達到100時進行釋放。(ticks可以看作是python自身的一個計數器,專門做用於GIL,每次釋放後歸零,這個計數可以通過sys.setcheckinterval 來調整)。而每次釋放GIL鎖,線程進行鎖競爭、切換線程,會消耗資源。並且由於GIL鎖存在,python裏一個進程永遠只能同時執行一個線程(拿到GIL的線程才能執行),這就是為什麽在多核CPU上,python的多線程效率並不高。
    2、在python3.x中,GIL不使用ticks計數,改為使用計時器(執行時間達到閾值後,當前線程釋放GIL),這樣對CPU密集型程序更加友好,但依然沒有解決GIL導致的同一時間只能執行一個線程的問題,所以效率依然不盡如人意。

2.4 線程鎖

由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之後,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖,即同一時刻允許一個線程執行操作。線程鎖用於鎖定資源,你可以定義多個鎖, 像下面的代碼, 當你需要獨占某一資源時,任何一個鎖都可以鎖這個資源,就好比你用不同的鎖都可以把相同的一個門鎖住是一個道理。

由於線程之間是進行隨機調度,如果有多個線程同時操作一個對象,如果沒有很好地保護該對象,會造成程序結果的不可預期,我們也稱此為“線程不安全”。

#實測:在python2.7、mac os下,運行以下代碼可能會產生臟數據。但是在python3中就不一定會出現下面的問題。

import threading
import time

def run(n):
    global num
    num += 1

num = 0
t_obj = [] 

for i in range(20000):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_obj.append(t)

for t in t_obj:
    t.join()

print "num:", num
"""
產生臟數據後的運行結果:
num: 19999
"""

2.5 互斥鎖(mutex)

為了方式上面情況的發生,就出現了互斥鎖(Lock)

import threading
import time


def run(n):
    lock.acquire()  #獲取鎖
    global num
    num += 1
    lock.release()  #釋放鎖

lock = threading.Lock()     #實例化一個鎖對象

num = 0
t_obj = []  

for i in range(20000):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_obj.append(t)

for t in t_obj:
    t.join()

print "num:", num

2.6 遞歸鎖

RLcok類的用法和Lock類一模一樣,但它支持嵌套,,在多個鎖沒有釋放的時候一般會使用使用RLcok類。

import threading
import time
   
gl_num = 0
   
lock = threading.RLock()
   
def Func():
    lock.acquire()
    global gl_num
    gl_num +=1
    time.sleep(1)
    print gl_num
    lock.release()
       
for i in range(10):
    t = threading.Thread(target=Func)
    t.start()

2.7 信號量(BoundedSemaphore類)

互斥鎖同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,後面的人只能等裏面有人出來了才能再進去。

import threading
import time


def run(n):
    semaphore.acquire()   #加鎖
    time.sleep(1)
    print("run the thread:%s\n" % n)
    semaphore.release()     #釋放


num = 0
semaphore = threading.BoundedSemaphore(5)  # 最多允許5個線程同時運行

for i in range(22):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print(‘-----all threads done-----‘)

2.8 事件(Event類)

python線程的事件用於主線程控制其他線程的執行,事件是一個簡單的線程同步對象,其主要提供以下幾個方法:

方法註釋
clear 將flag設置為“False”
set 將flag設置為“True”
is_set 判斷是否設置了flag
wait 會一直監聽flag,如果沒有檢測到flag就一直處於阻塞狀態

事件處理的機制:全局定義了一個“Flag”,當flag值為“False”,那麽event.wait()就會阻塞,當flag值為“True”,那麽event.wait()便不再阻塞。

#利用Event類模擬紅綠燈
import threading
import time

event = threading.Event()


def lighter():
    count = 0
    event.set()     #初始值為綠燈
    while True:
        if 5 < count <=10 :
            event.clear()  # 紅燈,清除標誌位
            print("\33[41;1mred light is on...\033[0m")
        elif count > 10:
            event.set()  # 綠燈,設置標誌位
            count = 0
        else:
            print("\33[42;1mgreen light is on...\033[0m")

        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():      #判斷是否設置了標誌位
            print("[%s] running..."%name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."%name)
            event.wait()
            print("[%s] green light is on,start going..."%name)

light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car,args=("MINI",))
car.start()

2.9 條件(Condition類)

使得線程等待,只有滿足某條件時,才釋放n個線程

2.10 定時器(Timer類)

定時器,指定n秒後執行某操作

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

3 多進程

在linux中,每個進程都是由父進程提供的。每啟動一個子進程就從父進程克隆一份數據,但是進程之間的數據本身是不能共享的。

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print(‘hello‘, name)
 
if __name__ == ‘__main__‘:
    p = Process(target=f, args=(‘bob‘,))
    p.start()
    p.join()
from multiprocessing import Process
import os
 
def info(title):
    print(title)
    print(‘module name:‘, __name__)
    print(‘parent process:‘, os.getppid())  #獲取父進程id
    print(‘process id:‘, os.getpid())   #獲取自己的進程id
    print("\n\n")
 
def f(name):
    info(‘\033[31;1mfunction f\033[0m‘)
    print(‘hello‘, name)
 
if __name__ == ‘__main__‘:
    info(‘\033[32;1mmain process line\033[0m‘)
    p = Process(target=f, args=(‘bob‘,))
    p.start()
    p.join()

3.1 進程間通信

由於進程之間數據是不共享的,所以不會出現多線程GIL帶來的問題。多進程之間的通信通過Queue()或Pipe()來實現

3.1.1 Queue()

使用方法跟threading裏的queue差不多

from multiprocessing import Process, Queue
 
def f(q):
    q.put([42, None, ‘hello‘])
 
if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, ‘hello‘]"
    p.join()
3.1.2 Pipe()

Pipe的本質是進程之間的數據傳遞,而不是數據共享,這和socket有點像。pipe()返回兩個連接對象分別表示管道的兩端,每端都有send()和recv()方法。如果兩個進程試圖在同一時間的同一端進行讀取和寫入那麽,這可能會損壞管道中的數據。

from multiprocessing import Process, Pipe
 
def f(conn):
    conn.send([42, None, ‘hello‘])
    conn.close()
 
if __name__ == ‘__main__‘:
    parent_conn, child_conn = Pipe() 
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, ‘hello‘]"
    p.join()

3.2 Manager

通過Manager可實現進程間數據的共享。Manager()返回的manager對象會通過一個服務進程,來使其他進程通過代理的方式操作python對象。manager對象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array.

from multiprocessing import Process, Manager
 
def f(d, l):
    d[1] = ‘1‘
    d[‘2‘] = 2
    d[0.25] = None
    l.append(1)
    print(l)
 
if __name__ == ‘__main__‘:
    with Manager() as manager:
        d = manager.dict()
 
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
 
        print(d)
        print(l)

3.3 進程鎖(進程同步)

數據輸出的時候保證不同進程的輸出內容在同一塊屏幕正常顯示,防止數據亂序的情況。
Without using the lock output from the different processes is liable to get all mixed up.

from multiprocessing import Process, Lock
 
def f(l, i):
    l.acquire()
    try:
        print(‘hello world‘, i)
    finally:
        l.release()
 
if __name__ == ‘__main__‘:
    lock = Lock()
 
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

3.4 進程池

由於進程啟動的開銷比較大,使用多進程的時候會導致大量內存空間被消耗。為了防止這種情況發生可以使用進程池,(由於啟動線程的開銷比較小,所以不需要線程池這種概念,多線程只會頻繁得切換cpu導致系統變慢,並不會占用過多的內存空間)

進程池中常用方法:
apply() 同步執行(串行)
apply_async() 異步執行(並行)
terminate() 立刻關閉進程池
join() 主進程等待所有子進程執行完畢。必須在close或terminate()之後。
close() 等待所有進程結束後,才關閉進程池。

from  multiprocessing import Process,Pool
import time
 
def Foo(i):
    time.sleep(2)
    return i+100
 
def Bar(arg):
    print(‘-->exec done:‘,arg)
 
pool = Pool(5)  #允許進程池同時放入5個進程
 
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)  #func子進程執行完後,才會執行callback,否則callback不執行(而且callback是由父進程來執行了)
    #pool.apply(func=Foo, args=(i,))
 
print(‘end‘)
pool.close()
pool.join() #主進程等待所有子進程執行完畢。必須在close()或terminate()之後。

進程池內部維護一個進程序列,當使用時,去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那麽程序就會等待,直到進程池中有可用進程為止。在上面的程序中產生了10個進程,但是只能有5同時被放入進程池,剩下的都被暫時掛起,並不占用內存空間,等前面的五個進程執行完後,再執行剩下5個進程。

4 補充:協程

線程和進程的操作是由程序觸發系統接口,最後的執行者是系統,它本質上是操作系統提供的功能。而協程的操作則是程序員指定的,在python中通過yield,人為的實現並發處理。

協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時。協程,則只使用一個線程,分解一個線程成為多個“微線程”,在一個線程中規定某個代碼塊的執行順序。

協程的適用場景:當程序中存在大量不需要CPU的操作時(IO)。
常用第三方模塊gevent和greenlet。(本質上,gevent是對greenlet的高級封裝,因此一般用它就行,這是一個相當高效的模塊。)

4.1 greenlet

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

實際上,greenlet就是通過switch方法在不同的任務之間進行切換。

4.2 gevent

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print(‘GET: %s‘ % url)
    resp = requests.get(url)
    data = resp.text
    print(‘%d bytes received from %s.‘ % (len(data), url))

gevent.joinall([
        gevent.spawn(f, ‘https://www.python.org/‘),
        gevent.spawn(f, ‘https://www.yahoo.com/‘),
        gevent.spawn(f, ‘https://github.com/‘),
])

通過joinall將任務f和它的參數進行統一調度,實現單線程中的協程。代碼封裝層次很高,實際使用只需要了解它的幾個主要方法即可。

搞定python多線程和多進程