1. 程式人生 > >python的多執行緒程式設計

python的多執行緒程式設計

1,python中一個執行緒對應於c語言中的一個執行緒
gil使得同一個時刻只有一個執行緒在一個cpu上執行位元組碼, 無法將多個執行緒對映到多個cpu上執行
gil會根據執行的位元組碼行數以及時間片釋放gil,gil在遇到io的操作時候主動釋放
total = 0

def add():
    #1. dosomething1
    #2. io操作
    # 1. dosomething3
    global total
    for i in range(1000000):
        total += 1
def desc():
    global total
    for i in
range(1000000): total -= 1 import threading thread1 = threading.Thread(target=add) thread2 = threading.Thread(target=desc) thread1.start() thread2.start() thread1.join() thread2.join() print(total)

每一次執行的結果都會不一樣,所以有GIL的python執行緒也不是安全的,但是python遇到io操作的話,會等到io操作時候主動釋放GIL,

2,多執行緒程式設計

①對於io操作來說,多執行緒和多程序效能差別不大

----------------------------------------------------

方式1:

通過Thread類來例項化

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(
4) print("get detail url end") if __name__ =="__main__": thread1 = threading.Thread(target=get_detail_html, args=("",)) thread2 = threading.Thread(target=get_detail_url, args=("",)) start_time = time.time() thread1.start() thread2.start() print("last time {}".format(time.time()-start_time))
get detail html started
get detail url started
last time 0.0010006427764892578
get detail html end
get detail url end

執行時間居然是0,兩個執行緒並行時間不應該是2秒嗎?其實實際上這是有3個執行緒,可以通過pycharm的IDE中進行debug

可以看得到其實是三個執行緒的

那就意味著三個執行緒並行,2個執行緒睡2秒,但第三個執行緒依舊可以繼續向下進行,因為他們是並行的,因此,時間才會接近於0,

但是此時雖然主執行緒結束了,但是並沒有退出!子執行緒依舊可以執行,如何設定主執行緒退出之後立即kill掉子執行緒呢?

thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))
    start_time = time.time()
    thread1.setDaemon(True)  # setDaemon 設定為True是將其設定為守護執行緒
    thread2.setDaemon(True)
    thread1.start()
    thread2.start()

但是如何讓這個主執行緒等待其餘2個子執行緒結束之後再去執行呢?

thread1 = threading.Thread(target=get_detail_html, args=("",))
    thread2 = threading.Thread(target=get_detail_url, args=("",))
    start_time = time.time()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

join()就是設定主執行緒必須等待子執行緒結束之後才能夠退出,注意:必須在start()之後寫

那如何簡化多執行緒程式設計呢?(繼承Thread類)

②通過繼承Thread來實現多執行緒

class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):  過載run方法
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")
if __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    # 當主執行緒退出的時候, 子執行緒kill掉
    print("last time: {}".format(time.time() - start_time))

那歸根到底就能夠自定義很多複雜的邏輯了

---------------------------------------------------------

執行緒間的通訊和共享變數

從第一個例子中我們就公用了同一個total變數

但是共享變數會導致變數被反覆修改

# 通過queue的方式進行執行緒間同步
from queue import Queue
import time
import threading


def get_detail_html(queue):
    # 爬取文章詳情頁
    while True:
        url = queue.get()  # queue是一個阻塞方法,佇列中沒有值得時候他會一直阻塞
        # for url in detail_url_list:
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


def get_detail_url(queue):
    # 爬取文章列表頁
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            queue.put("http://projectsedu.com/{id}".format(id=i))  # 佇列滿了也會阻塞住
        print("get detail url end")


# 1. 執行緒通訊方式- 共享變數

if __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)  # 宣告最大值的訊息佇列,執行緒是安全的

    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()
    # # thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    # thread_detail_url.start()
    # thread_detail_url1.start()
    #
    # thread1.join()
    # thread2.join()
    detail_url_queue.task_done()  # 必須呼叫
    detail_url_queue.join()  # 和執行緒一致

    # 當主執行緒退出的時候, 子執行緒kill掉
    print("last time: {}".format(time.time() - start_time))

因此,當涉及到共享變數的時候,首先推薦採用queue來完成

1,執行緒安全

2,對於可以採用task_done 隨時停止

-----------------------------------------------------------------------------------------

4,執行緒同步:(鎖機制)

# -*- coding:UTF-8 -*-
__autor__ = 'zhouli'
__date__ = '2018/12/18 21:44'


from threading import Lock


total = 0
lock = RLock()


def add():
    # 1. dosomething1
    # 2. io操作
    # 1. dosomething3
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        total += 1
        lock.release()


def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()


import threading

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()

#
thread1.join()
thread2.join()
print(total)

# 1. 用鎖會影響效能
# 2. 鎖會引起死鎖
# 死鎖的情況 A(a,b)

加鎖一定要釋放!!否則死鎖!!

因為使用鎖的情況下會很繞,所以python給我們重新定義了一個Rlock(可重入的鎖)

# 在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等

程式碼修改如下:

from threading import Lock, RLock, Condition  # 可重入的鎖

# 在同一個執行緒裡面,可以連續呼叫多次acquire, 一定要注意acquire的次數要和release的次數相等
total = 0
lock = RLock()


def add():
    # 1. dosomething1
    # 2. io操作
    # 1. dosomething3
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        lock.acquire()
        total += 1
        lock.release()
        lock.release()


def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()


import threading

thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()

#
thread1.join()
thread2.join()
print(total)

# 1. 用鎖會影響效能
# 2. 鎖會引起死鎖
# 死鎖的情況 A(a,b)
"""
A(a、b)
acquire (a)
acquire (b)

B(a、b)
acquire (a)
acquire (b)
"""

在同一個執行緒裡面才是如此,不同執行緒之間還是一個互相競爭的關係!

多執行緒的難點:condition(條件變數)

他是多執行緒中用於複雜的多執行緒通訊中的鎖,條件變數

通過原始碼可知其中的wait和notify方法

其中wait()方法是等待執行緒的的啟動,notify去通知另一個執行緒的啟動

import threading


# 條件變數, 用於複雜的執行緒間同步
# class XiaoAi(threading.Thread):
#     def __init__(self, lock):
#         super().__init__(name="小愛")
#         self.lock = lock
#
#     def run(self):
#         self.lock.acquire()
#         print("{} : 在 ".format(self.name))
#         self.lock.release()
#
#         self.lock.acquire()
#         print("{} : 好啊 ".format(self.name))
#         self.lock.release()
#
#
# class TianMao(threading.Thread):
#     def __init__(self, lock):
#         super().__init__(name="天貓精靈")
#         self.lock = lock
#
#     def run(self):
#         self.lock.acquire()
#         print("{} : 小愛同學 ".format(self.name))
#         self.lock.release()
#
#         self.lock.acquire()
#         print("{} : 我們來對古詩吧 ".format(self.name))
#         self.lock.release()


# 通過condition完成協同讀詩

class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="小愛")
        self.cond = cond

    def run(self):
        with self.cond:  # 一定要使用with語句
            self.cond.wait()  # 後說話使用先要等待
            print("{} : 在 ".format(self.name))
            self.cond.notify()  # 去通知

            self.cond.wait()
            print("{} : 好啊 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 君住長江尾 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 共飲長江水 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 此恨何時已 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 定不負相思意 ".format(self.name))
            self.cond.notify()


class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天貓精靈")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小愛同學 ".format(self.name))
            self.cond.notify()  # 先去通知
            self.cond.wait()  # 等待

            print("{} : 我們來對古詩吧 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我住長江頭 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 日日思君不見君 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 此水幾時休 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 只願君心似我心 ".format(self.name))
            self.cond.notify()
            self.cond.wait()


if __name__ == "__main__":
    from concurrent import futures

    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    # 啟動順序很重要
    # 在呼叫with cond之後才能呼叫wait或者notify方法
    # condition有兩層鎖, 一把底層鎖會線上程呼叫了wait方法的時候釋放, 上面的鎖會在每次呼叫wait的時候分配一把並放入到cond的等待佇列中,等到notify方法的喚醒
    xiaoai.start()
    tianmao.start()

 5,Semaphore的使用

# Semaphore 是用於控制進入數量的鎖
# 檔案, 讀、寫, 寫一般只是用於一個執行緒寫,讀可以允許有多個

# 做爬蟲
import threading
import time


class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()  # 一定要注意鎖的釋放的位置,一旦鎖被釋放sem就會增加1


class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()


if __name__ == "__main__":
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()

6,執行緒池

from concurrent.futures import ThreadPoolExecutor

為什麼要執行緒池?

主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值

當一個執行緒完成的時候我們主執行緒能立即知道

 futures可以讓多執行緒和多程序編碼介面一致

 

 

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED


# 未來物件,task的返回容器


# 執行緒池, 為什麼要執行緒池
# 主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值
# 當一個執行緒完成的時候我們主執行緒能立即知道
# futures可以讓多執行緒和多程序編碼介面一致
import time


def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)
# 通過submit函式提交執行的函式到執行緒池中, submit 是立即返回
task1 = executor.submit(get_html, (3,))  # 第一個引數是函式名稱,第二個引數是引數
task2 = executor.submit(get_html, (2,))  # submit的返回時是非常重要,用於判斷是否執行成功等
print(task1.done)  # 判斷任務是否完成

結果

當然task1.result()方法也是可以的,檢視task的結果

實際上我們也可以將某一個任務關閉掉,但是要注意,任務在執行中或者是執行完成時是無法取消的,只有未開始執行才會被cancel()掉

# 要獲取已經成功的task的返回
urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url,)) for url in urls]

for future in as_completed(all_task):  # as_completed 實際上是一個生成器,將已經完成的返回
    data = future.result()
    print("get {} page".format(data))

這個執行結果順序是誰先完成任務誰先出來

或者

# 要獲取已經成功的task的返回
urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url,)) for url in urls]
# for future in as_completed(all_task):  # as_completed 實際上是一個生成器,將已經完成的返回
#     data = future.result()
#     print("get {} page".format(data))
# 通過executor的map獲取已經完成的task的值
for data in executor.map(get_html, urls):  # map方法更加簡單
    print("get {} page".format(data))

但是這樣和上面的不一樣的是,這邊直接返回的就是結果了,也就是data = future.result()這一步被省略了

而且map方法返回的順序是列表的順序

wait 方法:(讓主執行緒進行阻塞)

# 要獲取已經成功的task的返回
urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url,)) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED)  # 讓主執行緒阻塞,如果沒有return_when引數 預設是等待全部任務結束放行
print("main")
# for future in as_completed(all_task):  # as_completed 實際上是一個生成器,將已經完成的返回
#     data = future.result()
#     print("get {} page".format(data))
# 通過executor的map獲取已經完成的task的值
for data in executor.map(get_html, urls):  # map方法更加簡單
    print("get {} page".format(data))

放上完整版

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
from concurrent.futures import Future
from multiprocessing import Pool

# 未來物件,task的返回容器


# 執行緒池, 為什麼要執行緒池
# 主執行緒中可以獲取某一個執行緒的狀態或者某一個任務的狀態,以及返回值
# 當一個執行緒完成的時候我們主執行緒能立即知道
# futures可以讓多執行緒和多程序編碼介面一致
import time


def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)
# 通過submit函式提交執行的函式到執行緒池中, submit 是立即返回
task1 = executor.submit(get_html, (3,))  # 第一個引數是函式名稱,第二個引數是引數
task2 = executor.submit(get_html, (2,))  # submit的返回時是非常重要,用於判斷是否執行成功等


# 要獲取已經成功的task的返回
urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url,)) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED)  # 讓主執行緒阻塞,如果沒有return_when引數 預設是等待全部任務結束放行
print("main")
# for future in as_completed(all_task):  # as_completed 實際上是一個生成器,將已經完成的返回
#     data = future.result()
#     print("get {} page".format(data))
# 通過executor的map獲取已經完成的task的值
for data in executor.map(get_html, urls):  # map方法更加簡單
    print("get {} page".format(data))


# #done方法用於判定某個任務是否完成
# print(task1.done())
# print(task2.cancel())
# time.sleep(3)
# print(task1.done())
#
# #result方法可以獲取task的執行結果
# print(task1.result())