Python學習記錄-多進程和多線程
[TOC]
1. 進程和線程
進程
狹義定義:進程是正在運行的程序的實例(an instance of a computer program that is being executed)。
廣義定義:進程是一個具有一定獨立功能的程序關於某個數據集合的一次運行活動。它是操作系統動態執行的基本單元,在傳統的操作系統中,進程既是基本的分配單元,也是基本的執行單元。
線程
線程是操作系統能夠進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務。
線程與進程比較
線程與進程的區別:
1)地址空間和其它資源(如打開文件):進程間相互獨立,同一進程的各線程間共享。某進程內的線程在其它進程不可見。
2)通信:進程間通信IPC,線程間可以直接讀寫進程數據段(如全局變量)來進行通信——需要進程同步和互斥手段的輔助,以保證數據的一致性。
3)創建:創建新線程很簡單,創建新進程需要對父進程進行一次克隆。
4)調度和切換:一個線程可以控制和操作同一進程裏的其它線程,但是進程只能操作子進程;線程上下文切換比進程上下文切換要快得多。
5)在多線程OS中,進程不是一個可執行的實體。
<font color="red">註意:</font>
線程和進程快慢無法對比,因為線程被包含在進程中。
2. threading模塊
線程有2種調用方式,如下:
直接調用
import threading import time def sayhi(num): # 定義每個線程要運行的函數 print("running on number:%s" % num) time.sleep(3) if __name__ == ‘__main__‘: t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一個線程實例 t2 = threading.Thread(target=sayhi, args=(2,)) # 生成另一個線程實例 t1.start() # 啟動線程 t2.start() # 啟動另一個線程 print(t1.getName()) # 獲取線程名 print(t2.getName())
繼承式調用
import threading
import time
class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num
def run(self): # 定義每個線程要運行的函數
print("running on number:%s" % self.num)
time.sleep(3)
if __name__ == ‘__main__‘:
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
平常用到的主要方法:
start
線程準備就緒,等待CPU調度setName
為線程設置名稱getName
獲取線程名稱setDaemon
設置為後臺線程或前臺線程(默認)
如果是後臺線程,主線程執行過程中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均停止
如果是前臺線程,主線程執行過程中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序停止join
逐個執行每個線程,執行完畢後繼續往下執行,該方法使得多線程變得無意義run
線程被cpu調度後自動執行線程對象的run方法
2.1 Join & Daemon
import time
import threading
def run(n):
print(‘[%s]------running----\n‘ % n)
time.sleep(2)
print(‘--done--‘)
def main():
for i in range(5):
t = threading.Thread(target=run, args=[i, ])
t.start()
t.join(1)
print(‘starting thread‘, t.getName())
m = threading.Thread(target=main, args=[])
m.setDaemon(True) # 將main線程設置為Daemon線程,它做為程序主線程的守護線程,當主線程退出時,m線程也會退出,由m啟動的其它子線程會同時退出,不管是否執行完任務
m.start()
m.join(timeout=2)
print("---main thread done----")
import time
import threading
def addNum():
global num # 在每個線程中都獲取這個全局變量
print(‘--get num:‘, num)
time.sleep(1)
num -= 1 # 對此公共變量進行-1操作
num = 100 # 設定一個共享變量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有線程執行完畢
t.join()
print(‘final num:‘, num)
2.2 線程鎖(互斥鎖Mutex)
一個進程下可以啟動多個線程,多個線程共享父進程的內存空間,也就意味著每個線程可以訪問同一份數據,由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之後,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了線程鎖 - 同一時刻允許一個線程執行操作。
import time
import threading
def addNum():
global num # 在每個線程中都獲取這個全局變量
print(‘--get num:‘, num)
time.sleep(1)
num -= 1 # 對此公共變量進行-1操作
num = 100 # 設定一個共享變量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有線程執行完畢
t.join()
print(‘final num:‘, num)
因為python2.7以上版本,已經自動添加線程鎖,所以我們不用細糾。
import time
import threading
def addNum():
global num # 在每個線程中都獲取這個全局變量
print(‘--get num:‘, num)
time.sleep(1)
lock.acquire() # 修改數據前加鎖
num -= 1 # 對此公共變量進行-1操作
lock.release() # 修改後釋放
num = 100 # 設定一個共享變量
thread_list = []
lock = threading.Lock() # 生成全局鎖
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有線程執行完畢
t.join()
print(‘final num:‘, num)
2.3 信號量(Semaphore)
互斥鎖 同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據 ,比如廁所有3個坑,那最多只允許3個人上廁所,後面的人只能等裏面有人出來了才能再進去。
import threading, time
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %s" % n)
semaphore.release()
if __name__ == ‘__main__‘:
num = 0
semaphore = threading.BoundedSemaphore(5) # 最多允許5個線程同時運行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start()
2.4 事件(event)
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set
、wait
、clear
。
事件處理的機制:全局定義了一個“Flag”
,如果“Flag”
值為 False
,那麽當程序執行 event.wait
方法時就會阻塞,如果“Flag”
值為True
,那麽event.wait
方法時便不再阻塞。
clear
:將“Flag”
設置為False
set
:將“Flag”
設置為True
import threading
def do(event):
print(‘start‘)
event.wait()
print(‘execute‘)
event_obj = threading.Event()
for i in range(10):
t = threading.Thread(target=do, args=(event_obj,))
t.start()
event_obj.clear()
inp = input(‘input:‘)
if inp == ‘true‘:
event_obj.set()
2.5 條件(Condition)
使得線程等待,只有滿足某條件時,才釋放n個線程。
未使用條件時:
import threading
def run(n):
con.acquire()
con.wait()
print("run the thread: %s" %n)
con.release()
if __name__ == ‘__main__‘:
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
while True:
inp = input(‘>>>‘)
if inp == ‘q‘:
break
con.acquire()
con.notify(int(inp))
con.release()
使用條件時:
def condition_func():
ret = False
inp = input(‘>>>‘)
if inp == ‘1‘:
ret = True
return ret
def run(n):
con.acquire()
con.wait_for(condition_func)
print("run the thread: %s" %n)
con.release()
if __name__ == ‘__main__‘:
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
2.6 定時器(Timer)
定時器,指定n秒後執行某操作
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
t.start() # after 1 seconds, "hello, world" will be printed
3. 進程
from multiprocessing import Process
import threading
import time
def foo(i):
time.sleep(1)
print(‘say hi‘,i)
for i in range(10):
p = Process(target=foo,args=(i,))
p.start()
<font color="red">註意</font>:
由於進程之間的數據需要各自持有一份,所以創建進程需要的非常大的開銷。
3.1 進程數據共享
進程各自持有一份數據,默認無法共享數據。
from multiprocessing import Process
import time
li = []
def foo(i):
li.append(i)
print(‘say hi‘,li)
for i in range(10):
p = Process(target=foo,args=(i,))
p.start()
print (‘ending‘,li)
結果類似這樣(每次的結果可能排序會有變化)。
say hi [2]
say hi [3]
say hi [5]
say hi [0]
say hi [1]
say hi [4]
say hi [6]
say hi [7]
say hi [8]
ending []
say hi [9]
想要進程間共享數據
方法一:使用Array
from multiprocessing import Process,Array
temp = Array(‘i‘, [11,22,33,44])
def Foo(i):
temp[i] = 100+i
for item in temp:
print(i,‘----->‘,item)
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
方法二:manage.dict()共享數據
from multiprocessing import Process,Manager
manage = Manager()
dic = manage.dict()
def Foo(i):
dic[i] = 100+i
print dic.values()
for i in range(2):
p = Process(target=Foo,args=(i,))
p.start()
p.join()
3.2 進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那麽程序就會等待,直到進程池中有可用進程為止。
進程池中有兩個方法:
- apply
- apply_async
from multiprocessing import Process,Pool
import time
def Foo(i):
time.sleep(2)
return i+100
def Bar(arg):
print(arg)
pool = Pool(5)
print(pool.apply(Foo,(1,)))
print(pool.apply_async(func =Foo, args=(1,)).get())
for i in range(10):
pool.apply_async(func=Foo, args=(i,),callback=Bar)
print(‘end‘)
pool.close()
pool.join() #進程池中進程執行完畢後再關閉,如果註釋,那麽程序直接關閉。
4. 協程
線程和進程的操作是由程序觸發系統接口,最後的執行者是系統;協程的操作則是程序員。
協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。
協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程;
4.1 greenlet
from greenlet import greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
運行結果:
12
56
34
78
4.2 gevent
Python通過yield提供了對協程的基本支持,但是不完全。而第三方的gevent為Python提供了比較完善的協程支持。
gevent是第三方庫,通過greenlet實現協程,其基本思想是:
當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。
import gevent
def foo():
print(‘Running in foo‘)
gevent.sleep(0)
print(‘Explicit context switch to foo again‘)
def bar():
print(‘Explicit context to bar‘)
gevent.sleep(0)
print(‘Implicit context switch back to bar‘)
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
運行結果:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
遇到IO操作自動切換:
from gevent import monkey; monkey.patch_all()
import gevent
import urllib.request
def f(url):
print(‘GET: %s‘ % url)
resp = urllib.request.urlopen(url)
data = resp.read()
print(‘%d bytes received from %s.‘ % (len(data), url))
gevent.joinall([
gevent.spawn(f, ‘https://www.python.org/‘),
gevent.spawn(f, ‘https://www.baidu.com/‘),
gevent.spawn(f, ‘https://www.so.com/‘),
])
Python學習記錄-多進程和多線程