1. 程式人生 > >【Python】程式設計筆記9

【Python】程式設計筆記9

文章目錄

程序和執行緒

對於作業系統來說,一個任務就是一個程序(Process)

在一個程序內部,要同時幹多件事,就需要同時執行多個“子任務”,將程序內的這些“子任務”稱為執行緒(Thread)

==》如何同時執行多個任務?

  • 方法1:多程序模式。 啟動多個程序,每個程序雖然只有一個執行緒,但是多個程序可以一塊執行多個任務。
  • 方法2:多執行緒模式。 啟動一個程序,在一個程序內啟動多個執行緒,則多個執行緒可以一塊執行多個任務。
  • 方法3:多程序+多執行緒模式。啟動多個程序,每個程序再啟動多個執行緒==》模型複雜,很少採用。
  • 注意:還需考慮相互通訊和協調、同步、資料共享的問題。

一、基礎知識

Unix/Linux系統中的 fork() 系統呼叫,fork() 呼叫一次,返回兩次。(作業系統自動將當前程序(父程序)複製一份(子程序),然後分別在父程序和子程序內返回。)

**子程序永遠返回 0,而父程序返回子程序的 ID。**子程序呼叫 getppid() 就可以獲得父程序的 ID。

import
os print('Process (%s) start...' % os.getpid()) ## 僅在Unix/Linux系統下 pid = os.fork() if pid == 0: print("I am child process (%s) and my parent is %s." % (os.getpid(), os.getppid())) else: print("I (%s) just created a child process (%s)." % (os.getpid(), pid))

輸出結果

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
  • os.getpid()——獲取當前程序的 ID
  • os.getppid()——獲得當前程序的父程序的 ID

==》fork 呼叫可以在一個程序在接到新任務時就可以複製出一個子程序來處理新任務。

二、多程序(multiprocessing)

模組:multiprocessing——跨平臺

1、初體驗

Process 類來代表一個程序物件,建立例項物件時,需要傳入執行函式和執行函式的引數;
start()方法啟動程序例項;
join()方法可以等待子程序結束後再繼續往下執行,常用於程序間的同步。

from multiprocessing import Process
import os

# 子程序要執行的程式碼
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__ == "__main__":
    print('Parent process %s.' % os.getpid())
    ## 建立一個程序物件 p
    ## 引數:執行函式、執行函式的引數
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    # 啟動子程序 p
    p.start()
    p.join()   # 常用於程序間的同步
    print('Child process end.')

輸出結果

Parent process 10808.
Child process will start.
Run child process test (13692)...
Child process end.

2、Pool(程序池)

用程序池(Pool)的方式批量建立子程序。

Pool 的預設大小是 CPU 的核數

(1)非阻塞

import multiprocessing, os
import time

def func(msg):
    print("msg:", msg, "(%s)" % os.getpid())
    start = time.time()
    time.sleep(3)
    end = time.time()
    print("Task %s end %f" % (msg, (end - start)))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    pool = multiprocessing.Pool(processes = 3)
    for i in range(3):
        msg = "hello %d" % (i)
        pool.apply_async(func, (msg,))
    pool.close()
    pool.join()
    print("Sub-process(es) done.")

輸出結果

Parent process 1000.
msg: hello 0 (4416)
msg: hello 1 (14240)
msg: hello 2 (828)
Task hello 0 end 3.000664
Task hello 1 end 3.000952
Task hello 2 end 3.000939
Sub-process(es) done.

(2)阻塞

import multiprocessing, time, os

def func(msg):
    print("msg:", msg, "(%s)" % os.getpid())
    start = time.time()
    time.sleep(3)
    end = time.time()
    print("Task %s end %f" % (msg, (end - start)))
if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    pool = multiprocessing.Pool(processes = 3)
    for i in range(3):
        msg = "hello %d" % (i)
        pool.apply(func, (msg,))
    pool.close()
    pool.join()
    print("Sub-process(es) done.")

輸出結果

Parent process 244.
msg: hello 0 (10280)
Task hello 0 end 3.000280
msg: hello 1 (15200)
Task hello 1 end 3.000413
msg: hello 2 (10140)
Task hello 2 end 3.000886
Sub-process(es) done.

(4)程式碼解讀

對 Pool 物件呼叫 join() 方法會等待所有子程序執行完畢,呼叫 join() 之前必須先呼叫 close(),呼叫 close()之後就不能繼續新增新的 Process 了。

(3)分析

區別主要是 apply_async和 apply函式,前者是非阻塞的,後者是阻塞。非阻塞多個子程序可以同時進行,而阻塞子程序依次進行。

3、子程序

子程序是一個外部程序,需要控制其輸入和輸出。主要功能是執行外部的命令和程式。
==》模組:subprocess

subprocess包中定義有數個建立子程序的函式,這些函式分別以不同的方式建立子程序,所以我們可以根據需要來從中選取一個使用。另外subprocess還提供了一些管理標準流(standard stream)和管道(pipe)的工具,從而在程序間使用文字通訊。

使用subprocess包中的函式建立子程序的時候,要注意:

  • 在建立子程序之後,父程序是否暫停,並等待子程序執行。
  • 函式返回什麼
  • 當returncode不為0時,父程序如何處理。

(1)開啟子程序

  • subprocess.call()
  • subprocess.check_call()
  • subprocess.check_output()

引數:命令字串,eg:([‘ping’,‘www.baidu.com’,’-c’,‘3’]) 或 (“ping www.baidu.com -c 3”) 兩種形式。在Windows環境下,最好新增 shell=True 引數,使得可以順利地執行dos命令。

區別:返回值。子程序的執行返回碼;若返回碼是0則返回0,否則出錯的話raise起CalledProcessError,可以用except處理之;若返回碼是0則返回子程序向stdout輸出的結果,否則也raise起CalledProcessError。

三種方法均會讓父程序掛起等待,在子程序結束之前,父程序不會繼續執行下去。

本質:對 subprocess.Popen 方法的封裝,Popen 開啟的子程序不會讓父程序等待其完成的,除非呼叫 wait() 方法。

import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup','www.python.org'])
print('Exit coe:' ,r)

結果輸出

$ nslookup www.python.org
��Ȩ��Ӧ��:
������:  UnKnown
Address:  192.168.43.1

����:    dualstack.python.map.fastly.net
Addresses:  2a04:4e42:6::223
	  151.101.24.223
Aliases:  www.python.org

Exit coe: 0

(2)新增輸入——communicate()方法

## 有輸入的子程序
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('gbk'))
# returncode 子程序的退出狀態
print('Exit code:', p.returncode)

結果輸出

$ nslookup
預設伺服器:  UnKnown
Address:  192.168.43.1

> > 伺服器:  UnKnown
Address:  192.168.43.1

python.org	MX preference = 50, mail exchanger = mail.python.org
> 
Exit code: 0

stdin, stdout 和 stderr:指定了執行程式的標準輸入,標準輸出和標準錯誤的檔案控制代碼。它們的值可以是PIPE, 一個存在的檔案描述符(正整數),一個存在的檔案物件,或 None。

4、程序間通訊

多種機制:Queue、Pipes等方式交換資料。

示例:Queue為例,在父程序中建立兩個子程序,一個往 Queue 裡寫資料,一個從 Queue 裡讀資料。

from multiprocessing import Process, Queue
import os, time, random

# 寫資料程序執行的程式碼
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀資料程序執行的程式碼
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__ == '__main__':
    # 父程序建立 Queue,並傳給各個子程序
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q, ))
    # 啟動子程序 pw,寫入
    pw.start()
    # 啟動子程序 pr,讀取
    pr.start()
    # 等待 pw 結束
    pw.join()
    # pr 程序裡是死迴圈,無法等待其結束,只能強行終止
    pr.terminate()

結果輸出

Process to write: 3564
Put A to queue...
Process to read: 2500
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

三、多執行緒

一個程序內包含若干執行緒(至少有一個執行緒)。

執行緒是作業系統直接支援的執行單元 ==》高階語言內建多執行緒支援。Python中為真正的 Posix Thread

模組:_thread 和 threading

  • _thread:低階模組
  • threading:對_thread的封裝,最常用。

1、啟動

啟動一個執行緒就是把一個函式傳入並建立 Thread 例項,然後呼叫 start() 開始執行。

import time, threading

# 新執行緒執行的程式碼
def loop():
    print('Thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('Thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('Thread %s ended.' % threading.current_thread().name)

print('Thread %s is running...' % threading.current_thread().name)
## 建立子執行緒
t = threading.Thread(target=loop, name='LoopThread')
## 啟動子執行緒
t.start()
t.join()
print('Thread %s ended.' % threading.current_thread().name)

結果輸出

Thread MainThread is running...
Thread LoopThread is running...
Thread LoopThread >>> 1
Thread LoopThread >>> 2
Thread LoopThread >>> 3
Thread LoopThread >>> 4
Thread LoopThread >>> 5
Thread LoopThread ended.
Thread MainThread ended.

分析

  • 任何程序預設就會啟動一個執行緒(主執行緒,name:MainThread),主執行緒又可以啟動新的執行緒。
  • threading.current_thread()函式,返回當前執行緒的例項。
  • 子執行緒的名字在建立時指定,eg:LoopThread。若不指定,則自動給執行緒命名為 Thread-1、Thread-2…

2、Lock

程序與執行緒最大的區別

  • 多程序中,同一變數,各自有一份拷貝存於每個程序,互不影響;
  • 多執行緒中,所有變數都由所有執行緒共享。==》任何一個變數都可以被任何一個執行緒修改。

原因:高階語言的一條語句在CPU執行時是若干條語句,而執行這幾條語句中時,執行緒可能中斷,從而導致多個執行緒把同一個物件的內容改亂了。
==》解決方法:threading.Lock() 函式。

import time, threading

# 假定這是你的銀行存款
balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(10000000):
        # 先要獲取鎖
        lock.acquire()
        try:
            # 放心地改吧
            change_it(n)
        finally:
            # 改完了一定要釋放鎖
            lock.release()

def change_it(n):
    # 先存後取,結果應該為 0
    global balance
    balance = balance + n
    balance = balance - n

t1 = threading.Thread(target=run_thread, args=(5, ))
t2 = threading.Thread(target=run_thread, args=(8, ))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

結果輸出:0
分析:當多個執行緒同時執行 lock.acquire()時,只有一個執行緒能成功地獲取鎖,然後繼續執行程式碼,其他執行緒就繼續等待直到獲得鎖為止。

注意:獲得鎖的執行緒使用完要釋放鎖,否則會造成死執行緒,推薦使用 try…finally 方式。

缺點

  • 阻止多執行緒併發的執行:包含鎖的某段程式碼實際只能單執行緒模式執行,效率低;
  • 存在多個鎖時,不同的執行緒持有不同的鎖,並試圖獲取對方持有的鎖時,可能造成死鎖,導致多個執行緒全部掛起,既不能執行,也無法結束,只能靠作業系統強制終止。

3、多核CPU

多核CPU遇到死迴圈時:

  • CPU使用率:一個死迴圈執行緒會100%佔用一個CPU;兩個則會佔到200%CPU(兩個CPU核心)==》要想把 N 核 CPU 的核心全部跑滿,就必須啟動 N 個死迴圈執行緒。

在Python中,當直譯器執行程式碼時,有個 GIL鎖(Global Interpreter Lock)。任何 Python 執行緒執行前,先必須獲得GIL鎖,然後,每執行100條位元組碼,直譯器就自動釋放 GIL 鎖,讓其他的執行緒有機會執行。
==》多執行緒在Python中只能交替執行,也只能用到一個核。
==》可以使用多執行緒,但不能有效利用多核,除非通過C擴充套件實現。
==》Python 可利用多程序實現多工。多個程序有自己獨立的 GIL 鎖,互不影響。

4、ThreadLocal

ThreadLocal 最常用於 為每個執行緒繫結一個數據庫連線、HTTP請求、使用者身份資訊等。
==》一個執行緒的所有呼叫到的處理函式都可以訪問這些資源。

import threading

# 建立全域性 ThreadLocal 物件
local_school = threading.local()

def process_student():
    # 獲取當前執行緒關聯的 student,
    # 每個Thread對它都可讀寫student屬性,互不影響
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    # 繫結 ThreadLocal 的 student
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob', ), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

結果輸出

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

四、程序 vs. 執行緒

1、多工實現

通常設計 Master-Worker 模式,Master 負責分配任務,Worker 負責執行任務。
==》一個 Master、多個 Worker

若用多程序實現 Master-Worker,主程序是 Master,其他程序是 Worker;
==》穩定性高。(一個子程序崩潰了,不會影響主程序和其他子程序。當然主程序掛了所有程序就全掛了,但是Master 程序只負責分配任務,掛掉的概率低,eg:Apache)
==》建立程序的代價大。 Windows下開銷巨大。

若用多執行緒實現 Master-Worker,主執行緒是 Master,其他執行緒是 Worker。
==》稍快,效率高(Windows下IIS服務區預設採用多執行緒模式)
==》致命缺點:任何一個執行緒掛掉都可能直接造成整個程序崩潰(所有執行緒共享程序的記憶體)。

2、執行緒切換

單任務模式(批處理任務模式):處理完任務A,再處理任務 B …

多工模式:涉及到任務的切換。作業系統在切換程序或
者執行緒時也是一樣的,它需要先儲存當前執行的現場環境( CPU 暫存器狀態、記憶體頁等),然後,把新任務的執行環境準備好(恢復上次的暫存器狀態,切換記憶體頁等),才能開始執行。
==》任務過多,會造成系統處於假死的狀態。

3、計算密集型 vs. IO 密集型

計算密集型任務

  • 需要大量的計算,消耗CPU資源。==》強調:程式碼效率(C語言)
  • 雖可以用多工完成,但任務越多,切換所需的時間就越多,CPU執行任務的效率就越低 ==》高效:計算密集型任務同時進行的數量等於CPU的數量。

IO 密集型任務

  • 涉及網路、磁碟IO的任務均為IO 密集型任務;
  • CPU 消耗少,大多數時間(99%)是等待IO操作完成。
  • 任務越多,CPU效率越高==》強調:開發效率(Python)

4、非同步IO

可實現:用單程序單執行緒模型執行多工,稱為事件驅動模型

五、分散式程序

Thread vs. Process

  • Process 更穩定,可分佈到多臺機器上;
  • Thread 最多隻能分佈到同一臺機器的多個 CPU 上。

multiprocessing.managers 模組

  • 支援把多程序分佈到多臺機器上。一個服務程序為排程者,通過網路通訊將任務分佈到其他多個程序中。

待完善。。。