1. 程式人生 > >python(十一)、線程

python(十一)、線程

nts 不能 target 進程同步 UC 只有一個 pan 頻繁 %s

一、基本概念

  進程是資源分配的基本單位,而線程則是CPU調度和分派的基本單位。系統需要執行創建進程、撤銷進程和進程切換等任務,但創建進程開銷大限制了並發的提高。因此,成百上千個進程會造成內存浪費,並且頻繁切換導致每個進程執行(時間變短)效率降低。因此有了線程的概念。

  引入進程的目的是為了使多個程序並發執行,以改善資源利用率、提高系統吞吐量;引入線程的目的則是為了減少程序並發執行時造成的時空開銷。即線程既能降低系統資源頻繁切換,又能滿足進程這種多任務並發異步執行的功能。

  線程和進程關系

  1.一個進程可以有多個線程,但至少要有一個線程;一個線程只能在一個進程的地址空間內活動。

  2.資源分配給進程,同一進程的所有線程共享該進程內的所有資源。

  3.處理機分配給線程,真正在處理機上運行的是線程。

  4.線程在執行過程中需要協作同步。不同進程的線程要利用消息通信的辦法實現同步。

  5.由於線程擁有較少的資源,但又具有傳統進程的許多特性,因此線程可被稱為輕型進程(light weight process, LWP),傳統進程相對稱為重型進程(heavy weight process, HWP)。

  6.一個線程可以創建和撤銷另一個線程

  線程優點:易於調度;提高並發量;開銷少;能夠充分發揮多處理器的功能。

  線程模型:和進程一樣,包括TCB(Thread Controller Block 線程控制塊)、程序和數據。Thread結構包括線程標識符、調度狀態信息、核心棧指針、用戶棧指針及私有存儲區等。

  內核級線程和用戶級線程

    - 內核級線程(Kernel Supported threads,KST):內核控制線程的創建、撤銷和切換,並為每個內核級線程創建TCB,從而感知其存在。內核級線程的優點是:1.在多處理器上,內核可以調用同一進程中的多個線程同時工作;2.如果一個進程中的某個線程阻塞,其他線程仍然可以繼續運行。其缺點是:由於線程由CPU調度和分派,用戶態線程要經由操作系統進入內核,用戶態不同進程的多個線程進行切換時,都要進入內核再進行切換,切換代價較大。

    - 用戶級線程(User Level Threads,ULT):開放給程序員的、可以通過線程庫(如python的Threading.py)創建的線程。用戶級線程只存在於用戶空間,內核並不能看到用戶線程,並且內核資源的分配仍然是按照進程進行分配的;各個用戶線程只能在進程內進行資源競爭。用戶級線程的優點是:1.同進程內線程切換不需要轉換到內核空間,節省了內核空間;2.線程調度算法可以是進程內專用,由用戶程序進行指定;3.用戶級線程實現和操作系統無關。其缺點是:1.如果系統調用同一進程中某個線程時阻塞,整個進程阻塞;2.一個進程只能在一個cpu上

獲得執行。

    - 用戶級線程和內核級線程有著一對一、一對多和混合型的映射關系,具體映射關系由操作系統來決定。技術分享圖片

  線程狀態:線程的狀態和進程類似。運行狀態:線程在CPU上執行;就緒狀態:具備運行條件,一旦分配到CPU就可以立即執行;阻塞狀態:線程在等待某個條件的發生從而轉為就緒狀態。

技術分享圖片

  其它有關線程的概念都可以參考進程有關概念。

二、python線程模塊

  threading是Python中內置的線程模塊,能夠實現用戶級線程的管理。在Cpython中,python中的一個線程對應c語言中的一個線程。

  1.線程創建

  線程創建可以通過函數或者子類的方式實現。The Thread class represents an activity that is run in a separate thread of control. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass. No other methods (except for the constructor) should be overridden in a subclass. In other words, only override the __init__() and run() methods of this class。

from threading import Thread
def desc(step):
    global num
    for i in range(step):
        # print("desc-----: ", num)
        num -= 1
    print("----------num------------", num)
def add(step):
    global num
    for i in range(step):
        # print("add: ", num)
        num += 1
    print("----------num------------", num)
if __name__ == __main__:
    num = 0            # 由於共享進程資源,num被子線程共享
    step = 1000        # 也可以作為參數傳進去來共享變量,而進程必須用隊列或者管道
    p1 = Thread(target=desc, args=(step, ))
    p2 = Thread(target=add, args=(step, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)

  以子類繼承的方式重現上述邏輯。

from threading import Thread

class Desc(Thread):
    def __init__(self, step):
        super().__init__()
        self.step = step
    def run(self):
        global num
        for i in range(self.step):
            print("desc-----: ", num)
            num -= 1

class Add(Thread):
    def __init__(self, step):
        super().__init__()
        self.step = step
    def run(self):
        global num
        for i in range(self.step):
            print("add: ", num)
            num += 1
if __name__ == __main__:
    num = 0
    step = 1000000
    p1 = Desc(step)
    p2 = Add(step)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)

  錯誤示例在於將共享變量賦給對象屬性後,對對象屬性進行了自增(自減運算)而沒有操作共享變量num。

技術分享圖片
from threading import Thread

class Desc(Thread):
    def __init__(self, step):
        super().__init__()
        self.step = step
        self.num = num
    def run(self):
        for i in range(self.step):
            print("desc-----: ", self.num)
            self.num -= 1
        print("----------num------------", self.num)

class Add(Thread):
    def __init__(self, step):
        super().__init__()
        self.step = step
        self.num = num
    def run(self):
        for i in range(self.step):
            print("add: ", self.num)
            self.num += 1
        print("----------num------------", self.num)
if __name__ == __main__:
    num = 0
    step = 1000
    p1 = Desc(step)
    p2 = Add(step)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
錯誤示例

  一些線程自帶的函數。

from threading import Thread
from threading import (active_count, current_thread, get_ident, enumerate, main_thread)
import time
class Example(Thread):
    def __init__(self):
        super().__init__()
    def run(self):
        print("current_thread: ", current_thread())  # 當前線程標識符
        print("get_ident: ", get_ident())  # 當前線程
        time.sleep(3)
        print("-------------------------------------------------")


if __name__ == __main__:
    p1 = Example()
    p1.start()
    # p1.setDaemon(True)                      # 守護線程,主線程結束子線程如果沒結束就直接被kill掉
    print("active_count: ", active_count())   # 活躍線程數:2
    print("enumerate: ", enumerate())         # 當前進程內活躍的線程對象
    p1.join()    # 主線程等待子線程結束再結束/不寫時主線程結束,子線程繼續執行
    print("active_count: ", active_count())   # 活躍線程數:1 - 主線程
    print("current_thread: ", current_thread()) # 當前線程標識符
    print("get_ident: ", get_ident())         # 當前線程
    print("main_thread: ", main_thread())     # 主線程對象

  2.全局解釋鎖GIL

  如果將上面的step設置一個非常大的值,那麽num值就有各種結果。這裏(解釋器Cpython)就要說到全局解釋鎖GIL (Global interpreter Lock)。它有兩個特點:

  1.設計之初為了追求簡單,會在Cpython上加一把全局鎖,能夠控制多線程對同一資源的訪問。但後來導致的問題是,在同一時刻只有一個線程在一個CPU上執行,也即多個線程無法利用多個CPU。

  2.python會按照一定字節碼數量(比如1000行字節碼)和一定時間間隔(比如15毫秒)主動釋放GIL鎖。多個線程此時可以爭搶GIL鎖。這破壞了全局鎖的初衷(限制多線程的資源訪問,保證數據的準確性),導致GIL鎖變得很雞肋。

  3.python會在遇到IO操作時會主動釋放GIL。因此python多線程在做I/O操作時任務時(如爬蟲)會具有優勢。

  因此,通過共享變量的方式進行線程間通信是不安全的。一般會通過隊列的方式實現線程間通信,它是線程安全的(隊列裏的數據只有一份。。。)。

from threading import Thread
from queue import Queue

def desc(step):
    for i in range(step):
        num = q.get() - 1
        print("desc-----: ", num)
        q.put(num)
def add(step):
    for i in range(step):
        num = q.get() + 1
        print("add: ", num)
        q.put(num)

if __name__ == __main__:
    q = Queue()   # queue隊列實現了線程安全
    q.put(0)
    step = 1000000
    p1 = Thread(target=desc, args=(step,))
    p2 = Thread(target=add, args=(step,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(q.get())

  3、Lock和Rlock,Semaphore

  由於GIL鎖的主動釋放,在線程間共享變量進行同步計算時,會導致結果不準確,也就是多線程切換計算,會造成重復賦值的極端情況。實質上是在STORE_FAST這一步發生了切換。

import dis
def add(num):
    num -= 1if __name__ == __main__:
    print(dis.dis(add))
  3           0 LOAD_FAST                0 (num)
2 LOAD_CONST 1 (1)
4 INPLACE_SUBTRACT
6 STORE_FAST 0 (num)
8 LOAD_CONST 0 (None)
10 RETURN_VALUE
None

  線程鎖Lock是在保證原子操作的基礎上,對共享變量進行同步限制。根據同步原語(獲得鎖 -- dosomething -- 釋放鎖),Lock有兩個方法acquire和release。前者獲取鎖,release釋放鎖,中間部分則是不可分割的代碼邏輯。線程鎖是全局對象,用於操作所有線程。

技術分享圖片
from threading import Thread, Lock

class Desc(Thread):
    def __init__(self, step):
        super().__init__()
        self.step = step
    def run(self):
        global num
        for i in range(self.step):
            lock.acquire()
            num -= 1
            lock.release()

class Add(Thread):
    def __init__(self, step):
        super().__init__()
        self.step = step
    def run(self):
        global num
        for i in range(self.step):
            lock.acquire()
            num += 1
            lock.release()
if __name__ == __main__:
    num = 0
    step = 1000000
    lock = Lock()
    p1 = Desc(step)
    p2 = Add(step)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)
Lock示例

  acquire和release的所包裹的代碼要盡可能小,本例中只包含可能發生重復賦值(線程不安全)的那行代碼,如此並不影響兩個線程for循環的切換。

  線程鎖的弊端在於:1.線程會影響性能;2.會造成死鎖。註意,這句話是相對多線程共享數據操作而言的,對於隊列不適用。另外,acquire和release之間的狀態是阻塞的。

  Lock只能讓acquire和release成對出現,當想要訪問多個共享變量時,在一個鎖內控制多個共享變量顯然是不符合實需的,另外,在鎖內加鎖(嵌套鎖)Lock也是無法實現的。

  遞歸鎖則可以實現上面的缺陷。它也要求有多少個acquire就要有多少個release。

from threading import Thread, Lock, RLock

class Desc(Thread):
    def __init__(self, step):
        super().__init__()
        self.step = step
    def run(self):
        global num
        for i in range(self.step):
            lock.acquire()
            num -= 1
            lock.release()

class Add(Thread):
    def __init__(self, step):
        super().__init__()
        self.step = step
    def run(self):
        global num
        for i in range(self.step):
            lock.acquire()
            num += 2
            lock.acquire()
            num -= 1
            lock.release()
            lock.release()
if __name__ == __main__:
    num = 0
    step = 1000000
    lock = RLock()
    p1 = Desc(step)
    p2 = Add(step)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num)

  Lock和RLock都支持上下文管理,即with語句。

  Semaphore基於Condition和RLock、Lock生成一個信號量("鎖池"),而不是無限制的使用acquire和release。在多線程時,如果鎖池內的鎖被用完了,那麽其它線程進入阻塞狀態,等待占有鎖的線程釋放鎖。

from threading import Thread, Semaphore, current_thread
import time

class Fn(Thread):
    def __init__(self, sm):
        super().__init__()
        self.sm = sm
    def run(self):
        self.sm.acquire()
        print(current_thread: {}, {}.format(current_thread().name, current_thread().ident))
        time.sleep(2)
        self.sm.release()
if __name__ == __main__:
    sm=Semaphore(3)
    t_list = []
    for i in range(10):
        t = Fn(sm)
        t_list.append(t)
    for t in t_list:
        t.start()

  4、線程同步

  條件變量Condition用於線程間同步執行。線程同步和進程同步相似,實質上是通過線程鎖互斥,將並行異步變成了串行同步(阻塞)。Condittion也是基於Lock和RLock實現的。

  A condition variable obeys the context management protocol: using the with statement acquires the associated lock for the duration of the enclosed block.

  官方解釋提了兩個重要的信息:1.可以用with語句創建condition,此時不用寫acquire和release,只需要在with上下文內寫邏輯即可;2.可以通過acquire和relrease獲取和釋放鎖,邏輯寫在鎖內部。

  The wait() method releases the lock, and then blocks until another thread awakens it by calling notify() or notify_all(). Once awakened, wait() re-acquires the lock and returns. It is also possible to specify a timeout.

  wait和notify(notify_all)是一對方法。wait用於本線程阻塞,直到得到其它線程的notify通知,再從阻塞狀態轉到就緒狀態(運行);notify用於本線程通知其它一個(notify_all是多個)線程,可以從阻塞狀態轉到就緒狀態(運行)。請註意前文配圖

from threading import Thread, Condition

class Poetry1(Thread):
    def __init__(self, con, poetry):
        super().__init__()
        self.poetry = poetry
        self.con = con
    def run(self):
        global lis      
        with self.con:       
            for line in self.poetry:
                lis.append(line)
                self.con.notify()
                self.con.wait()
class Poetry2(Thread):
    def __init__(self, con, poetry):
        super().__init__()
        self.poetry = poetry
        self.con = con
    def run(self):
        global lis
        with self.con:
            for line in self.poetry:
                self.con.wait()
                lis.append(line)
                self.con.notify()
if __name__ == __main__:
    con = Condition()
    lis = []
    poy1 = ["楚國多豪俊,", "每與豺狼交,"]
    poy2 = ["相比得劍術。", "片血不沾衣。"]
    p1 = Poetry1(con, poy1)
    p2 = Poetry2(con, poy2)
    p2.start()     # 必須讓wait的線程先跑起來,從新生狀態轉到阻塞狀態,等待notify激活
    p1.start()
    p1.join()
    p2.join()
    print("\r\n".join(lis))

"""
楚國多豪俊,
相比得劍術。
每與豺狼交,
片血不沾衣。
"""

  第二種寫法: con.acquire()和con.release()。

技術分享圖片
from threading import Thread, Condition

class Poetry1(Thread):
    def __init__(self, con, poetry):
        super().__init__()
        self.poetry = poetry
        self.con = con
    def run(self):
        global lis
        self.con.acquire()
        for line in self.poetry:
            lis.append(line)
            self.con.notify()
            self.con.wait()
        self.con.release()


class Poetry2(Thread):
    def __init__(self, con, poetry):
        super().__init__()
        self.poetry = poetry
        self.con = con
    def run(self):
        global lis
        self.con.acquire()
        for line in self.poetry:
            self.con.wait()
            lis.append(line)
            self.con.notify()
        self.con.release()
if __name__ == __main__:
    con = Condition()
    lis = []
    poy1 = ["楚國多豪俊,", "每與豺狼交,"]
    poy2 = ["相比得劍術。", "片血不沾衣。"]
    p1 = Poetry1(con, poy1)
    p2 = Poetry2(con, poy2)
    p2.start()
    p1.start()
    p1.join()
    p2.join()
    print("\r\n".join(lis))
Condition

  官方文檔給出了管用的線程鎖的模型:

# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

  現在來復原這段代碼:

  生產者(Producer): 如果隊列中的包子數量小於20,立刻生產10個包子;消費者(Consumer):如果隊列中的包子數量大於20,立刻消費3個包子。

  設置4個生產者和10個消費者,開啟循環。

from threading import Thread, Condition, current_thread
from queue import Queue
import time

class Producer(Thread):
    def __init__(self, con, q):
        super().__init__()
        self.con = con
        self.q = q

    def run(self):
        while True:
            with self.con:
                while self.q._qsize() > 20:
                    self.con.wait()
                for i in range(10):
                    self.q.put("包子")
                print("{}: 生產了10個包子.".format(current_thread().name))
                self.con.notify()

class Consumer(Thread):
    def __init__(self, con, q):
        super().__init__()
        self.con = con
        self.q = q

    def run(self):
        while True:
            with self.con:
                while self.q._qsize() < 20:
                    self.con.wait()
                    time.sleep(2)
                for i in range(3):
                    self.q.get()
                print("{}: 消費了3個包子。".format(current_thread().name))
                self.con.notify()

if __name__ == __main__:
    q = Queue()
    con = Condition()
    t_list = []
    for i in range(4):
        t = Producer(con, q)
        t_list.append(t)
    for i in range(10):
        t = Consumer(con, q)
        t_list.append(t)
    for t in t_list: t.start()
    for t in t_list: t.join()

  5、線程池

  concurrent.futures實現了線程池。concurrent.futures提供了一致線程和進程的接口。 

  來一個簡單的例子。

from concurrent.futures import ThreadPoolExecutor
import time

def fn(num):
    print(num)
    time.sleep(2)

if __name__ == __main__:
    executor = ThreadPoolExecutor(max_workers=2)  # 創建一個線程池
    executor.submit(fn, 100)
    executor.submit(fn, 200)   # 提交執行,第一個參數是函數,第二個參數是函數的參數

  官方示例:

import concurrent.futures
import urllib.request

URLS = [http://www.foxnews.com/,
        http://www.cnn.com/,
        http://europe.wsj.com/,
        http://www.bbc.co.uk/,
        http://some-made-up-domain.com/]

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(%r generated an exception: %s % (url, exc))
        else:
            print(%r page is %d bytes % (url, len(data)))

  現在來改寫上一章節多進程爬取天龍八部小說的代碼。

from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread
from queue import Queue
import urllib
from bs4 import BeautifulSoup

class UrlMaker(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number
    def run(self):
        num = 2024354
        for i in range(self.number):
            url = "https://www.ybdu.com/xiaoshuo/10/10237/{}.html".format(num)
            q.put(url)
            print(url)
            num += 1
        q.put("over")

def urlParser(file):
    url = q.get()  # 從列表中獲取url
    if url == "over":
        return {
            "code": False,
            "url": False,
        }
    else:
        html = urllib.request.urlopen(url)  # 請求html
        html_bytes = html.read()  # 讀取字節數據
        soup = BeautifulSoup(html_bytes, "html.parser")
        title = soup.find("div", attrs={"class": "h1title"}).h1.get_text()
        string = soup.find("div", attrs={"class": "contentbox", "id": "htmlContent"}).get_text()  # 獲取小說內容
        lines = string.split()
        with open(file, mode="a", encoding="utf-8") as f:  # 寫入文件
            f.write(title + "\r\n")
            for i, line in enumerate(lines[: -6]):
                f.write("   " + line + "\r\n")
    return {
        "code": True,
        "url": url,
        "title": title,
    }

def callback(msg):
    if msg["code"]:
        print("Process handled url: {}, title: {}.".format(msg["url"], msg["title"]))
    else:
        print("All urls had parsed.")


if __name__ == __main__:
    q = Queue()
    executor = ThreadPoolExecutor(max_workers=5)
    p1 = UrlMaker(10)
    p1.start()
    p1.join()
    task_list = []
    for i in range(20):
        task = executor.submit(urlParser, "天龍八部1.txt")
        task_list.append(task)

    for task in as_completed(task_list):   # as_completed是任務執行後的一些數據的封裝
        data = task.result()  # 獲取執行結果
        print("Task: {} , data: {}.".format(task, data["url"])) 

python(十一)、線程