1. 程式人生 > >Python多執行緒、多程序和協程的例項講解

Python多執行緒、多程序和協程的例項講解

執行緒、程序和協程是什麼

執行緒、程序和協程的詳細概念解釋和原理剖析不是本文的重點,本文重點講述在Python中怎樣實際使用這三種東西

參考: 程序、執行緒、協程之概念理解

程序(Process)是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎。
執行緒,有時被稱為輕量級程序(Lightweight Process,LWP),是程式執行流的最小單元。
協程:一個程式可以包含多個協程,可以對比於一個程序包含多個執行緒,因而下面我們來比較協程和執行緒:我們知道多個執行緒相對獨立,有自己的上下文,切換受系統控制;而協程也相對獨立,有自己的上下文,但是其切換由自己控制,由當前協程切換到其他協程由當前協程來控制。

準備工作

磨刀不誤砍柴工,在用例項講解執行緒、程序和協程怎麼使用之前,先準備一些工具:

實際上多執行緒、多程序和協程,都屬於併發程式設計,併發程式設計的最重要的目標就是提高程式執行的效率,那麼我們需要一個計算一個函式耗時長度的工具,用於對比不同方式程式的執行時間,這裡我們寫一個函式計時裝飾器fn_timer來完成這件事:

def fn_timer(function):
    '''
    函式計時裝飾器
    :param function: 函式物件
    :return: 裝飾器
    '''
    @wraps(function)
    def function_timer(*args,**kwargs):
        # 起始時間
        t0 = time.time()
        # 呼叫函式
        result = function(*args,**kwargs)
        # 結束時間
        t1 = time.time()
        # 列印函式耗時
        print '[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0)
        return result
    return function_timer

該裝飾器的用法示例:

# 測試
@fn_timer
def add(x,y):
    time.sleep(1.22)
    return x + y

if __name__ == '__main__':
    # 測試
    sum = add(1,2)
    print sum

執行程式碼輸出:

[finished function:test in 1.23s]
3

實際使用中,大規模爬蟲程式很適合用併發來實現,所以我們再準備一些網址放在urls列表中,用於測試爬蟲程式的效率(都是百度百科的一些詞條頁面):

# 20個網頁
urls = ['https://baike.baidu.com/item/%E8%87%AA%E7%94%B1%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E8%AE%A1%E7%AE%97%E6%9C%BA%E7%A8%8B%E5%BA%8F%E8%AE%BE%E8%AE%A1%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/%E5%9F%BA%E9%87%91%E4%BC%9A',
        'https://baike.baidu.com/item/%E5%88%9B%E6%96%B02.0',
        'https://baike.baidu.com/item/%E5%95%86%E4%B8%9A%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E5%BC%80%E6%94%BE%E6%BA%90%E4%BB%A3%E7%A0%81',
        'https://baike.baidu.com/item/OpenBSD',
        'https://baike.baidu.com/item/%E8%A7%A3%E9%87%8A%E5%99%A8',
        'https://baike.baidu.com/item/%E7%A8%8B%E5%BA%8F/71525',
        'https://baike.baidu.com/item/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/C%2B%2B',
        'https://baike.baidu.com/item/%E8%B7%A8%E5%B9%B3%E5%8F%B0',
        'https://baike.baidu.com/item/Web/150564',
        'https://baike.baidu.com/item/%E7%88%B1%E5%A5%BD%E8%80%85',
        'https://baike.baidu.com/item/%E6%95%99%E5%AD%A6',
        'https://baike.baidu.com/item/Unix%20shell',
        'https://baike.baidu.com/item/TIOBE',
        'https://baike.baidu.com/item/%E8%AF%BE%E7%A8%8B',
        'https://baike.baidu.com/item/MATLAB',
        'https://baike.baidu.com/item/Perl']

整合:把函式計時裝飾器和urls列表封裝在一個類:utils.py中:

# coding:utf-8
from functools import wraps
import time

# 20個網頁
urls = ['https://baike.baidu.com/item/%E8%87%AA%E7%94%B1%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E8%AE%A1%E7%AE%97%E6%9C%BA%E7%A8%8B%E5%BA%8F%E8%AE%BE%E8%AE%A1%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/%E5%9F%BA%E9%87%91%E4%BC%9A',
        'https://baike.baidu.com/item/%E5%88%9B%E6%96%B02.0',
        'https://baike.baidu.com/item/%E5%95%86%E4%B8%9A%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E5%BC%80%E6%94%BE%E6%BA%90%E4%BB%A3%E7%A0%81',
        'https://baike.baidu.com/item/OpenBSD',
        'https://baike.baidu.com/item/%E8%A7%A3%E9%87%8A%E5%99%A8',
        'https://baike.baidu.com/item/%E7%A8%8B%E5%BA%8F/71525',
        'https://baike.baidu.com/item/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/C%2B%2B',
        'https://baike.baidu.com/item/%E8%B7%A8%E5%B9%B3%E5%8F%B0',
        'https://baike.baidu.com/item/Web/150564',
        'https://baike.baidu.com/item/%E7%88%B1%E5%A5%BD%E8%80%85',
        'https://baike.baidu.com/item/%E6%95%99%E5%AD%A6',
        'https://baike.baidu.com/item/Unix%20shell',
        'https://baike.baidu.com/item/TIOBE',
        'https://baike.baidu.com/item/%E8%AF%BE%E7%A8%8B',
        'https://baike.baidu.com/item/MATLAB',
        'https://baike.baidu.com/item/Perl']


def fn_timer(function):
    '''
    函式計時裝飾器
    :param function: 函式物件
    :return: 裝飾器
    '''
    @wraps(function)
    def function_timer(*args,**kwargs):
        # 起始時間
        t0 = time.time()
        # 呼叫函式
        result = function(*args,**kwargs)
        # 結束時間
        t1 = time.time()
        # 列印函式耗時
        print '[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0)
        return result
    return function_timer

# 測試
@fn_timer
def add(x,y):
    time.sleep(1.22)
    return x + y

if __name__ == '__main__':
    # 測試
    sum = add(1,2)
    print sum
    # 輸出:
    '''
    [finished function:test in 1.23s]
    3
    '''

例項講解多執行緒的用法

從聽音樂、看電影講起

我現在要做兩件事情:聽音樂、看電影,聽一首音樂假如耗時1秒,看一部電影假如耗時5秒,用兩個函式定義這兩個任務如下:

# 耗時任務:聽音樂
def music(name):
    print 'I am listening to music {0}'.format(name)
    time.sleep(1)

# 耗時任務:看電影
def movie(name):
    print 'I am watching movie {0}'.format(name)
    time.sleep(5)

假如我現在要聽10首音樂、看2部電影,那麼我就有如下幾種方案:

方案一:先一個個聽完10首音樂,再一個個看完2部電影,順序完成,程式碼如下:

# 單執行緒操作:順序執行聽10首音樂,看2部電影
@fn_timer
def single_thread():
    for i in range(10):
        music(i)
    for i in range(2):
        movie(i)

讓我們執行一下這段程式碼,輸出如下:

    I am listening to music 0
    I am listening to music 1
    I am listening to music 2
    I am listening to music 3
    I am listening to music 4
    I am listening to music 5
    I am listening to music 6
    I am listening to music 7
    I am listening to music 8
    I am listening to music 9
    I am watching movie 0
    I am watching movie 1
    [finished function:single_thread in 20.14s]

可以看到,老老實實嚴格按照先後順序來一件件做這些事情,所需的總時間和每件事情耗時加起來是一樣多的。

方案二:剛剛的方案不太好,太費時間了,那麼能不能同時進行一些事情呢?答案是可以的,可以同時聽多首音樂,同時看多部電影進行,程式碼如下:

# 多執行緒執行:聽10首音樂,看2部電影
@fn_timer
def multi_thread():
    # 執行緒列表
    threads = []
    for i in range(10):
        # 建立一個執行緒,target引數為任務處理函式,args為任務處理函式所需的引數元組
        threads.append(threading.Thread(target = music,args = (i,)))
    for i in range(2):
        threads.append(threading.Thread(target = movie,args = (i,)))

    for t in threads:
        # 設為守護執行緒
        t.setDaemon(True)
        # 開始執行緒
        t.start()
    for t in threads:
        t.join()

執行上述程式碼,執行結果:

    I am listening to music 0
    I am listening to music 1
    I am listening to music 2
    I am listening to music 3
    I am listening to music 4
    I am listening to music 5
    I am listening to music 6
    I am listening to music 7
    I am listening to music 8
    I am listening to music 9
    I am watching movie 0
    I am watching movie 1
    [finished function:multi_thread in 5.02s]

這次只用了5秒就完成了,完成效率顯著提升。這次試用多執行緒執行多個任務,所有任務最終的總耗時 = 耗時最長的那個單個任務的耗時,即看一部電影的5秒鐘時間。

方案三:使用執行緒池。上面使用多執行緒的方式比較繁瑣,下面使用執行緒池來實現:

# 使用執行緒池執行:聽10首音樂,看2部電影
@fn_timer
def use_pool():
    # 設定執行緒池大小為20,如果不設定,預設值是CPU核心數
    pool = Pool(20)
    pool.map(movie,range(2))
    pool.map(music,range(10))
    pool.close()
    pool.join()

執行結果:

    I am listening to music 0
    I am listening to music 1
    I am listening to music 2
    I am listening to music 3
    I am listening to music 4
    I am listening to music 5
    I am listening to music 6
    I am listening to music 7
    I am listening to music 8
    I am listening to music 9
    I am watching movie 0
    I am watching movie 1
    [finished function:use_pool in 6.12s]

可以看出使用執行緒池反而比手工排程執行緒多耗時一秒鐘,可能是因為執行緒池內部對執行緒的排程和執行緒切換的耗時造成的。

例項:使用多執行緒下載網頁

話不多說,直接上程式碼,用多執行緒併發下載20個百度百科網頁的例項程式碼及執行結果如下:

# coding:utf-8
# 測試多執行緒
import threading
import time
from utils import fn_timer
from multiprocessing.dummy import Pool
import requests
from utils import urls

# 應用:使用單執行緒下載多個網頁的內容
@fn_timer
def download_using_single_thread(urls):
    resps = []
    for url in urls:
        resp = requests.get(url)
        resps.append(resp)
    return resps

# 應用:使用多執行緒下載多個網頁的內容
@fn_timer
def download_using_multi_thread(urls):
    threads = []
    for url in urls:
        threads.append(threading.Thread(target = requests.get,args = (url,)))
    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()

# 應用:使用執行緒池下載多個網頁的內容
@fn_timer
def download_using_pool(urls):
    pool = Pool(20)
    # 第一個引數為函式名,第二個引數一個可迭代物件,為函式所需的引數列表
    resps = pool.map(requests.get,urls)
    pool.close()
    pool.join()
    return resps

def main():
    # 1.使用單執行緒
    resps = download_using_single_thread(urls)
    print len(resps)
    # 輸出:
    '''
    [finished function:download_using_single_thread in 6.18s]
    20
    '''
    # 2. 使用多執行緒
    download_using_multi_thread(urls)
    # 輸出:
    '''
    [finished function:download_using_multi_thread in 0.73s]
    '''

    # 3.使用執行緒池
    resps = download_using_pool(urls)
    print len(resps)
    # 輸出:
    '''
    [finished function:download_using_pool in 0.84s]
    20
    '''

if __name__ == '__main__':
    main()

例項講解多程序的用法

多程序和程序池的使用

例項程式碼如下:

# coding:utf-8
# 測試多程序
import os
import time
from multiprocessing import Process,Pool,Queue
from utils import fn_timer
import random

# 簡單的任務
@fn_timer
def do_simple_task(task_name):
    print 'Run child process {0}, task name is: {1}'.format(os.getpid(),task_name)
    time.sleep(1.2)
    return task_name

@fn_timer
# 1. 測試簡單的多程序
def test_simple_multi_process():
    p1 = Process(target=do_simple_task, args=('task1',))
    p2 = Process(target=do_simple_task, args=('task2',))
    print 'Process will start...'
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print 'Process end.'

@fn_timer
# 2. 測試使用程序池
def test_use_process_pool():
    # 建立一個程序池,數字表示一次性同時執行的最大子程序數
    pool = Pool(5)
    # 任務返回值列表
    results = []
    # 任務名稱列表
    task_names = []
    for i in range(7):
        task_names.append('task{0}'.format(i))
    # 併發執行多個任務,並獲取任務返回值
    results = pool.map_async(do_simple_task,task_names)
    print 'Many processes will start...'
    pool.close()
    pool.join()

    print 'All processes end, results is: {0}'.format(results.get())

def main():
    test_simple_multi_process()
    # 輸出:
    '''
    Process will start...
    Run child process 1524, task name is: task2
    Run child process 1728, task name is: task1
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    Process end.
    [finished function:test_simple_multi_process in 1.34s]
    '''

    test_use_process_pool()
    # 輸出:
    '''
    Many processes will start...
    Run child process 7568, task name is: task0
    Run child process 7644, task name is: task1
    Run child process 7628, task name is: task2
    Run child process 7620, task name is: task3
    Run child process 7660, task name is: task4
    [finished function:do_simple_task in 1.20s]
    Run child process 7568, task name is: task5
    [finished function:do_simple_task in 1.20s]
    Run child process 7644, task name is: task6
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    All processes end, results is: ['task0', 'task1', 'task2', 'task3', 'task4', 'task5', 'task6']
    [finished function:test_use_process_pool in 2.62s]
    '''
if __name__ == '__main__':
    main()

程序之間的通訊

程序間的通訊採用佇列來實現,例項程式碼如下:

# coding:utf-8
# 測試程序間的通訊
import os
import time
from multiprocessing import Process,Pool,Queue
from utils import fn_timer
import random

# 寫程序執行的任務
def write(q):
    for value in ['A','B','C']:
        print 'Put value: {0} to queue.'.format(value)
        q.put(value)
        time.sleep(random.random())

# 讀程序執行的任務
def read(q):
    while True:
        value = q.get(True)
        print 'Get value: {0} from queue.'.format(value)

# 測試程序間的通訊
def test_communication_between_process():
    q = Queue()
    # 寫程序
    pw = Process(target = write,args = (q,))
    # 讀程序
    pr = Process(target = read,args = (q,))
    pw.start()
    pr.start()
    pw.join()
    # 因為讀任務是死迴圈,所以要強行結束
    pr.terminate()

def main():
    test_communication_between_process()
    # 輸出
    '''
    Put value: A to queue.
    Get value: A from queue.
    Put value: B to queue.
    Get value: B from queue.
    Put value: C to queue.
    Get value: C from queue.
    '''

if __name__ == '__main__':
    main()

例項講解協程的用法

下面用協程下載同樣的20個網頁,例項程式碼如下:

            
           

相關推薦

Python執行程序例項講解

執行緒、程序和協程是什麼 執行緒、程序和協程的詳細概念解釋和原理剖析不是本文的重點,本文重點講述在Python中怎樣實際使用這三種東西 參考: 程序、執行緒、協程之概念理解 程序(Process)是計算機中的程式關於某資料集合上的一次執行活動,是系統進行資源分配和排程的基本單位,是作業系統結構的基礎。執

python執行程序的使用

本文主要介紹多執行緒、多程序、協程的最常見使用,每個的詳細說明與介紹有時間會在以後的隨筆中體現。 一、多執行緒 1.python通過兩個標準庫thread和threading提供對執行緒的支援。thread提供了低級別的、原始的執行緒以及一個簡單的鎖。threading通過對thread模組

Python3:談談python的GIL執行程序

本文只是適合初認識多執行緒的小夥伴,裡面的概念和原理一定要搞清楚, 不然以後設計多執行緒,多程序會出很大的錯. GIL的全稱是Global Interpreter Lock(全域性直譯器鎖),來源是python設計之初的考慮,為了資料安全所做的決定。 GIL 的特點: P

Python 執行程序 (二)之 執行同步通訊

Python 多執行緒、多程序 (一)之 原始碼執行流程、GIL Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊 Python 多執行緒、多程序 (三)之 執行緒程序對比、多執行緒 一、python多執行緒 對於I/O操作的時候,程序與執行緒的效能差別不大,甚至由於執行緒更輕量級,效能更高

Python 執行程序 (一)之 原始碼執行流程GIL

Python 多執行緒、多程序 (一)之 原始碼執行流程、GIL Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊 Python 多執行緒、多程序 (三)之 執行緒程序對比、多執行緒 一、python程式的執行原理 許多時候,在執行一個python檔案的時候,會發現在同一目錄下會出現一個__

Python 執行程序 (三)之 執行程序對比程序

Python 多執行緒、多程序 (一)之 原始碼執行流程、GIL Python 多執行緒、多程序 (二)之 多執行緒、同步、通訊 Python 多執行緒、多程序 (三)之 執行緒程序對比、多執行緒 一、多執行緒與多程序的對比 在之前簡單的提過,CPython中的GIL使得同一時刻只能有一個執行緒執行,即併

Python程序執行的理解

首先我們來了解下python中的程序,執行緒以及協程! 從計算機硬體角度: 計算機的核心是CPU,承擔了所有的計算任務。 一個CPU,在一個時間切片裡只能執行一個程式。 從作業系統的角度: 程序和執行緒,都是一種CPU的執行單元。 程序:表示一個程式的上下文執行活

零基礎學python:併發伺服器面向連線程序執行程序

面向連線的併發伺服器 只能同時為一個人服務 為了幫助小夥伴們更好的學習Python,小編整理了Python的相關學習視訊及學習路線圖; ,新增小編學習群943752371即可獲取 多程序併發伺服器 多程序伺服器代表:Apache伺服器 主程序中必需

Python 執行程序 (二)之 執行同步通訊

一、python多執行緒 對於I/O操作的時候,程序與執行緒的效能差別不大,甚至由於執行緒更輕量級,效能更高。這裡的I/O包括網路I/O和檔案I/O 1、例項 假如利用socket傳送http請求,也就是網路I/O。爬取列表網頁中的寫href連結,然後獲取href連結之後,在爬去連結的網頁詳情。 如果不適用

10-執行程序執行池程式設計

一、多執行緒、多程序和執行緒池程式設計 1.1、Python中的GIL鎖   CPython中,global interpreter lock(簡稱GIL)是一個互斥體,用於保護對Python物件的訪問,從而防止多個執行緒一次執行Python位元組碼(也就是說,GIL鎖每次只能允許一個執行緒工作,無法多個執行

python爬蟲之執行程序+程式碼示例

#python爬蟲之多執行緒、多程序 >使用多程序、多執行緒編寫爬蟲的程式碼能有效的提高爬蟲爬取目標網站的效率。 ## 一、什麼是程序和執行緒 引用[廖雪峰的官方網站](https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376)

[進階]-執行程序非同步IO實用例子

在編寫爬蟲時,效能的消耗主要在IO請求中,當單程序單執行緒模式下請求URL時必然會引起等待,從而使得請求整體變慢。以下程式碼預設執行環境為python3。 目錄 一、多執行緒、多程序 1.同步執行 2.多執行緒執行 3.多執行緒+回撥函式執行 4.多程序執行 5.多程

執行程序之比較,以及三種執行模型。

工作幾年找工作幾乎總會被問,從最開始的從網上看答案,到現在憑自己的經驗去說,這個問題似乎也是經驗積累的一個驗證,最近沒事就總結一下吧: 程序和執行緒的定義、比較等: 程序:處於活動狀態的計算機程式。程序就是在作業系統中       執行特定的任務,程序針對

java執行:1程序執行

多執行緒樣例 我們用工人卸貨舉例:有一集裝箱的貨物等待卸車,共100個箱子,一個工人一次只能搬一個箱子。 如果只有一個工人,那麼該工人需要搬運100次,而且是不停歇的搬運。 如果有5個或者10個工人,那麼平均每個工人只需要搬運20或者10次就可以了。 甚至有1

基於TCP協議實現Linux下客戶端與伺服器之間的通訊,實現執行程序伺服器

TCP是TCP/IP協議族中一個比較重要的協議,這是一種可靠、建立連結、面向位元組流的傳輸,工作在傳輸層。和TCP相對的不可靠、無連結、面向資料報的協議UDP,瞭解UDP客戶端與伺服器之間通訊請戳UDP協議實現的伺服器與客戶端通訊 TCP協議建立連線 首

執行程序通訊 (java實現)

程序間通訊方式1.管道(匿名管道 Pipe)//   PipedInputStream  、PipedOutputStream2.命名管道(NamedPipe/FIFO)//java 不支援?3.訊號(Signal) // wait() notify() notifyall(

執行程序執行程序

多工 不管是單核CPU還是多核CPU,一旦任務數量超過核數,OS都會把每個任務輪流排程到每個核心上。OS實現多程序和多執行緒往往是通過時間片的形式執行的,即讓每個任務(程序/執行緒)輪流交替執行,因為時間片切分的很小,以至於我們感覺多個任務在同時執行。 如果

執行程序並行併發

1. 談談你對多程序,多執行緒,以及協程的理解,專案是否用? 這個問題被問的概率相當之大,其實多執行緒,多程序,在實際開發中用到的很少,除非是那些對專案效能要求特別高的,有的開發工作幾年了,也確實沒用過,你可以這麼回答,給他扯扯什麼是程序,執行緒(cpyth

Flask解析(二):Flask-Sqlalchemy與執行程序

原創作者:flowell,轉載請標明出處:https://www.cnblogs.com/flowell/p/multiprocessing_flask_sqlalchemy.html     Sqlalchemy   flask-sqlalchemy的session是執行緒安全的,但在

生產者與消費者模型(基於單鏈表環形佇列執行消費生產)

#include <pthread.h> #include <unistd.h> #include <stdlib.h> #include <signal.h> #include <semaphore.h> #define SIZE 10 sem_t