1. 程式人生 > >python大佬養成計劃----執行緒與多執行緒

python大佬養成計劃----執行緒與多執行緒

建立執行緒

一個程序必有一個執行緒,程序也可由多個執行緒組成,但有一個執行緒為主執行緒。若一個任務需要花10Mins,當只有一個執行緒時,花費10Mins,當有十個執行緒時,可能就花費1Mins,所以多執行緒可以提升任務執行時間,提高工作效率。python裡與執行緒有關的模組:

  • _thread 底層
  • threading

檢視當前執行的執行緒個數:threading.current_thread()檢視當前執行緒資訊:threading.active_count()

import _thread
import threading

def job():

    print("當前執行緒個數:",threading.active_count())
    print("當前執行緒資訊",threading.current_thread())

if __name__=='__main__':
    job()

圖片描述

_thread建立多執行緒

呼叫thread模組中的start_new_thread()函式來產生新執行緒。thread.start_new_thread(function,args = ())

#_thread建立多執行緒
import _thread
import time

def job(name):
    print("name:%s,time:%s" %(name,time.ctime()))

if __name__=="__main__":
    # 建立多個執行緒, 但是沒有開始執行任務
    _thread.start_new_thread(job,('thread1',))
    _thread.start_new_thread(job,('thread2',))
    while True: #盲等待
        pass

圖片描述

threading通過例項化Thread類建立多執行緒

_thread模組提供了低級別的、原始的執行緒以及一個簡單的鎖。threading模組是對_thread再封裝,對使用者更友好通過例項化Thread物件建立執行緒,Thread的方法有:

  • run() #Method representing the thread's activity.
  • start() #Start the thread's activity.
  • join() #Wait until the thread terminates.
  • is_alive() #Return whether the thread is alive.
import threading

def job(name):
    print("當前執行的任務名:",name)
    print("當前執行緒個數:",threading.active_count())
    print("當前執行緒資訊:",threading.current_thread())

if __name__=="__main__":
    t1 = threading.Thread(target=job,name='thread1',args=('job1',))
    t2 = threading.Thread(target=job,name='thread2',args=('job2',))
    t1.start()  #Start the thread's activity.
    t2.start()

圖片描述

使用多執行緒與不使用多執行緒的對比

不使用多執行緒執行任務,程式會一直等待sleep時間過去,在執行下一條命令。

#不使用多執行緒
import time

def music(name):
    for i in range(2):
        print("i am listening :",name)
        time.sleep(2)
def read(book):
    for i in range(2):
        print("i am reading :",book)
        time.sleep(1)
if __name__ == '__main__':
    start_time = time.time()
    music("空空如也")
    read('面紗')
    print("花費時間: %s" %(time.time()-start_time))

圖片描述

使用多執行緒執行任務,在遇到某一執行緒需要等待時,會執行其他執行緒Thread.join()會等待當前執行緒執行結束,再執行主執行緒。

import threading
import time


def music(name):
    for i in range(2):
        print("i am listening :",name)
        time.sleep(2)
def read(book):
    for i in range(2):
        print("i am reading :",book)
        time.sleep(1)
if __name__=="__main__":
    start_time = time.time()
    t1 = threading.Thread(target=music,args=('空空如也',))
    t2 = threading.Thread(target=read,args=('面紗',))
    t1.start()
    t2.start()
    t1.join()   #等待執行緒執行結束,才執行主程式,防止主執行緒阻塞子執行緒
    t2.join()
    end_time = time.time()
    print("任務執行時間:",end_time-start_time)

圖片描述

守護執行緒setDeamon

當申明一個子執行緒為守護執行緒時,主執行緒結束時,子執行緒也結束。申明守護執行緒需要在開啟執行緒之前。

import threading
import time

def music(name):
    for i in range(2):
        print("listening music :",name)
        time.sleep(4)

def code(pro):
    for i in range(2):
        print('i am coding :',pro)
        time.sleep(5)

if __name__=='__main__':
    st_time = time.time()
    t1 = threading.Thread(target=music,args=('hello',))
    t2 = threading.Thread(target=code,args=('mydiff',))
    #將執行緒申明為守護執行緒,如果設定為True,當主執行緒結束,子執行緒也結束
    #必須在啟動執行緒之前進行設定
    t1.setDaemon(True)
    t2.setDaemon(True)  #主執行緒執行結束之後,子執行緒還沒來得及執行結束,整個程式就退出了
    t1.start()
    t2.start()
    end_time = time.time()
    print('執行時間:',end_time-st_time)

圖片描述

執行緒同步

如果多個執行緒共同對某個資料修改,則可能出現不可預料的結果,為了保證資料的正確性,需要對多個執行緒進行同步。 使用Thread物件的Lock和Rlock可以實現簡單的執行緒同步,這兩個物件都有acquire方法和release方法,對於那些需要每次只允許一個執行緒操作的資料,可以將其操作放到acquire和release方法之間。

import threading

def add(lock):
    #操作變數之前加鎖
    lock.acquire()
    global money
    for i in range(1389993):
        money+=1
    #變數操作完成之後,解鎖
    lock.release()

def reduce(lock):
    #操作變數之前加鎖
    lock.acquire()
    global money
    for i in range(4728937):
        money-=1
    #變數操作完成之後,解鎖
    lock.release()

if __name__=="__main__":
    money = 0
    lock = threading.Lock() #示例化一個鎖物件
    t1 = threading.Thread(target=add,args=(lock,))
    t2 = threading.Thread(target=reduce,args=(lock,))
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    print('最終金額為:',money)

圖片描述

GIL全域性直譯器鎖

Python 程式碼的執行由 Python 虛擬機器(也叫直譯器主迴圈)來控制。Python 在設計之初就考慮到要在主迴圈中,同時只有一個執行緒在執行,就像單 CPU 的系統中執行多個程序那樣,記憶體中可以存放多個程式,但任意時刻,只有一個程式在 CPU 中執行。同樣地,雖然 Python 直譯器中可以“執行”,多個執行緒,但在任意時刻,只有一個執行緒在直譯器中執行。

對 Python 虛擬機器的訪問由全域性直譯器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個執行緒在執行。

執行過程:

1). 設定GIL
2). 切換到執行緒去執行對應的任務;
3). 執行
    - 執行完了
    - time.sleep()
    - 獲取其他資訊才能繼續執行, eg: 從網路上獲取網頁資訊等;
3. 把執行緒設定為睡眠狀態
4. 解鎖GIL
5.再次重複執行上述內容;

生產者消費者模型

在工作中,某些模組生成一些資料,由另一些模組負責處理。產生資料的模組,就形象地稱為生產者;而處理資料的模組,就稱為消費者。在生產者與消費者之間在加個緩衝區,我們形象的稱之為倉庫,生產者負責往倉庫了進商品,而消費者負責從倉庫裡拿商品,這就構成了生產者消費者模式。這裡,我們用生產者消費者模型來實現多執行緒的網址訪問,節省時間。

#多執行緒實現生產者消費者模型
#實現不同的網址或ip訪問
import threading
from urllib.request import urlopen


def create_data():
    with open('ips.txt','w') as f:
        f.write("www.baidu.com\n")
        f.write("www.163.com\n")
        for i in range(100):
            f.write('172.25.254.%s\n' %(i+1))
def creat_url(filename='ips.txt'):
    ports=[80,443]
    with open(filename) as f:
        ips = [url_info.strip() for url_info in f.readlines()]
    urls = ['http://%s:%s' %(ip,port) for ip in ips for port in ports]
    return urls

def job(url):
    try:
        urlObj = urlopen(url)
    except Exception as e :
        print('Warnning!!!    %s不可訪問' %(url))
    else:
        print("%s可以訪問" %(url))

if __name__=="__main__":
    urls = creat_url()
    threads = []
    for url in urls:
        t = threading.Thread(target=job,args=(url,))
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]
    print("任務執行結束")

圖片描述

再封裝threading.Thread類

無參版

對threading.Thread類的再封裝,執行時無需傳遞引數

from threading import Thread
class IpThread(Thread):
    def __init__(self):
        super(IpThread, self).__init__()
# 將多執行緒需要執行的任務重寫到run方法中;
    def run(self):
        print("this is a JOB")
        print(type(self))

t = IpThread()
t.start()

圖片描述

含參版

實現訪問Ip地址

import json
from threading import Thread
from urllib.request import urlopen

class IpThread(Thread):
    #重寫構造方法,如果執行的任務需要傳遞引數,那將引數與self繫結
    def __init__(self,jobname,ip):
        super(IpThread, self).__init__()
        self.jobname = jobname
        self.ip = ip
    #將多執行緒需要執行的任務重寫到run方法中
    def run(self):
        print('this is a %s job' %(self.jobname))
        #需要有一個引數,傳遞ip
        url = "http://ip.taobao.com/service/getIpInfo.php?ip=%s" % (self.ip)
        try :
            # 根據url獲取網頁的內容, 並且解碼為utf-8格式, 識別中文;
            result = urlopen(url).read().decode('utf-8')
        except Exception as e:
            print("訪問%s失敗" %(self.ip))
        else:
             # 將獲取的字串型別轉換為字典, 方便處理
            d = json.loads(result)['data']
            country = d['country']
            city = d['city']
        print("%s位於%s,城市為%s" %(self.ip,country,city))

if __name__=="__main__":
    ips = ['172.25.254.22','8.8.8.8','89.31.136.0']
    threads = []
    for ip in ips :
        t = IpThread(jobname='Clawer',ip=ip)
        threads.append(t)
        t.start()
    [thread.join() for thread in threads]
    print("程式執行結束")

圖片描述