1. 程式人生 > >Day-12: 進程和線程

Day-12: 進程和線程

多進程 繼承 lose running and abc 釋放 時間 進程創建

  • 進程和線程

  在操作系統看來,一個任務就是一個進程,而一個進程內部如果要做多個任務就是有多個線程。一個進程至少有一個線程。

  真正的並行執行任務是由多個CUP分別執行任務,實際中是由,操作系統輪流讓各個任務交替執行,任務1執行0.01秒,任務2執行0.01秒,之後再依次切換。

  Python中支持兩種模式:

  多進程模式

  多線程模式

  • 多進程

  Linux操作系統下,提供了一個fork()系統調用。調用一次fork(),返回兩次,因為操作系統自動把當前的進程(作為父進程)復制了一份(稱為子進程),然後子進程返回0,父進程返回子進程的ID。

# multiprocessing.py
import os print Process (%s) start... % os.getpid() pid = os.fork() if pid==0: print I am child process (%s) and my parent is %s. % (os.getpid(), os.getppid()) else: print I (%s) just created a child process (%s). % (os.getpid(), pid)
Process (876) start...
I (876) just created a child process (877).
I am child process (
877) and my parent is 876.

  由於windows下沒有fork()調用,提供了multiprocessing模塊進行跨平臺版本的多進程模塊。

用Process類代表創建進程對象,傳入一個執行函數和函數的參數。之後再用start()方法啟動,jion()方法可以等待子進程結束後再繼續往下進行,通常用於進程間的同步。

from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
    print Run child process %s (%s)...
% (name, os.getpid()) if __name__==__main__: print Parent process %s. % os.getpid() p = Process(target=run_proc, args=(test,)) print Process will start. p.start() p.join() print Process end.
Parent process 928.
Process will start.
Run child process test (929)...
Process end.

  Pool進程池創建多個子進程

  對Pool對象創建多個子進程後,用close()方法結束創建,再用join()方法等待所有子進程執行完畢。在每個子進程中會隨機休眠一段時間,其他的子進程在這段休眠時間裏就會調用。

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print Run task %s (%s)... % (name, os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print Task %s runs %0.2f seconds. % (name, (end - start))

if __name__==__main__:
    print Parent process %s. % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print Waiting for all subprocesses done...
    p.close()
    p.join()
    print All subprocesses done.
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

  進程間通信

  Python的miltiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。

  在父進程中創建兩個子進程,一個往Queue中寫數據,一個從Queue裏讀數據。

from multiprocessing import Process, Queue
import os, time, random

# 寫數據進程執行的代碼:
def write(q):
    for value in [A, B, C]:
        print Put %s to queue... % value
        q.put(value)
        time.sleep(random.random())

# 讀數據進程執行的代碼:
def read(q):
    while True:
        value = q.get(True)
        print Get %s from queue. % value

if __name__==__main__:
    # 父進程創建Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子進程pw,寫入:
    pw.start()
    # 啟動子進程pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr進程裏是死循環,無法等待其結束,只能強行終止:
    pr.terminate()
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
  • 多線程

  Python中提供兩個模塊,thread是低級模塊,threading是高級模塊,對thread進行了封裝。

import time, threading

# 新線程執行的代碼:
def loop():
    print thread %s is running... % threading.current_thread().name
    n = 0
    while n < 5:
        n = n + 1
        print thread %s >>> %s % (threading.current_thread().name, n)
        time.sleep(1)
    print thread %s ended. % threading.current_thread().name

print thread %s is running... % threading.current_thread().name
t = threading.Thread(target=loop, name=LoopThread)
t.start()
t.join()
print thread %s ended. % threading.current_thread().name
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

  多進程和多線程最大的不同在於,多進程中,同一個變量,各自有一份拷貝存在於每個進程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個變量都可以被任何一個線程修改。因此,線程之間共享數據最大的危險在於多個線程同時該變一個變量,把內容給改亂了。  

  因此得加上一把鎖lock

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()

  當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然後繼續執行代碼,其他線程就繼續等待直到獲得鎖為止。

  但是,這樣實際上就不是並行處理了。

  Python的多進程由於存在GIL鎖的問題,所以多線程實際上不能有效利用多核。多線程的並發在Python中是無用的。

  • ThreadLocal

  全局變量local_school就是一個ThreadLoacl對象,每個Thread對它都可以讀寫student屬性,但是互不影響。可以把local_school看成全局變量,但每個屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不幹擾,也不用管理鎖的問題,ThreadLocal內部會處理。

import threading

# 創建全局ThreadLocal對象:
local_school = threading.local()

def process_student():
    print Hello, %s (in %s) % (local_school.student, threading.current_thread().name)

def process_thread(name):
    # 綁定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=(Alice,), name=Thread-A)
t2 = threading.Thread(target= process_thread, args=(Bob,), name=Thread-B)
t1.start()
t2.start()
t1.join()
t2.join()
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
  • 進程vs線程

  多進程的優點是穩定性高,一個崩潰,不會影響其他的進程,但是,代價大,在linux下,調用fork還可以,但是windows下進程開銷巨大。

  多線程模式比多進程快一點,但是也快不了多少,缺點十分明顯,由於共享進程的內存,一個線程崩了,就都崩了。

  計算密集型和IO密集型:

  計算密集型會消耗大量的CPU資源,代碼的運行效率就至關重要,Python等腳本語言運行效率低,不適合。

  IO密集型涉及到網絡、磁盤IO的任務,它們的CUP消耗較少,任務的主要時間在等待IO操作完成,CUP效率無法完全使用,所以適合開發效率高的語言。

  現代操作系統對IO操作進行了巨大的改進,支持異步IO。利用異步IO,就可以用單進程模型來執行多任務,這種全新的模型稱為事件驅動型。

  • 分布式進程

  多臺電腦協助工作,一臺電腦作為調度者,依靠網絡通信,將任務分布到其他電腦的進程中。

  通過manager模塊把Queue通過網絡暴露出去,讓其他機器的進程可以訪問Queue

服務器繼承中,負責啟動Queue,把Queue註冊到網絡上,然後往Queue裏面寫入任務:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 發送任務的隊列:
task_queue = Queue.Queue()
# 接收結果的隊列:
result_queue = Queue.Queue()

# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把兩個Queue都註冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register(get_task_queue, callable=lambda: task_queue)
QueueManager.register(get_result_queue, callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼‘abc‘:
manager = QueueManager(address=(‘‘, 5000), authkey=abc)
# 啟動Queue:
manager.start()
# 獲得通過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
    n = random.randint(0, 10000)
    print(Put task %d... % n)
    task.put(n)
# 從result隊列讀取結果:
print(Try get results...)
for i in range(10):
    r = result.get(timeout=10)
    print(Result: %s % r)
# 關閉:
manager.shutdown()

在另一臺機器上啟動任務進程:

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 創建類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網絡上獲取Queue,所以註冊時只提供名字:
QueueManager.register(get_task_queue)
QueueManager.register(get_result_queue)

# 連接到服務器,也就是運行taskmanager.py的機器:
server_addr = 127.0.0.1
print(Connect to server %s... % server_addr)
# 端口和驗證碼註意保持與taskmanager.py設置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=abc)
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print(run task %d * %d... % (n, n))
        r = %d * %d = %d % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print(task queue is empty.)
# 處理結束:
print(worker exit.)

服務進程啟動如下:

$ python taskmanager.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

工作進程啟動如下:

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

等到工作進程結束後,服務進程如下:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

  註意Queue的作用是來傳遞任務和接受結果的,每個任務的描述量要盡量小。比如發送一個處理日誌文件的任務,不要發送幾百兆的日誌文件本身,而是發送日誌文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。

Day-12: 進程和線程