1. 程式人生 > >[ Python - 14 ] python進程及線程編程

[ Python - 14 ] python進程及線程編程

join() 都沒有 time() reading color 實體 def 概念 test

什麽是進程:

簡單來講,進程就是操作系統中運行的程序或任務,進程和程序的區別在於進程是動態的,而程序是靜態的。進程是操作系統資源管理的最小單位。

什麽是線程:

線程是進程的一個實體,是cpu調度和分派的最小單位,它是比進程更小的能獨立運行的基本單位,線程本身不擁有資源,但它可以與同屬於一個進程的線程共享進程的資源所擁有的全部資源。

python多線程編程與GIL:

為了更有效的利用多核處理,就出現了多線程編程,但是問題是線程間數據的一致性和狀態的同步如果得到保證,因此python解析器引入了GIL全局鎖
GIL全局鎖的出現雖然保證了線程之間狀態和一致性的原則,但是同一時間點上卻只能有一個線程在運行。比如:我們有4核CPU,同時發起4個線程,每個
線程都在cpu上,但因為GIL全局鎖的存在,在同一時間片上只有一個線程,所以多線程並發在python中就是一個美麗的夢。

線程與進程的區別:

1. 線程共享創建它的進程的地址空間;進程有自己的地址空間。
2. 線程可以直接訪問其進程的數據段;進程有自己的父進程數據段的副本。
3. 新線程很容易創建;新進程需要父進程fork。
4. 線程可以對同一進程的線程進行相當大的控制;進程只能對子進程執行控制。
5. 對主線程的更改(取消、優先級變更等)可能會影響進程的其他線程的行為;對父進程的更改不會影響子進程。

python多進程模型

multiprocessing 是一個跨平臺版本的多進程模塊,multiprocessing模塊提供了一個Process類來代表一個進程對象.

1. 進程的基本用法

#!_*_coding:utf-8_*_
# Author: hkey
from multiprocessing import Process     # 導入Process方法
import os

def run_proc():
    print(child process run %s (%s) %(os.getpid(), os.getppid()))     # os.getpid獲取當前進程的pid,os.getppid 獲取當前進程的父進程pid


if __name__ == __main__:
    print(parent process id: 
, os.getpid()) p = Process(target=run_proc) p.start() p.join() print(parent process %s done. %os.getpid())

創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動。
join()方法可以等待子進程結束後再繼續往下運行,通常用於進程間的同步,當使用join()方法時就阻塞了主進程,直到子進程執行完畢,再次執行主進程

2. 進程池Pool

如果要啟動大量子進程,可以用進程池的方式批量創建子進程:

#!_*_coding:utf-8_*_
# Author: hkey
from multiprocessing import Pool, Process
import os, time

def run_proc():
    print(Run task %s (%s) %(os.getpid(), os.getppid()))
    start_time = time.time()
    time.sleep(1)
    print(Task %s runs %.2f seconds. %(os.getpid(), time.time()-start_time))

if __name__ == __main__:
    print(parent process %s % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(run_proc)
    p.close()
    p.join()
    print(parent process %s done. % os.getpid())


輸出結果:

parent process 12980

Run task 8064 (12980)
Run task 9224 (12980)
Run task 11604 (12980)
Run task 13604 (12980)

Task 8064 runs 1.00 seconds.
Run task 8064 (12980)
Task 9224 runs 1.00 seconds.
Task 11604 runs 1.00 seconds.
Task 13604 runs 1.00 seconds.
Task 8064 runs 1.00 seconds.
parent process 12980 done.

上面的例子,進程池最大限制4個子進程,但是循環了5次。從結果可以看到首先建立了4個子進程,當其中一個退出後,再次創建第5個子進程。
特別註意的是,在使用進程池的時候,首先要調用close()方法,調用close()方法後就不能繼續添加新的子進程了,然後再調用join()方法。

3. 進程的鎖lock

當多個進程需要訪問共享資源的時候,Lock可以用來避免訪問沖突。
通過實例測試,在不加鎖的情況下,多進程無論在讀寫同一個文件還是cpu計算都沒有發生錯誤的現象。可能是我測試的量不夠。

對文件寫入:

#!_*_coding:utf-8_*_
#__author__:"hkey"
import multiprocessing, sys
def write1(f):
    fs = open(f, a+)
    n = 10000
    while n > 0:
        fs.write(aaaaaaaaaaaaaa\n)
        n -= 1
    fs.close()


def write2(f):
    fs = open(f, a+)
    n = 10000
    while n > 0:
        fs.write(bbbbbbbbbbbbbbbb\n)
        n -= 1
    fs.close()

if __name__ == __main__:
    f = test.txt
    p1 = multiprocessing.Process(target=write1, args=(f,))
    p2 = multiprocessing.Process(target=write2, args=(f,))
    p1.start()
    p2.start()

多進程在沒有加鎖的情況下,沒有出現寫入錯誤的現象。

多進程加鎖的寫法:

#!_*_coding:utf-8_*_
#__author__:"hkey"
import multiprocessing, sys
def write1(f, lock):
    lock.acquire()
    fs = open(f, a+)
    n = 10000
    while n > 0:
        fs.write(aaaaaaaaaaaaaa\n)
        n -= 1
    fs.close()
    lock.release()
def write2(f, lock):
    lock.acquire()
    fs = open(f, a+)
    n = 10000
    while n > 0:
        fs.write(bbbbbbbbbbbbbbbb\n)
        n -= 1
    fs.close()
    lock.release()
if __name__ == __main__:
    lock = multiprocessing.Lock()
    f = test.txt
    p1 = multiprocessing.Process(target=write1, args=(f,lock))
    p2 = multiprocessing.Process(target=write2, args=(f,lock))
    p1.start()
    p2.start()

個人總結:在多進程編程中,如果只是變量的計算或者cpu計算,可以不加鎖,因為每個進程的地址空間都是獨立的存在。
而在寫入同一個文件的時候,就有必要加鎖。

4. 子進程的控制

子進程是獨立與主進程的存在,創建子進程時,系統fork出子進程後,就與主進程資源完全獨立了,我們不單單創建完子進程就行了,還要控制子進程
的輸入和輸出。

subprocess模塊可以讓我們非常方便的啟動一個子進程,然後控制其輸入和輸出。

子進程輸出:
使用subprocess模塊,在子進程中運行 ping -n 1 baidu.com

#!_*_coding:utf-8_*_
# Author: hkey
import subprocess

print($ ping -n 1 baidu.com)
r = subprocess.call([ping, -n, 1, baidu.com])
print(Exit code:, r)

輸出結果:

$ ping -n 1 baidu.com

正在 Ping baidu.com [111.13.101.208] 具有 32 字節的數據:
來自 111.13.101.208 的回復: 字節=32 時間=22ms TTL=51

111.13.101.208 的 Ping 統計信息:
    數據包: 已發送 = 1,已接收 = 1,丟失 = 0 (0% 丟失),
往返行程的估計時間(以毫秒為單位):
    最短 = 22ms,最長 = 22ms,平均 = 22ms
Exit code: 0



子進程輸入:

print($nslookup)
p = subprocess.Popen([nslookup], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(bset q=mx\napache.org\nexit\n)     # 通過調用communicate方法實現輸入,這裏是二進制格式。
result = output if output else err
print(result.decode(gbk))
print(Exit code:, p.returncode)

輸出結果:
# 主機dns有問題,但是輸出結果完全正確的。
$nslookup
默認服務器:  UnKnown
Address:  127.0.0.1

> > 服務器:  UnKnown
Address:  127.0.0.1

> 
Exit code: 0

5. 進程間通信

雖然子進程從主進程fork後是獨立的存在,但進程之間肯定是需要通信的。python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等方式
來交換數據。
進程間通信,最典型的例子就是消費者生產者模型, 生產者生產數據,消費者消費。

#!_*_coding:utf-8_*_
# Author: hkey
from multiprocessing import Process, Queue
import os, time
def write(q):
    for i in range(10):
        print(process to write: %s % os.getpid())
        print(生產包子[%s] %i)
        q.put(i)    # 將i上傳至隊列中。
        time.sleep(1)
def read(q):
    while True:
        print(process to read: %s % os.getpid())
        values = q.get(True)     # 通過get方法將隊列中的數據下載,從隊列中拿走一個數據就少一個數據。
        print(吃掉包子[%s] %values)
if __name__ == __main__:
    q = Queue()        # 調用Queue方法生成一個隊列
    pw = Process(target=write, args=(q,))     # 通過進程的方式調用
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()     # 因為read()是死循環,需要通過terminate()方法關閉。
    
輸出結果:

process to read: 16628
process to write: 16440
生產包子[0]
吃掉包子[0]
process to read: 16628
process to write: 16440
生產包子[1]
吃掉包子[1]
process to read: 16628
process to write: 16440
生產包子[2]
吃掉包子[2]
......

通過結果發現是兩個進程通過隊列在通信,生產一個包子,吃掉一個包子。

什麽是生產者消費者模型

在python中,生產者消費者模型是一個很典型而且很經典的例子。
隊列的概念就是在生產者和消費者中間加一個類似倉庫的中間層,生產者不在直接對應消費者,而是生產者將生產好的東西放置到倉庫中,而當消費者需要
的時候,自己去倉庫中取出東西就好。這樣做有以下幾個優點:

1. 解耦
2. 支持並發
3. 支持忙閑不均

python多線程模型

多任務可以由多進程完成,也可以由一個進程內的多線程完成。一個進程中至少有一個線程。線程是操作系統直接支持的執行單元。
threading模塊提供python對線程的使用

1. 線程的基本用法

啟動一個線程就是把一個函數傳入並創建Thread實例,然後調用start()開始執行,語法和進程差不多

#!_*_coding:utf-8_*_
# Author: hkey
import threading, time

def run_thread():
    print(thread %s running... % threading.current_thread().name)
    n = 0
    while n < 5:
        n += 1
        print(thread %s >>> %s % (threading.current_thread().name, n))
        time.sleep(1)
    print(thread %s ended. % threading.current_thread().name)

if __name__ == __main__:
    print(threading %s is running... % threading.current_thread().name)
    t = threading.Thread(target=run_thread, name=LoopThread)
    t.start()
    t.join()
    print(thread %s ended. % threading.current_thread().name)
    

輸出結果:

threading MainThread is running...
thread LoopThread running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

任何一個進程默認就有一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程,python的threading模塊有個current_thread()函數,
它永遠返回當前線程的實例。主線程的名字叫 MainThread,子線程的名字在創建時指定,我們用LoopThread命名子線程。

2. 線程的型號量

相對於進程來說,線程占用的資源就很小,因此沒有使用到線程池的概念,但是要實現類似線程池的功能可以使用線程的型號量來做限制
線程的信號量是同時允許一定數量的線程更改數據,主要作用在於限制線程的並發。

#!_*_coding:utf-8_*_
#__author__:"hkey"
import threading, time, os

sem = threading.BoundedSemaphore(3)     # 調用BoundedSemaphore方法限制3個線程的並發
def run():
    sem.acquire()    # 開始
    print(threading running, threading.current_thread().name)
    time.sleep(1)    
    sem.release()    # 結束
if __name__ == __main__:
    for i in range(10):
        t = threading.Thread(target=run)
        t.start()

3. 線程的鎖Lock

由於線程是共享進程的地址空間,所以在多線程的環境下,鎖就顯得尤為重要。多線程編程,在不加鎖的情況下,同時對一個全局變量做修改,基本上全是錯誤。

#!_*_coding:utf-8_*_
#__author__:"hkey"
import threading
balance = 0
def run_thread(n):
    global balance
    for i in range(10000000):
        balance = balance + n
        balance = balance - n

if __name__ == __main__:
    t1 = threading.Thread(target=run_thread, args=(5,))
    t2 = threading.Thread(target=run_thread, args=(8,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)

輸出結果:
-86

多線程編程加鎖實例如下:

#!_*_coding:utf-8_*_
#__author__:"hkey"
import threading
balance = 0
def run_thread(lock, n):
    lock.acquire()
    global balance
    for i in range(10000000):
        balance = balance + n
        balance = balance - n
    lock.release()
if __name__ == __main__:
    lock = threading.Lock()
    t1 = threading.Thread(target=run_thread, args=(lock, 5))
    t2 = threading.Thread(target=run_thread, args=(lock, 8))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(balance)

輸出結果:
0

多線程鎖總結:在多線程編程中,無論是變量的計算還是寫入同一個文件都要加鎖,使用多線程編程一定要註意鎖的使用。

[ Python - 14 ] python進程及線程編程