1. 程式人生 > >python的多執行緒和多程序(一)

python的多執行緒和多程序(一)

在進入主題之前,我們先學習一下併發和並行的概念:

--併發:在作業系統中,併發是指一個時間段中有幾個程式都處於啟動到執行完畢之間,且這幾個程式都是在同一個處理機上執行。但任一時刻點上只有一個程式在處理機上執行。形象的點描述:一個人做很多事情,但同一時刻只能做一件事情。

--並行:當系統有一個CPU時,則程式的操作有可能非併發。當一個CPU執行一個程式時,另一個CPU可以執行另一個程式,兩個程式互不搶佔CPU資源,可以同時進行。形象的描述:多人同時做多件事情

 可能有小夥伴又會問序列又是什麼鬼呢?再來嘮叨一下:

序列和並行是資料傳輸方式的區別。其實和併發和並行的概念是兩個領域:

1、資料傳輸方式不同,序列口傳輸方式為資料排成一行,一位一位送出,接受也是一樣。並行口傳輸8位資料一次送出。

2、針腳不同,序列口針腳少,並行口針腳多

3、用途不同,序列口現在只用作控制介面,並行口多用作印表機、掃描器等介面。

 來吧,進入正題:

程序的概念:計算機程式其實只是儲存在磁碟上可執行的二進位制(或其他可執行的型別)檔案。只有把它們載入到記憶體中並被作業系統呼叫,才算是被執行,並用擁有生命週期。所以說程序是一個執行中的程式。

每個程序都擁有自己的地址空間,記憶體,資料棧以及其他用於跟蹤執行的輔助資料。

此間,作業系統管理其上的所有程序的執行,併為這些程序合理分配時間。

多程序:那就是在一個多核的電腦上同時執行多個程唄,其實是實現並行。

但是呢,我們先從多執行緒學習起。多執行緒搞定了,多程序也就是小意思了。

在此之前先介紹一個概念,之後會用到。GIL鎖,有可能大家會聽到別人說python執行很慢啥的,其實就是這個GIL鎖的原因,那它到底是個啥呢:

GIL全域性解釋性鎖,python在設計的時候,還沒有多核處理器的概念。因此,為了設計的方便和執行緒的安全,python之父就設計了一個鎖。這個鎖要求,任何一個程序中,同時只能有一個執行緒在執行。因此並不能多個執行緒分配多個CPU資源。所以python中的執行緒只能實現併發,而不能實現真正的並行,這就是python所執行緒的侷限性。在python3中GIL鎖進行了該井,在遇到任何IO阻塞(不是耗時)的時候,會自動切換執行緒,這就使得在進行多IO操作的時候python的鎖多執行緒有很大的優勢。同時一個執行緒在執行時間或者執行步驟達到一定的閾值時,也會自動的切換執行緒。

對比於多程序,多執行緒其實是實現併發的操作,在在python中使用multiprocessing包實現多程序:

首先來看下多執行緒的實現,這裡使用的是threading包。用sleep模擬耗時操作:

 1 import time
 2 import threading
 3 
 4 def get_detail_html(url):
 5     print("starting get detail html")
 6     time.sleep(4)
 7     print("ending get detail html")
 8 
 9 def get_detail_url(url):
10     print("starting get detail url")
11     time.sleep(2)
12     print("ending get detail url")
13 
14 if __name__ == '__main__':
15     thread1 = threading.Thread(target=get_detail_html, args=("",))
16     thread2 = threading.Thread(target=get_detail_url, args=("",))
17     start_time = time.time()
18     thread1.setDaemon(True)
19     thread2.setDaemon(True)
20     thread1.start()
21     thread2.start()
22     print(123456)
23     thread1.join()
24     thread2.join()
25    print("using time: {}".format(time.time()-start_time)

做個註釋吧,可能有點亂:

1、在子執行緒sleep時候,主執行緒繼續往下執行,主執行緒執行完,程式此刻並沒有退出。子執行緒的程式還是會執行完。

2、為了在主執行緒執行完之後kill掉子執行緒。需要給子執行緒.setdeDaemon(True)。將子執行緒設定為守護執行緒。意思為:主執行緒結束後設置為守護執行緒的子執行緒也會義無反顧的殉情而亡當有多個子執行緒,其中部分設定守護執行緒。主執行緒會等到沒有設定為守護執行緒的子執行緒執行完之後推出而設定為守護執行緒的子執行緒,如果在未設定為守護執行緒的子執行緒結束前執行完畢,那萬事大吉,如果沒有執行完成還是跟著主執行緒殉情

3、如果想讓主執行緒等待子執行緒執行完成再往下面執行的話,用thread1.join()。將主執行緒阻塞住,也就是說.join()之後的主執行緒程式碼不會執行,直到子執行緒執行完畢再執行這個時候的using time 就是4秒鐘。如果只有部分子執行緒阻塞,那麼阻塞時間就是設定了.join()的子執行緒執行的時間。然後主執行緒結束,但是還是會等到所有子執行緒執行結束後主執行緒才會退出

上邊是使用函式的方法實現多執行緒,那用類呢?可定也能實現咯:

import time
import threading

# 用類實現多執行緒 class GetDetailHtml(threading.Thread): """ 繼承threading.Thread類 """ def __init__(self, name): # 給呼叫父類init方法執行緒命名 super().__init__(name=name) def run(self): """ 重寫run方法 :return: """ print("starting get detail html") time.sleep(4) print("ending get detail html") class GetDetailUrl(threading.Thread): def __init__(self, name): # 給呼叫父類init方法執行緒命名 super().__init__(name=name) def run(self): print("starting get detail url") time.sleep(2) print("ending get detail url") if __name__ == '__main__': thread1 = GetDetailHtml('get_detail_html') thread2 = GetDetailUrl('get_detail_url') start_time = time.time() thread1.start() thread2.start() thread1.join() thread2.join() print('used time: {}'.format(time.time() - start_time))

只需要重寫類的run方法即可,其他和函式方法使用一樣。

如果多個執行緒需要用到相同的資料咋辦呢?這就涉及到執行緒間的通訊問題了,接下來我們來詳細的瞭解一下。

方法一:利用共享變數的方式,嗯,也就是全域性變數global。

import time
import threading
# 1、共享全域性變數 使用global。
detail_url_list = []
def get_detail_html():
    global detail_url_list
    # 使用for迴圈,這樣子做得話併發不高
    # for url in detail_url_list:
    #     print("starting get detail html")
    #     time.sleep(4)
    #     print("ending get detail html")
    # 使用pop操作,並開啟多個get_detail_html的子執行緒進行處理。但是執行緒是不安全的
    url = detail_url_list.pop()
    print("starting get detail html")
    time.sleep(4)
    print("ending get detail html")

def get_detail_url():
    global detail_url_list
    print("starting get detail url")
    time.sleep(2)
    for i in range(20):
        detail_url_list.append(i)
    print("ending get detail url")


if __name__ == '__main__':
    thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))
    start_time = time.time()
    thread1.start()
    thread2.start()thread1.join()
    thread2.join()
    print("using time: {}".format(time.time() - start_time))

這個很容易理解,用起來也很方便。可以幫我們快速的解決比較簡單的問題。

方法二:利用佇列queue。。比價高階點的用法。先看實現方式。

import time
import threading
from queue import Queue, PriorityQueue #可以設定優先順序的queue,調整執行順序

def get_detail_html(queue):
    while True:
        url = queue.get()
        print("starting get detail html")
        time.sleep(4)
        print("ending get detail html")

def get_detail_url(queue):
    while True:
        print("starting get detail url")
        time.sleep(2)
        for i in range(20):
            queue.put(i)
        print("ending get detail url")

if __name__ == '__main__':
    detail_url_queue = Queue(maxsize=10)
    thread1 = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
    thread2 = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    start_time = time.time()
    thread1.start()
    thread2.start()thread1.join()
    thread2.join()
    print("using time: {}".format(time.time() - start_time))

簡單的說就是:put往queue裡放東西,get從queue裡拿東西。這樣就能達到執行緒間的通訊以及執行緒安全。

既然執行緒中的資料大家都能用,會不會出現一個執行緒在修改資料,結果在還沒有修改完之後,GIL鎖已經被切換。切換後的執行緒也修改這個資料,那就會出現資料的錯亂,髒資料。針對這種情況,python中的lock就完美的解決了這個情況。

from threading import Lock, RLock  #可重入鎖
#RLock使得一個執行緒中,可以連續呼叫多次acquire,但是的有相對應數量的release
import threading

total = 0
lock = Lock()
"""
1、用鎖會影響效能
2、鎖會引起死鎖   1、acquire之後沒有release
                2、執行緒相互等待 兩個資料被兩個執行緒分別獲取,相互等待對方的資源釋放
"""
def add(lock):
    global total
    for i in range(1000000):
     # 上鎖 lock.acquire() total += 1
     # 釋放鎖 lock.release() def desc(lock): global total for i in range(1000000):
     # 上鎖 lock.acquire() total -= 1
# 釋放鎖 lock.release()
thread1 = threading.Thread(target=add, args=(lock,)) thread2 = threading.Thread(target=desc, args=(lock,)) thread1.start() thread2.start() thread1.join() thread2.join() print(total)

針對一般情況下我們lock就可以搞定問題,那有沒有更牛逼的工具讓我們應對更加複雜的情況呢?答案當然是肯定的:

conditon:條件變數,用於複雜的執行緒間同步的鎖

import threading
# condition(條件變數)是用於複雜的執行緒間同步的鎖
from threading import Condition

cond = Condition()
class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super(XiaoAi, self).__init__(name="小愛")
        self.cond = cond

    def run(self):
        # with是一個魔法方法,相當於先上鎖操作完之後再解鎖,和with open相似
        with self.cond:
            # 等待狀態,等著接受通知,接到通知後往下放執行
            self.cond.wait()
            print("{} : 在".format(self.name))
            # 傳送通知
            self.cond.notify()
            # 等待狀態,等著接受通知,接到通知後往下放執行
            self.cond.wait()
            print("{} : 好啊".format(self.name))


class TianMao(threading.Thread):
    def __init__(self, cond):
        super(TianMao, self).__init__(name="天貓")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小愛在麼?".format(self.name))
            # 傳送通知
            self.cond.notify()
            # 等待狀態,等著接受通知,接到通知後往下放執行
            self.cond.wait()

            print("{} : 我們來對古詩吧!".format(self.name))
            # 傳送通知
            self.cond.notify()

if __name__ == '__main__':
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    xiaoai.start()
    tianmao.start()

    xiaoai.join()
    tianmao.join()

在condition中我們可以執行任意多的執行緒。那麼如果我們想控制執行緒的數量該怎麼辦呢?針對這個場景python也是給我們一個工具的:

semaphore 是控制進入數量的鎖, 基於condition實現

# semaphore 是控制進入數量的鎖, 基於condition實現
# 檔案的讀寫,寫一般只有一個執行緒寫,讀可以允許多個

# 爬蟲,控制爬蟲的數量
import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print('get html text success')
        # 釋放鎖, 一共釋放3次
        self.sem.release()
class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem
    def run(self):
        for i in range(20):
            # 呼叫一次,次數減一,為0時,執行緒鎖住
            self.sem.acquire()
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()
if __name__ == '__main__':
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()

是不是常聽說執行緒池?那這個池子到底是個啥呢?當然是放執行緒的池子唄!!在python中是用

ThreadPoolExecutor來實現執行緒池的!!
第一種用法:
from concurrent.futures import (
            wait, # 使主執行緒阻塞
            ThreadPoolExecutor,   # 執行緒池
            as_completed,   #是一個生成器
            Future   # 未來物件, task的返回容器
            )

"""
執行緒池
主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值
當一個執行緒完成的時候主執行緒能夠立刻知道
futures可以讓多執行緒和多程序介面一致
"""
import time

def get_html(times):
    time.sleep(times)
    print('get page {} success'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)


# 通過submit函式提交的函式到執行緒池中,submit是立刻返回的
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))

# done 用於判定某個任務是否完成
print(task1.done())
# cancle取消該執行緒,取消成功返回True,失敗返回False  注意:執行緒在執行過程中不能被取消
# 此時開啟兩個執行緒,所以task1和task2新增之後立刻執行,因此取消不成功
print(task2.cancel())
time.sleep(3)
print(task1.done())

# 可以獲取task1的執行結果
print(task1.result())



"""
# 通過executor 的map(yield task的返回值)獲取已經完成的task值
for data in executor.map(get_html, urls):
    print('future get {} page'.format(data))
"""

第二種用法:

from concurrent.futures import (
            wait, # 使主執行緒阻塞
            ThreadPoolExecutor,   # 執行緒池
            as_completed,   #是一個生成器
            Future   # 未來物件, task的返回容器
            )
import time

def get_html(times):
    time.sleep(times)
    print('get page {} success'.format(times))
    return times
executor = ThreadPoolExecutor(max_workers=2)

# 獲取已經成功的task
urls = [3, 3, 2, 4, 8]
# 新增到excutor之後子執行緒立刻開始執行
all_tasks = [executor.submit(get_html, (url)) for url in urls]
#  使主執行緒阻塞,所有子執行緒執行完成後主執行緒再往下邊執行
wait(all_tasks)
print(12314123)
# as_completed 的程式碼塊和子執行緒的執行並不是同步的
for future in as_completed(all_tasks):
    data = future.result()
    print('get {} page 111111'.format(data))

第三種用法:

from concurrent.futures import (
            wait, # 使主執行緒阻塞
            ThreadPoolExecutor,   # 執行緒池
            as_completed,   #是一個生成器
            Future   # 未來物件, task的返回容器
            )
import time

def get_html(times):
    time.sleep(times)
    print('get page {} success'.format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
# 通過executor 的map(yield task的返回值)獲取已經完成的task值
for data in executor.map(get_html, urls):
    print('future get {} page'.format(data))

 

好了多執行緒就講這麼多吧,有點粗糙,更多的細節以後再補充吧! 下一篇記錄一下多程序!!

&n