多線程 及 分布式進程間的通信
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
#-*- coding:utf-8 -*-
#多線程
#多任務可以由多進程完成,也可以由一個進程內的多線程完成。
#進程是若幹線程組成,一個進程至少有一個線程
#由於線程是操作系統直接支持的執行單元,因此,高級語言通常都內置多線程的支持,python 的線程是真正的posix thread,而不是模擬出來的線程。
#python 的標準庫提供了兩個模塊:_thread 和threading,_thread 是低級模塊,threading 是高級模塊,對_thread進行了封裝。通常,只需使用threading這個模塊。
#啟動一個線程就是把一個函數傳入並創建Thread 實例,然後調用start()開始執行:
import time,threading
#新線程執行的代碼:
def loop():
print(‘thread %s is running...‘ % threading.current_thread().name)
n=0
while n < 5:
n=n+1
print(‘thread %s >>>%s‘ % (threading.current_thread().name,n))
time.sleep(1)
print(‘thread %s ended‘ % threading.current_thread().name)
print(‘thread %s is running...‘ % threading.current_thread().name)
t=threading.Thread(target=loop,name=‘LoopThread‘)
t.start()
t.join()
print(‘thread %s ended ‘ % threading.current_thread().name)
‘‘‘
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>>1
thread LoopThread >>>2
thread LoopThread >>>3
thread LoopThread >>>4
thread LoopThread >>>5
thread LoopThread ended
thread MainThread ended
‘‘‘
#由於任何進程默認就會啟動一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程,Python的threading模塊有個current_thread()函數,它永遠返回
#當前線程的實例。主線程實例的名字叫MainThread,子線程的名字在創建時指定,我們用LoopThread命名子線程。名字僅僅在打印時用來顯示,完全沒有其他意義,
#如果不起名字Python就自動給線程命名為Thread-1,Thread-2...
#Lock
#多線程和多進程最大的不同在於,多進程中,同一個變量,各自有一份拷貝存在於每個進程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何
#一個變量都可以被任何一個線程修改,因此,線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂了。
#來看看多個線程同時操作一個變量怎麽把內容給改亂了:
import time,threading
balance=0
def change_it(n):
global balance
balance=balance + n
balancd=balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1=threading.Thread(target=run_thread,args=(5,))
t2=threading.Thread(target=run_thread,args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
#我們定義了一個共享變量balance,初始值為0,並且啟動兩個線程,先加後減,理論上結果應該為0,但是,由於線程的調度是由操作系統決定的,當t1/t2交替
#執行時,只要循環次數足夠多 ,balance的結果就不一定是0了。
#原因是因為高級語言的一條語句在CPU執行時是若幹條語句,即使一個簡單的計算:
#balance=balance+n
#也分兩步:
#1.計算balance+n,存入臨時變量中;
#2.將臨時變量的值賦給balance。
#也可以看成是
#x=balance +n
#balance = x
#由於x是局部變量,兩個線程各自都有自己的x,當代碼正常執行時:
#t1和t2是交替運行的,如果操作系統以下面的順序執行t1/t2:
#初始值 balance=0
‘‘‘
t1: x1=balance + 5
t2: x2=balance+8
t2:balance=x2
t1:balance=x1
t1:x1=balance-5
t1:balance=x1
t2:x2=balance-8
t2:balance=x2
結果 balance = -8
‘‘‘
#究其原因,是因為修改balance需要多條語句,而執行這幾條語句時,線程可能中斷,從而導致多個線程把同一個對象的內容改亂了。
#如果我們要確保balance計算正確,就要給change_it()上一把鎖,當某個線程開始執行change_it()時,我們說,該線程因為獲得了鎖,因此其他線程不能同時執行
#change_it(),只能等待,直到鎖被釋放後,獲得該鎖以後才能改。由於鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以,不會造成修改的
#沖突。創建一個鎖就是通過threading.Lock()來實現:
balance=0
lock=threading.Lock()
def run_thread(n):
for i in range(10000):
#先要獲取鎖:
lock.acquire()
try:
#放心的改吧:
change_it(n)
finally:
#改完後要釋放鎖
lock.release()
#當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然後繼續執行代碼,其他線程就繼續執行代碼,其他線程就繼續等待直到獲得鎖為止。
#獲得鎖的線程用完後一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠等待下去,成為死線程。所以我們用try...finally來確保鎖一定會被釋放。
#鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行,壞處當然也很多,首先是阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程
#模式執行,效率就大大下降了。其次,由於可以存在多個鎖,不同的線程持有不同的鎖,並在試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,
#既不能執行,也無法結束,只能靠操作系統強制終止。
#多核CPU
#一個死循環線程會100%占用一個CPU。如果有兩個死循環,在多核CPU中,可以監控到會占用200%的CPU,也就是占用兩個CPU核心。
#要想把N核CPU的核心全部跑滿,就必須啟動N個死循環線程。
#Python程序啟動與CPU核心數量相同的N個線程,發現,在4核CPU上可以監控到CPU占用率僅有102%,也就是僅使用了一核。
#但是用C/C++/JAVA 來寫相同的死循環,直接可以把全部核心跑滿,4核就跑400%,8核就跑到800%,為什麽Python不行呢?
#因為Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock ,任何Python線程執行前,必須先獲得GIL鎖,然後,每執行100
#條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把所有線程的執行代碼都給上了鎖,所以,多線程在Python中只能交替執行,即使100
#個線程跑在100核CPU上,也只能用到1個核。
#Python雖然不能利用多線程實現多核任務,但可以通過多進程實現多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。
#多線程編程,模型復雜,容易發生沖突,必須用鎖加以隔離,同時,又要小心死鎖的發生。
#Python 解釋器由於設計時有GIL全局鎖,導致了多線程無法利用多核。多線程的並發在Python中就是一個美麗的夢。
#ThreadLocal
#在多線程環境下,每個線程都有自己的數據。一個線程使用自己的局部變量比使用全局變量好,因為局部變量只有線程自己能看見,不會影響其他線程,而全局變量的
#修改必須加鎖。
#但是局部變量也有問題,就是在函數調用的時候,傳遞起來很麻煩。
#ThreadLocal對象可以解決這個問題
import threading
#創建全局ThreadLocal對象:
local_school=threading.local()
def process_student():
#獲取當前線程關聯的student:
std=local_school.student
print(‘Hello,%s (in %s)‘ %(std,threading.current_thread().name))
def process_thread(name):
#綁定threadLocal的student:
local_school.student=name
process_student()
t1=threading.Thread(target=process_thread,args=(‘Alice‘,),name=‘Thread-A‘)
t2=threading.Thread(target=process_thread,args=(‘Bob‘,),name=‘Thread-B‘)
t1.start()
t2.start()
t1.join()
t2.join()
‘‘‘
Hello,Alice (in Thread-A)
Hello,Bob (in Thread-B)
‘‘‘
#全局變量local_school 就是一個ThreadLocal對象,每個Thread對它都可以讀寫student屬性,但互不影響。你可以把local_school看成全局變量,但每個屬性如
#local_school.student都是線程的局部變量,可以任意讀寫而互不幹擾,也不用管理鎖的問題,ThreadLocal內部會處理。
#可以理解為全局變量local_school是一個dict,不但可以用local_school.student,還可以綁定其他變量,如local_school.teacher等等
#ThreadLocal最常用的地方就是為每個線程綁定一個數據庫連接,HTTP請求,用戶身份信息等,這樣一個線程的所有調用到的處理函數都可以非常方便地訪問這些資源。
#一個ThreadLocal變量雖然是全局變量,但每個線程都只能讀寫自己線程的獨立副本,互不幹擾。ThreadLocal解決了參數在一個線程中各個函數之間互相傳遞的問題。
#分布式進程
#在Thread和Process中,應當優選Process,因為Process更穩定,而且,Process可以分布到多臺機器上,而Thread最多只能分布到同一臺機器的多個CPU上。
#Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個
#進程中,依靠網絡通信。由於managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。
#舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一臺機器上運行,現在,由於處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程
#分布到兩臺機器上。怎麽用分布式進程實現?
#原有的Queue可以繼續使用,但是,通過managers 模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了.
#我們先看服務進程,服務進程負責啟動Queue,把Queue註冊到網絡上,然後往Queue裏面寫入任務:
#task_master.py
import random,time,queue
from multiprocessing.managers import BaseManager
#發送任務的隊列:
task_queue=queue.Queue()
#接收結果的隊列:
result_queue=queue.Queue()
#從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
pass
#把兩個Queue都註冊到網絡上,callable參數關聯了Queue 對象:
QueueManager.register(‘get_task_queue‘,callable=lambda:task_queue)
QueueManager.register(‘get_result_queue‘,callable=lambda: result_queue)
#鎖定端口5000,設置驗證碼‘abc’:
manager=QueueManager(address=(‘‘,5000),authkey=b‘abc‘)
#啟動Queue:
manager.start()
#獲得通過網絡訪問的Queue對象:
task=manager.get_task_queue()
result=manager.get_result_queue()
#放幾個任務進去:
for i in range(10):
n=random.randint(0,10000)
print(‘Put task %d...‘ %n)
task.put(n)
#從result隊列讀取結果:
print(‘Try get results...‘)
for i in range(10):
r=result.get(timeout=10)
print(‘Result:%s‘%r)
#關閉:
manager.shutdown()
print(‘master exit.‘)
#請註意,當我們在一臺機器上寫多進程程序時,創建的Queue可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue不可以直接對原始的task_queue
#進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。
#然後,在另一臺機器上啟動任務進程(本機上啟動也可以):
#task_worker.py
import time,sys,queue
from multiprocessing.managers import BaseManager
#創建類似的QueueManager:
class QueueManager(BaseManager):
pass
#由於這個QueueManager只是從網絡上獲取Queue,所以註冊時只提供名字:
QueueManager.register(‘get_task_queue‘)
QueueManager.register(‘get_result_queue‘)
#連接到服務器,也就是運行task_master.py 的機器:
server_addr=‘127.0.0.1‘
print(‘Connect to server %s ...‘ % server_addr)
#端口和驗證碼主要保持與task_master.py 設置的完全一致:
m=QueueManager(address=(server_addr,5000),authkey=b‘abc‘)
#從網絡連接:
m.connect()
#獲取Queue的對象:
task=m.get_task_queue()
result=m.get_result_queue()
#從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
try:
n=task.get(timeout=1)
print(‘run task %d * %d ...‘ % (n,n))
r=‘%d*%d=%d‘ %(n,n,n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print(‘task queue is empty.‘)
#處理結束:
print(‘worker exit‘)
#任務進程要通過網絡連接到服務進程,所以要指定服務進程的IP。
#現在,可以試試分布式進程的工作效果,先啟動task_master.py服務進程:
#運行結果省略(因為沒有服務器IP...)
#task_master.py 進程發送完任務後,開始等待result隊列的結果。現在啟動task_worker.py 進程:
#運行結果省略(因為沒有服務器IP...)
#task_worker.py 進程結束,在task_master.py 進程中會繼續打印出結果:
#運行結果省略(因為沒有服務器IP...)
#這個簡單的Master/Worker模型有什麽用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至十幾臺機器上,比如
#把計算n*n的代碼換成發送郵件,就實現了郵件隊列的異步發送。
#Queue對象存儲在哪?註意到task_worker.py中根本就沒有創建Queue的代碼,所以,Queue對象存儲在task_master.py 進程中 :
#而Queue之所以能通過網絡訪問,就是通過QueueManager實現的。由於QueueManager管理的不止一個Queue,所以,要給每個Queue的網絡調用接口起個名字,比如
#get_task_queue.
#authkey有什麽用?這是為了保證兩臺機器正常通信,不被其他機器惡意幹擾。如果task_worker.py 的 authkey和task_master.py的authkey不一致,肯定連接不上。
#Python 的分布式進程接口簡單,封裝良好,適合需要把繁重任務分布到多臺機器的環境下。
#註意Queue的作用是用來傳遞任務和接收結果,每個任務的描述數量要盡量小。比如發送一個處理日誌文件的任務,就不要發送幾百兆的日誌文件本身,而是發送日誌
#文件存放的完整路徑,由worker進程再去共享的磁盤上讀取文件。
多線程 及 分布式進程間的通信