1. 程式人生 > >Python的程序、執行緒、協程

Python的程序、執行緒、協程

1 程序

1.1 multiprocessing(跨平臺的多程序模組)

在Unix/Linux下,multiprocessing模組封裝了fork()呼叫,而Windows沒有fork()呼叫,因此multiprocessing需要模擬fork。因為父程序所有Python物件都必須通過pickle序列化再傳到子程序去,如果multiprocessing在Windows下呼叫失敗了,要先考慮是不是pickle失敗了。

Process(程序物件類)
Process(target, args):建立一個程序物件。target為程序執行的函式,args為函式的引數,args是元組,注意單個元素的元組寫法。
Process.start():啟動子程序。
Process.join():在父程序中呼叫(Process為子程序物件),等待子程序結束。
Process.terminate():強行終止子程序。

#!/bin/env python
# -*- coding: utf-8 -*-

from multiprocessing import Process
import os

def run_proc(args):
    print('Child process %s is running, args %s.' % (os.getpid(), args))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('child process 1'
,)) print('Child process will start.') p.start() p.join() print('Child process is finished.')

Pool(程序池)

  • Pool(n):建立程序池,n為程序池中程序的數量。
  • Pool.apply_async(target, args):新增程序,引數target和args的含義與Process相同。如果新增的程序數量超過程序池的大小,多出來的程序暫時等待,但是apply_async()函式非同步返回。
  • Pool.close():呼叫join()之前必須先呼叫close(),呼叫close()之後就不能繼續新增程序。
  • Pool.join():等待所有子程序結束。
#!/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

Queue(程序間通訊)

  • Queue.put(value):寫資料
  • Queue.get():讀資料

managers(分散式程序)

例:如果已經實現了通過Queue通訊的多程序程式在一臺機器上執行,現在希望把派發任務的程序和處理任務的程序分佈到兩臺機器上。

服務程序負責啟動Queue,把Queue註冊到網路上,然後往Queue裡面寫入任務。使用分散式多程序時,新增任務到Queue不可以直接對原始的Queue物件進行操作,必須通過manager.get_task_queue()獲得的Queue介面。(實際上就是RPC,服務程序向Queuemanager註冊名為get_task_queue的RPC,任務程序通過這個名字獲取Queue)

Queue的作用是用來傳遞任務和接收結果,任務和結果的描述資料要儘量小。比如傳送一個處理日誌檔案的任務,就不要傳送幾百兆的日誌檔案本身,而是傳送日誌檔案存放的完整路徑,由Worker程序再去共享的磁碟上讀取檔案。

# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 傳送任務的佇列:
task_queue = queue.Queue()
# 接收結果的佇列:
result_queue = queue.Queue()

# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把兩個Queue都註冊到網路上, callable引數關聯了Queue物件:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 繫結埠5000, 設定驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啟動Queue:
manager.start()
# 獲得通過網路訪問的Queue物件:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 從result佇列讀取結果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 關閉:
manager.shutdown()
print('master exit.')

# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 建立類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網路上獲取Queue,所以註冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 連線到伺服器,也就是執行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 埠和驗證碼注意保持與task_master.py設定的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網路連線:
m.connect()
# 獲取Queue的物件:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task佇列取任務,並把結果寫入result佇列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')

2 執行緒(threading)

適用場景:I/O密集型操作。對於計算密集型任務,應該使用Python多程序。

GIL鎖:Python的執行緒雖然是真正的執行緒,但直譯器執行程式碼時,有一個GIL鎖(Global Interpreter Lock),任何Python執行緒執行前,必須先獲得GIL鎖,然後每執行100條位元組碼,直譯器就自動釋放GIL鎖,讓別的執行緒有機會執行。這個GIL全域性鎖實際上把所有執行緒的執行程式碼都給上了鎖,所以多執行緒在Python中只能交替執行,即使100個執行緒跑在100核CPU上,也只能用到1個核。

GIL是Python直譯器設計的歷史遺留問題,通常我們用的直譯器是官方實現的CPython,要真正利用多核,除非重寫一個不帶GIL的直譯器。所以在Python中,可以使用多執行緒,但不要指望能有效利用多核。如果一定要通過多執行緒利用多核,有兩種辦法:通過C擴充套件來實現,但這樣就失去了Python簡單易用的特點;通過多程序實現多核任務。多個Python程序有各自獨立的GIL鎖,互不影響。

2.1 執行緒管理

  • threading.Thread(target, name):建立執行緒物件,target為執行緒函式,name為執行緒名,如果不起名字Python就自動給執行緒命名為Thread-1,Thread-2。
  • Thread.start():啟動執行緒(啟動的是呼叫start()的threading例項物件)。
  • Thread.join(timeout):主執行緒阻塞等待子執行緒結束。如果超過timeout秒子執行緒還沒結束,主執行緒會殺死daemon的子執行緒*,否則主執行緒自己返回,子執行緒繼續執行直到返回為止。
  • Thread.daemon = True:設定為守護執行緒。
import threading, time

def fun():
    print("thread start")
    time.sleep(3)
    print("thread end")

print("main start")
t = threading.Thread(target=fun)
t.daemon = True
t.start()
t.join()
print("main end")

t.join() #不論子執行緒是不是daemon,主執行緒都等待子執行緒結束
main start
thread start
thread end
main end

t.join(1) && t.daemon == True #主執行緒殺死daemon子執行緒
main start
thread start
main end

t.join(1) && t.daemon == False #主執行緒自己返回,子執行緒的執行完返回
main start
thread start
main end
thread end

2.2 執行緒同步

  • threading.Lock():建立互斥鎖物件。
  • threading.Lock().acquire():獲取鎖。
  • threading.Lock().release():釋放鎖。注意使用finally確保鎖一定會被釋放。
balance = 0
lock = threading.Lock()
def run_thread(n):
    for i in range(100000):
        lock.acquire()
        try:
            change_it(n)
        finally:
            lock.release()

2.3 ThreadLocal(執行緒私有資料)

ThreadLocal變數雖然是全域性變數(下面程式碼中的local_school),但每個執行緒都只能讀寫自己執行緒的獨立副本,互不干擾。可以把ThreadLocal理解成一個dict,local_school.student表示訪問student變數,變數名作為key。
ThreadLocal最常用的地方就是為每個執行緒繫結一個數據庫連線,HTTP請求,使用者身份資訊等,這樣一個執行緒的所有呼叫到的處理函式都可以非常方便地訪問這些資源。

import threading

# 建立全域性ThreadLocal物件:
local_school = threading.local()

def process_student():
    # 獲取當前執行緒關聯的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    # 繫結ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

3 協程(Coroutine,微執行緒)

3.1 協程

“Subroutines are special cases of … coroutines.” –Donald Knuth.

Fundamental Algorithms. The Art of Computer Programming 1 (3rd ed.). Addison-Wesley. Section 1.4.2: Coroutines, pp. 193–200.

協程在執行過程中,可以在子程式內部中斷,然後轉而執行別的子程式,在適當的時候再返回來接著執行。

  • 效率高:子程式切換由程式自身控制,沒有執行緒切換的開銷。
  • 無鎖:因為協程是一個執行緒執行,不存在同時寫變數衝突,在協程中控制共享資源不用加鎖。

呼叫包含yield語句的函式返回一個generator物件。主要方法:

  • generator.send(value):暫停呼叫generator.send的函式,轉而從generator函式上次yield的位置繼續執行。value是generator函式上次yield表示式的返回值,generator函式下一次yield的值為generator.send(value)的返回值。
    當第一次呼叫generator函式時傳入的value必須為None,因為generator函式從頭開始執行,找不到上次yield的位置,傳入非None的value就不知道是給哪個yield表示式當作返回值。

  • generator.throw(type):generator函式在上次暫停的位置(通常是yield)丟擲type型別的異常,並繼續執行到下一個yield,yield的值為throw的返回值。如果沒執行到yield,generator.throw()丟擲StopIteration。。如果generator函式沒有捕捉type異常或者丟擲了其他異常,這些異常會被generator.throw()丟擲。當generator返回錯誤值時,呼叫generator函式的一方可以通過異常的形式告訴generator函式。

  • generator.close():讓generator函式返回,通過GeneratorExit異常實現的,它和Exception都是並列的,從BaseException派生。
    當generator.close()被呼叫時,generator函式在上次暫停的位置(通常是yield)丟擲GeneratorExit。
    如果generator函式捕捉了GeneratorExit異常但沒有return並再次執行到了yield,呼叫generator.close()的函式會丟擲RuntimeError。這個機制實現了資料收發同步,當呼叫generator函式的一方已經明確告訴generator函式自己不再接收資料了,但是generator函式偏偏又返回了資料。
    generator函式不捕捉GeneratorExit異常即可自動返回。generator函式正確的layout(參考6.2.9.2. Examples):

def generator_func(value=None):
    do_set_up()
    try:
        while True:
            try:
                recv_from_peer = (yield send_to_peer)
            except Exception as e:
                do_peer_excption()
    finally:
        do_tear_down()

例:使用協程的生產者-消費者模型。

def consumer(): 
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

3.2 生成器

生成器是一種特殊的可迭代物件。

python中的生成器有兩種形式:

  1. 推導式
  2. 帶有yield語句的函式

當函式中遇到yield value時,函式被掛起,value作為呼叫next(生成器物件)的返回值。下一次再呼叫next(生成器物件)時,函式返回繼續執行,直到函式執行完成或者再次遇到yield。
當函式執行完成後再呼叫next(生成器物件),丟擲StopIteration異常。

例1:使用生成器實現斐波那契數列。


例2:利用推導式生成列表、字典、集合等

#建立0-99中不是2,但是3的倍數的所有值
list1 = [i for i in range(100) if not (i % 2) and i % 3]

#建立鍵為0-19,值為鍵是否為2的倍數的字典
dict1 = {i : i % 2 == 0 for i in range(20)}

#從一個列表中剔除重複元素建立集合 
set1 = {i for i in [1, 1, 2, 2, 3, 1, 2]}

注:

  1. 字串沒有推導式,因為雙引號裡面的推導式會被解釋為字串。
  2. 推導式是可迭代物,任何接受iterable形參的函式都可以用推導式作為實參。
  3. 元組由於使用圓括號定義,而這裡圓括號由於裡面是推導式而不是值序列,所以這裡圓括號不被解釋為元組的定義,返回的是生成器物件。


3.2 非同步IO

  • 同步IO+多執行緒/多程序:因為IO操作阻塞了當前執行緒,其他程式碼無法執行,所以必須使用多執行緒或者多程序來併發執行。每個使用者都會分配一個執行緒,如果遇到IO導致執行緒被掛起,其他使用者的執行緒不受影響。

  • 非同步IO:當代碼需要執行一個耗時的IO操作時,它只發出IO指令,並不等待IO結果,然後就去執行其他程式碼了。一段時間後,當IO返回結果時,再通知CPU進行處理。非同步IO模型的核心是訊息迴圈,主執行緒不斷地重複“讀取訊息-處理訊息”這一過程。

asyncio:3.4引入的非同步IO標準庫模組

  1. asyncio的程式設計模型就是一個訊息迴圈。首先從asyncio模組中直接獲取一個EventLoop,然後把需要執行的協程扔到EventLoop中執行,就實現了非同步IO。
  2. asyncio實現了TCP、UDP、SSL等協議,aiohttp則是基於asyncio實現的HTTP框架。
  3. async/await:3.5引入的協程新語法。async等效於@asyncio.coroutine,await等效於yield from。

主要用法:

  1. @asyncio.coroutine:把一個generator標記為coroutine,進而可以在EventLoop中執行。
  2. ans = yield from coroutine_func():呼叫另一個coroutine並獲得返回值。當執行到這一句時,當前執行緒不會阻塞,而是轉去執行下一個就緒的訊息處理函式。當coroutine_func()執行完以後,再回到當前的協程中繼續執行。

例:用asyncio的非同步網路連線來獲取sina、sohu和163的網站首頁。3個連線由一個執行緒通過coroutine併發完成。

import threading
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s... [%s]' % (host, threading.currentThread()))
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in 
    ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

@asyncio.coroutine
def hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

參考