1. 程式人生 > >程序和執行緒——python筆記

程序和執行緒——python筆記

目錄

參考連結:

多程序

優缺點:

多執行緒

優缺點:

參考連結:

多執行緒:需要多核CPU才可能實現,最小的執行單元

多程序:一個程序至少一個執行緒

多工的實現有3種方式:

多程序模式;

多執行緒模式;

多程序+多執行緒模式

多程序

優缺點:

優點:就是穩定性高,因為一個子程序崩潰了,不會影響主程序和其他子程序。(當然主程序掛了所有程序就全掛了,但是Master程序只負責分配任務,掛掉的概率低)著名的Apache最早就是採用多程序模式。

缺點:是建立程序的代價大,在Unix/Linux系統下,用fork呼叫還行,在Windows

下建立程序開銷巨大。另外,作業系統能同時執行的程序數也是有限的,在記憶體和CPU的限制下,如果有幾千個程序同時執行,作業系統連排程都會成問題。

簡單程式碼例子:

multiprocessing

from multiprocessing import Process

process_list = [multiprocessing.Process(target=task_io, args=(i,)) for i in range(multiprocessing.cpu_count())]
    for p in process_list:
        p.start()
    for p in process_list:
        if p.is_alive():
            p.join()

multiprocessing.cpu_count()  #CPU數量

由於Python中GIL的原因,對於計算密集型任務,Python下比較好的並行方式是使用多程序,這樣可以非常有效的使用CPU資源。當然同一時間執行的程序數量取決你電腦的CPU核心數。

Python中的程序模組為mutliprocess模組,提供了很多容易使用的基於物件的介面。另外它提供了封裝好的管道和佇列,可以方便的在程序間傳遞訊息。Python還提供了程序池Pool物件,可以方便的管理和控制執行緒。

Pool(程序池)

如要啟動大量子程序,可以用程序池的方式批量建立子程序:

程式碼解讀:

Pool

物件呼叫join()方法會等待所有子程序執行完畢

呼叫join()之前必須先呼叫close()

呼叫close()之後就不能繼續新增新的Process了。

task 0123是立刻執行的,而task 4要等待前面某個task完成後才執行,這是因為Pool的預設大小在我的電腦上是4

程式碼例1

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

程式碼例2

import json
from multiprocessing import Pool
import requests
from requests.exceptions import RequestException
import re

def get_one_page(url): #判斷頁面的請求狀態來做異常處理
    try:
        response = requests.get(url)
        if response.status_code == 200:#200是請求成功
            return response.text
        return None
    except RequestException:
        return None

def parse_one_page(html):
    pattern = re.compile('<dd>.*?board-index.*?>(\d+)</i>.*?<p class="name">.*?data-val.*?>(.*?)</a>'#正則表示式
                         +'.*?star">(.*?)</p>.*?releasetime">(.*?)</p>'
                         +'.*?integer">(.*?)</i>.*?fraction">(\d+)</i>.*?</dd>',re.S)
    items = re.findall(pattern, html) #返回正則結果
    for item in items:  #對結果進行迭代,修飾
        yield{
            '排名:':item[0],
            '電影:':item[1],
            '主演:':item[2].strip()[3:],
            '上映時間:':item[3].strip()[5:],
            '評分:':item[4]+item[5]
        }
def write_to_file(content): #寫入檔案“result.txt”中
    with open('result.txt', 'a', encoding='utf-8') as f: #以utf-8的編碼寫入
        f.write(json.dumps(content, ensure_ascii=False) + "\n") #json序列化預設使用ascii編碼,這裡禁用ascii
        f.close()

def main(page):
    url = "http://maoyan.com/board/4?offset=" + str(page) #page 為頁碼數
    html = get_one_page(url)
    for item in parse_one_page(html):
        print(item)
        write_to_file(item)

if __name__ == '__main__':
    '''
    for i in range(10):
        main(i*10)
    '''
    pool = Pool() #建立程序池
    pool.map(main, (i*10 for i in range(10)))#對映到主函式中進行迴圈

程序間通訊

Queue為例,在父程序中建立兩個子程序,一個往Queue裡寫資料,一個從Queue裡讀資料:

from multiprocessing import Process, Queue
import os, time, random

# 寫資料程序執行的程式碼:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀資料程序執行的程式碼:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父程序建立Queue,並傳給各個子程序:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子程序pw,寫入:
    pw.start()
    # 啟動子程序pr,讀取:
    pr.start()
    # 等待pw結束:
    pw.join()
    # pr程序裡是死迴圈,無法等待其結束,只能強行終止:
    pr.terminate()

執行結果如下:

Process to write: 50563

Put A to queue...

Process to read: 50564

Get A from queue.

Put B to queue...

Get B from queue.

Put C to queue...

Get C from queue.

#q.get()呼叫佇列物件的get()方法從隊頭刪除並返回一個專案。可選引數為block,預設為True。如果佇列為空且block為True,get()就使呼叫執行緒暫停,直至有專案可用。如果佇列為空且block為False,佇列將引發Empty異常。

多執行緒

優缺點:

優點:通常比多程序快一點,但是也快不到哪去

缺點:就是任何一個執行緒掛掉都可能直接造成整個程序崩潰,因為所有執行緒共享程序的記憶體。

threading

thread_list = [threading.Thread(target=task_cpu, args=(i,)) for i in range(5)]
    for t in thread_list:
        t.start()
    for t in thread_list:
        if t.is_alive():
            t.join()

多執行緒即在一個程序中啟動多個執行緒執行任務。一般來說使用多執行緒可以達到並行的目的,但由於Python中使用了全域性解釋鎖GIL的概念,導致Python中的多執行緒並不是並行執行,而是“交替執行”。類似於下圖:

所以Python中的多執行緒適合IO密集型任務,而不適合計算密集型任務。

Python提供兩組多執行緒介面,一是thread模組_thread,提供低等級介面。二是threading模組,提供更容易使用的基於物件的介面,可以繼承Thread物件來實現執行緒,此外其還提供了其它執行緒相關的物件,例如Timer,Lock等。

import time, threading
# 新執行緒執行的程式碼:
def loop():
    print('thread %s is running...' %threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)            #延時1秒
    print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

lock(鎖)

多執行緒和多程序最大的不同在於,多程序中,同一個變數,各自有一份拷貝存在於每個程序中,互不影響,而多執行緒中,所有變數都由所有執行緒共享,所以,任何一個變數都可以被任何一個執行緒修改,因此,執行緒之間共享資料最大的危險在於多個執行緒同時改一個變數,把內容給改亂了。

由於鎖只有一個,無論多少執行緒,同一時刻最多隻有一個執行緒持有該鎖,所以,不會造成修改的衝突。建立一個鎖就是通過threading.Lock()來實現:

import time, threading
lock = threading.Lock()
# 假定這是你的銀行存款:
balance = 0
def change_it(n):
    # 先存後取,結果應該為0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

ThreadLocal(全域性變數)

ThreadLocal應運而生,不用查詢dictThreadLocal幫你自動做這件事:

import threading

# 建立全域性ThreadLocal物件:
local_school = threading.local()

def process_student():
    # 獲取當前執行緒關聯的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    # 繫結ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

執行結果:

Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

全域性變數local_school就是一個ThreadLocal物件,每個Thread對它都可以讀寫student屬性,但互不影響。你可以把local_school看成全域性變數,但每個屬性如local_school.student都是執行緒的區域性變數,可以任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。

可以理解為全域性變數local_school是一個dict,不但可以用local_school.student,還可以繫結其他變數,如local_school.teacher等等。

ThreadLocal最常用的地方就是為每個執行緒繫結一個數據庫連線,HTTP請求,使用者身份資訊等,這樣一個執行緒的所有呼叫到的處理函式都可以非常方便地訪問這些資源。

IO密集型任務 VS 計算密集型任務

  • 所謂IO密集型任務,是指磁碟IO、網路IO佔主要的任務,計算量很小。比如請求網頁、讀寫檔案等。當然我們在Python中可以利用sleep達到IO密集型任務的目的。
  • 所謂計算密集型任務,是指CPU計算佔主要的任務,CPU一直處於滿負荷狀態。比如在一個很大的列表中查詢元素(當然這不合理),複雜的加減乘除等。

多執行緒適合IO密集型任務

多程序適合計算密集型任務