1. 程式人生 > >Python程序、執行緒、協程詳解、執行效能、效率(tqdm)

Python程序、執行緒、協程詳解、執行效能、效率(tqdm)

多程序實踐——multiprocessing

筆者最近在實踐多程序發現multiprocessing,真心很好用,不僅加速了運算,同時可以GPU呼叫,而且互相之間無關聯,這樣可以很放心的進行計算。

譬如(參考:多程序):

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print 'Run task %s (%s)...' % (name, os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print 'Waiting for all subprocesses done...'
    p.close()
    p.join()
    print 'All subprocesses done.'

先載入multiprocessing 模組Pool, 
然後定義一個函式long_time_task; 
建立一個程序池: p = Pool(), 
for i in range(5):即為定義開一個程序,此處發現ubuntu裡面用spyder中的ipython,開多程序CPU時候,只能開到4個(可能預設開到4個記憶體佔滿了); 
args是long_time_task函式的引數項, 
一定要p.close()之後才能執行後續內容, 
然後用p.join()呼叫join()之前必須先呼叫close(),呼叫close()之後就不能繼續新增新的Process了。

請注意輸出的結果,task 0,1,2,3是立刻執行的,而task 4要等待前面某個task完成後才執行,這是因為Pool的預設大小在我的電腦上是4,因此,最多同時執行4個程序。這是Pool有意設計的限制,並不是作業系統的限制。如果改成:

p = Pool(5)

就可以同時跑5個程序。 
由於Pool的預設大小是CPU的核數,如果你不幸擁有8核CPU,你要提交至少9個子程序才能看到上面的等待效果。

.

延伸一:Caffe Python介面多程序提取特徵

那麼在做影象處理的時候,進行預測任務的時候,可以開多程序,GPU方案。那麼步驟是:

  • 1、分割資料;
  • 2、多個程序池。

第一步:分割資料,用split_list函式:

def split_list(alist, wanted_parts=1):
    length = len(alist)
    return [ alist[i*length // wanted_parts: (i+1)*length // wanted_parts] 
             for i in range(wanted_parts) ]

第二步:開多個程序池 
可參考部落格:機器視覺:Caffe Python介面多程序提取特徵

.


多執行緒案例——threading

1、普通的threading

參考:python 併發執行之多執行緒

import threading
import time
def haha(max_num):
    """
    隨便定義一個函式,要求使用者輸入一個要列印數字的最大範圍
    輸入之後就會從0開始列印,直到使用者輸入的最大範圍
    """
    for i in range(max_num):
        """
        每次列印一個數字要間隔1秒,那麼列印10個數就要耗時10秒
        """
        time.sleep(1)
        print i
for x in range(3):
    """
    這裡的rang(3)是要依次啟動三個執行緒,每個執行緒都呼叫函式haha()
    第一個執行緒啟動執行之後,馬上啟動第二個執行緒再次執行。最後也相當
    函式執行了3次
    """
    #通過threading.Thread方法例項化多執行緒類
    #target後面跟的是函式的名稱但是不要帶括號也不填寫引數
    #args後面的內容才是要傳遞給函式haha()的引數。切記引數一定要以陣列的形式填寫不然會報錯。
    t=threading.Thread(target=haha,args=(10,))
    #將執行緒設定為守護執行緒
    t.setDaemon(True)
    #執行緒準備就緒,隨時等候cpu排程
    t.start()

其中setDaemon 這個引數是True,就表示程式流程跑完之後直接就關閉執行緒然後退出了,根本不管執行緒是否執行完。 
.

2. join()

結果看起來規則一些可以考慮使用join()方法,參考:python 併發執行之多執行緒 
join(timeout)方法將會等待直到執行緒結束。這將阻塞正在呼叫的執行緒,直到被呼叫join()方法的執行緒結束。

import threading
import time
def haha(max_num):
    for i in range(max_num):
        time.sleep(1)
        print i
for x in range(3):
    t=threading.Thread(target=haha,args=(5,))
    t.start()
    #通過join方法讓執行緒逐條執行
    t.join()
0
1
2
3
4
0
1
2
3
4
0
1
2
3
4

3. 多執行緒迴圈

背景:Python指令碼:讀取檔案中每行,放入列表中;迴圈讀取列表中的每個元素,並做處理操作。 
核心:多執行緒處理單個for迴圈函式呼叫

#!/usr/bin/env python
#-*- coding: utf8 -*-

import sys
import time
import string
import threading
import datetime
fileinfo = sys.argv[1]

# 讀取檔案內容放入列表
host_list = []
port_list = []

# 定義函式:讀取檔案內容放入列表中
def CreateList():
    f = file(fileinfo,'r')
    for line in f.readlines():
        host_list.append(line.split(' ')[0])
        port_list.append(line.split(' ')[1])
    return host_list
    return port_list
    f.close()

# 單執行緒 迴圈函式,註釋掉了
#def CreateInfo():   
#    for i in range(0,len(host_list)):     # 單執行緒:直接迴圈列表
#        time.sleep(1)
#        TimeMark = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
#        print "The Server's HostName is %-15s and Port is %-4d !!! [%s]" % (host_list[i],int(port_list[i]),TimeMark)
#   

#  定義多執行緒迴圈呼叫函式
def MainRange(start,stop):     #提供列表index起始位置引數
    for i in range(start,stop):
        time.sleep(1)
        TimeMark = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print "The Server's HostName is %-15s and Port is %-4d !!! [%s]" % (host_list[i],int(port_list[i]),TimeMark)

# 執行函式,生成列表
CreateList()
# 列表分割成:兩部分 mid為列表的index中間位置
mid = int(len(host_list)/2)

# 多執行緒部分
threads = []
t1 = threading.Thread(target=MainRange,args=(0,mid))
threads.append(t1)
t2 = threading.Thread(target=MainRange,args=(mid,len(host_list)))
threads.append(t2)

for t in threads:
    t.setDaemon(True)
    t.start()
t.join()
print "ok" 

也有一個分拆的步驟,args=(0,mid),args=(mid,len(host_list)

.

4.執行緒鎖與執行緒同步

當你有多個執行緒,就需要考慮怎樣避免執行緒衝突。解決辦法就是使用執行緒鎖。鎖由 Python 的 threading 模組提供,並且它最多被一個執行緒所持有。當一個執行緒試圖獲取一個已經鎖在資源上的鎖時,該執行緒通常會暫停執行,直到這個鎖被釋放。 
讓我們給這個函式新增鎖。有兩種方法可以實現。第一種方式是使用 try/finally ,從而確保鎖肯定會被釋放。下面是示例:



import threading

total = 0
lock = threading.Lock()

def update_total(amount):
    """
    Updates the total by the given amount
    """
    global total
    lock.acquire()
    try:
        total += amount
    finally:
        lock.release()
    print (total)

if __name__ == '__main__':
    for i in range(10):
        my_thread = threading.Thread(
            target=update_total, args=(5,))
        my_thread.start()

由 with 語句作為替代。

import threading

total = 0
lock = threading.Lock()


def do_something():
    lock.acquire()

    try:
        print('Lock acquired in the do_something function')
    finally:
        lock.release()
        print('Lock released in the do_something function')

    return "Done doing something"

def do_something_else():
    lock.acquire()

    try:
        print('Lock acquired in the do_something_else function')
    finally:
        lock.release()
        print('Lock released in the do_something_else function')

    return "Finished something else"

if __name__ == '__main__':
    result_one = do_something()
    result_two = do_something_else()

可重入鎖 
為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖(RLock)。RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。

即把 lock = threading.lock() 替換為 lock = threading.RLock(),然後重新執行程式碼,現在程式碼就可以正常運行了。

參考文獻: 
Python 多執行緒 
一文學會 Python 多執行緒程式設計

.


python 效能除錯工具(line_profiler)

參考:python 效能除錯工具(line_profiler)使用 
網上大部分都是說在所需要測的函式前面加一個@profile,如文件所說。但是加了@profile後函式無法直接執行,只能優化的時候加上,除錯的時候又得去掉。文章中提到了這個問題的解決辦法,個人覺得還是有點麻煩,不太能理解這是為什麼。我在stackoverflow上看到了另一種關於line_profile的使用方法,簡單而且實用。


from line_profiler import LineProfiler
import random

def do_stuff(numbers):
    s = sum(numbers)
    l = [numbers[i]/43 for i in range(len(numbers))]
    m = ['hello'+str(numbers[i]) for i in range(len(numbers))]

numbers = [random.randint(1,100) for i in range(1000)]
lp = LineProfiler()
lp_wrapper = lp(do_stuff)
lp_wrapper(numbers)
lp.print_stats()

輸出結果:

Timer unit: 1e-06 s

Total time: 0.000649 s
File: <ipython-input-2-2e060b054fea>
Function: do_stuff at line 4

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     4                                           def do_stuff(numbers):
     5         1           10     10.0      1.5      s = sum(numbers)
     6         1          186    186.0     28.7      l = [numbers[i]/43 for i in range(len(numbers))]
     7         1          453    453.0     69.8      m = ['hello'+str(numbers[i]) for i in range(len(numbers))]

.


python通過tqdm 執行時間

來源:python tqdm模組分析

安裝

pip install tqdm
  •  

在迭代器for中使用:

from tqdm import tqdm
for i in tqdm(range(9)):
    ...

同時也可以支援這樣的迭代方式:

[i for i in tqdm(range(9))]
  •  

trange的方式:

>>> for i in trange(100):
...     sleep(0.1)
100%|################################################################| 100/100 [00:10<00:00,  9.97it/s]

當迭代的內容為list:

>>> pbar = tqdm(["a", "b", "c", "d"])
>>> for char in pbar:
...         pbar.set_description("Processing %s" % char)
Processing d: 100%|######################################################| 4/4 [00:06<00:00,  1.53s/it]

手動的控制更新

把執行的粒度放寬


>>> with tqdm(total=100) as pbar:
...     for i in range(10):
...         sleep(0.1)
...         pbar.update(10)
100%|################################################################| 100/100 [00:01<00:00, 99.60it/s]

函式執行時間函式兩個小技巧:

# 第一種方式
%time [i+1 for i in range(100)]

# 第二種方式
%%time
[i+1 for i in range(100)]

返回的結果都是:

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 26.9 µs

延伸二:Python 多程序實踐

參考:Python 多程序實踐 
多程序的方式可以增加指令碼的併發處理能力, python 支援這種多程序的程式設計方式 
在類unix系統中, python的os 模組內建了fork 函式用以建立子程序

1、fork 方式建立子程序 
這裡寫圖片描述

從結果可以看到, 從pid = os.fork() 開始, 下面的部分程式碼運行了兩次, 第一次是父程序執行, 第二次是子程序執行, 且子程序的fork的結果總是0, 所以這個也可以用來作為區分父程序或是子程序標誌

那麼變數在多個程序之間是否相互影響呢 
import os

這裡寫圖片描述

很明顯, 初始值為10的source 在父程序中值 減少了 1, 為9, 而子程序明顯source的初始值 是10, 也就是說多程序之間並沒有什麼相互影響