1. 程式人生 > >Python多執行緒的原理與實現

Python多執行緒的原理與實現

Python多執行緒原理與實戰

目的:

(1)瞭解python執行緒執行原理

(2)掌握多執行緒程式設計與執行緒同步

(3)瞭解執行緒池的使用

1 執行緒基本概念

1.1 執行緒是什麼?

執行緒是指程序內的一個執行單元,也是程序內的可排程實體.

與程序的區別:
(1) 地址空間:程序內的一個執行單元;程序至少有一個執行緒;它們共享程序的地址空間;而程序有自己獨立的地址空間;
(2) 資源擁有:程序是資源分配和擁有的單位,同一個程序內的執行緒共享程序的資源
(3) 執行緒是處理器排程的基本單位,但程序不是.
(4) 二者均可併發執行.

簡而言之,一個程式至少有一個程序,一個程序至少有一個執行緒.

執行緒的劃分尺度小於程序,使得多執行緒程式的併發性高。
另外,程序在執行過程中擁有獨立的記憶體單元,而多個執行緒共享記憶體,從而極大地提高了程式的執行效率。

1.2 執行緒和程序關係?

​ 程序就是一個應用程式在處理機上的一次執行過程,它是一個動態的概念,而執行緒是程序中的一部分,程序包含多個執行緒在執行。

​ 多執行緒可以共享全域性變數,多程序不能。多執行緒中,所有子執行緒的程序號相同;多程序中,不同的子程序程序號不同。

​ 程序是具有一定獨立功能的程式關於某個資料集合上的一次執行活動,程序是系統進行資源分配和排程的一個獨立單位.
​ 執行緒是程序的一個實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位.執行緒自己基本上不擁有系統資源,只擁有一點在執行中必不可少的資源(如程式計數器,一組暫存器和棧),但是它可與同屬一個程序的其他的執行緒共享程序所擁有的全部資源.
​ 一個執行緒可以建立和撤銷另一個執行緒;同一個程序中的多個執行緒之間可以併發執行.

2 Python執行緒模組

​ python主要是通過thread和threading這兩個模組來實現多執行緒支援。python的thread模組是比較底層的模組,python的threading模組是對thread做了一些封裝,可以更加方便的被使用。但是python(cpython)由於GIL的存在無法使用threading充分利用CPU資源,如果想充分發揮多核CPU的計算能力需要使用multiprocessing模組(Windows下使用會有諸多問題)。

2.1 如何建立執行緒

​ python3.x中已經摒棄了Python2.x中採用函式式thread模組中的start_new_thread()函式來產生新執行緒方式。

​ python3.x中通過threading模組建立新的執行緒有兩種方法:一種是通過threading.Thread(Target=executable Method)-即傳遞給Thread物件一個可執行方法(或物件);第二種是繼承threading.Thread定義子類並重寫run()方法。第二種方法中,唯一必須重寫的方法是run()

(1)通過threading.Thread進行建立多執行緒

import threading
import time
def target():
    print("the current threading %s is runing"
       %(threading.current_thread().name))
    time.sleep(1)
    print("the current threading %s is ended"%(threading.current_thread().name))

print("the current threading %s is runing"%(threading.current_thread().name))
## 屬於執行緒t的部分
t = threading.Thread(target=target)
t.start()
## 屬於執行緒t的部分
t.join() # join是阻塞當前執行緒(此處的當前執行緒時主執行緒) 主執行緒直到Thread-1結束之後才結束
print("the current threading %s is ended"%(threading.current_thread().name))

(2)通過繼承threading.Thread定義子類建立多執行緒

​ 使用Threading模組建立執行緒,直接從threading.Thread繼承,然後重寫init方法和run方法:

import threading
import time

class myThread(threading.Thread):  # 繼承父類threading.Thread
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter

   def run(self):  # 把要執行的程式碼寫到run函式裡面 執行緒在建立後會直接執行run函式
      print("Starting " + self.name)
      print_time(self.name, self.counter, 5)
      print("Exiting " + self.name)


def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s process at: %s" % (threadName, time.ctime(time.time())))
      counter -= 1


# 建立新執行緒
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)

# 開啟執行緒
thread1.start()
thread2.start()

# 等待執行緒結束
thread1.join()
thread2.join()

print("Exiting Main Thread")

通過以上案例可以知道,thread1和thread2執行順序是亂序的。要使之有序,需要進行執行緒同步

3 執行緒間同步

​ 如果多個執行緒共同對某個資料修改,則可能出現不可預料的結果,為了保證資料的正確性,需要對多個執行緒進行同步。

​ 使用Thread物件的Lock和Rlock可以實現簡單的執行緒同步,這兩個物件都有acquire方法和release方法,對於那些需要每次只允許一個執行緒操作的資料,可以將其操作放到acquire和release方法之間。

​ 需要注意的是,Python有一個GIL(Global Interpreter Lock)機制,任何執行緒在執行之前必須獲取這個全域性鎖才能執行,每當執行完100條位元組碼,全域性鎖才會釋放,切換到其他執行緒執行。

3.1 執行緒同步問題

多執行緒實現同步有四種方式:

鎖機制,訊號量,條件判斷和同步佇列。

下面我主要關注兩種同步機制:鎖機制和同步佇列。

(1)鎖機制

threading的Lock類,用該類的acquire函式進行加鎖,用realease函式進行解鎖

import threading
import time
class myThread(threading.Thread):
   def __init__(self, threadID, name, counter):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.counter = counter
   def run(self):
      print("Starting " + self.name)
      # 獲得鎖,成功獲得鎖定後返回True
      # 可選的timeout引數不填時將一直阻塞直到獲得鎖定
      # 否則超時後將返回False
      threadLock.acquire()
      print_time(self.name, self.counter, 5)
      # 釋放鎖
      threadLock.release()
def print_time(threadName, delay, counter):
   while counter:
      time.sleep(delay)
      print("%s: %s" % (threadName, time.ctime(time.time())))
      counter -= 1

threadLock = threading.Lock()
threads = []
# 建立新執行緒
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開啟新執行緒
thread1.start()
thread2.start()
# 新增執行緒到執行緒列表
threads.append(thread1)
threads.append(thread2)
# 等待所有執行緒完成
for t in threads:
   t.join()

print("Exiting Main Thread")

(2) 執行緒同步佇列queue

python2.x中提供的Queue, Python3.x中提供的是queue

見import queue.

Python的queue模組中提供了同步的、執行緒安全的佇列類,包括FIFO(先入先出)佇列Queue,LIFO(後入先出)佇列LifoQueue,和優先順序佇列PriorityQueue。這些佇列都實現了鎖原語,能夠在多執行緒中直接使用。可以使用佇列來實現執行緒間的同步。

queue模組中的常用方法:

  • queue.qsize() 返回佇列的大小
  • queue.empty() 如果佇列為空,返回True,反之False
  • queue.full() 如果佇列滿了,返回True,反之False
  • queue.full 與 maxsize 大小對應
  • queue.get([block[, timeout]])獲取佇列,timeout等待時間
  • queue.get_nowait() 相當Queue.get(False)
  • queue.put(item) 寫入佇列,timeout等待時間
  • queue.put_nowait(item) 相當Queue.put(item, False)
  • queue.task_done() 在完成一項工作之後,Queue.task_done()函式向任務已經完成的佇列傳送一個訊號
  • queue.join() 實際上意味著等到佇列為空,再執行別的操作

案例1:

import queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
   def __init__(self, threadID, name, q):
      threading.Thread.__init__(self)
      self.threadID = threadID
      self.name = name
      self.q = q

   def run(self):
      print("Starting " + self.name)
      process_data(self.name, self.q)
      print("Exiting " + self.name)

def process_data(threadName, q):
   while not exitFlag:
      queueLock.acquire()
      if not workQueue.empty():
         data = q.get()
         queueLock.release()
         print("%s processing %s" % (threadName, data))
      else:
         queueLock.release()
      time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# 建立新執行緒
for tName in threadList:
   thread = myThread(threadID, tName, workQueue)
   thread.start()
   threads.append(thread)
   threadID += 1

# 填充佇列
queueLock.acquire()
for word in nameList:
   workQueue.put(word)
queueLock.release()

# 等待佇列清空
while not workQueue.empty():
   pass

# 通知執行緒是時候退出
exitFlag = 1

# 等待所有執行緒完成
for t in threads:
   t.join()
print("Exiting Main Thread")

案例2:

import time
import threading
import queue

class Worker(threading.Thread):
    def __init__(self, name, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.start()    #執行run()

    def run(self):
        #迴圈,保證接著跑下一個任務
        while True:
            # 佇列為空則退出執行緒
            if self.queue.empty():
                break
            # 獲取一個佇列資料
            foo = self.queue.get()
            # 延時1S模擬你要做的事情
            time.sleep(1)
            # 列印
            print(self.getName() + " process " + str(foo))
            # 任務完成
            self.queue.task_done()


# 佇列
queue = queue.Queue()
# 加入100個任務佇列
for i in range(100):
    queue.put(i)
# 開10個執行緒
for i in range(10):
    threadName = 'Thread' + str(i)
    Worker(threadName, queue)
# 所有執行緒執行完畢後關閉
queue.join()

4 執行緒池

傳統多執行緒問題?

​ 傳統多執行緒方案會使用“即時建立, 即時銷燬”的策略。儘管與建立程序相比,建立執行緒的時間已經大大的縮短,但是如果提交給執行緒的任務是執行時間較短,而且執行次數極其頻繁,那麼伺服器將處於不停的建立執行緒,銷燬執行緒的狀態。

​ 一個執行緒的執行時間可以分為3部分:執行緒的啟動時間、執行緒體的執行時間和執行緒的銷燬時間。在多執行緒處理的情景中,如果執行緒不能被重用,就意味著每次建立都需要經過啟動、銷燬和執行3個過程。這必然會增加系統相應的時間,降低了效率。

有沒有一種高效的解決方案呢? —— 執行緒池

執行緒池基本原理:

​ 我們把任務放進佇列中去,然後開N個執行緒,每個執行緒都去佇列中取一個任務,執行完了之後告訴系統說我執行完了,然後接著去佇列中取下一個任務,直至佇列中所有任務取空,退出執行緒。

使用執行緒池:
​ 由於執行緒預先被建立並放入執行緒池中,同時處理完當前任務之後並不銷燬而是被安排處理下一個任務,因此能夠避免多次建立執行緒,從而節省執行緒建立和銷燬的開銷,能帶來更好的效能和系統穩定性。

multhreading-struct

執行緒池要設定為多少?

伺服器CPU核數有限,能夠同時併發的執行緒數有限,並不是開得越多越好,以及執行緒切換是有開銷的,如果執行緒切換過於頻繁,反而會使效能降低

執行緒執行過程中,計算時間分為兩部分:

  • CPU計算,佔用CPU
  • 不需要CPU計算,不佔用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具體操作就是比如
    訪問cache、RPC呼叫下游service、訪問DB,等需要網路呼叫的操作

那麼如果計算時間佔50%, 等待時間50%,那麼為了利用率達到最高,可以開2個執行緒:
假如工作時間是2秒, CPU計算完1秒後,執行緒等待IO的時候需要1秒,此時CPU空閒了,這時就可以切換到另外一個執行緒,讓CPU工作1秒後,執行緒等待IO需要1秒,此時CPU又可以切回去,第一個執行緒這時剛好完成了1秒的IO等待,可以讓CPU繼續工作,就這樣迴圈的在兩個執行緒之前切換操作。

那麼如果計算時間佔20%, 等待時間80%,那麼為了利用率達到最高,可以開5個執行緒:
可以想象成完成任務需要5秒,CPU佔用1秒,等待時間4秒,CPU線上程等待時,可以同時再啟用4個執行緒,這樣就把CPU和IO等待時間,最大化的重疊起來

抽象一下,計算執行緒數設定的公式就是:
N核伺服器,通過執行業務的單執行緒分析出本地計算時間為x,等待時間為y,則工作執行緒數(執行緒池執行緒數)設定為 N*(x+y)/x,能讓CPU的利用率最大化。
由於有GIL的影響,python只能使用到1個核,所以這裡設定N=1

import queue
import threading
import time

# 宣告執行緒池管理類
class WorkManager(object):
   def __init__(self, work_num=1000, thread_num=2):
      self.work_queue = queue.Queue()  # 任務佇列
      self.threads = []  # 執行緒池
      self.__init_work_queue(work_num)  # 初始化任務佇列,新增任務
      self.__init_thread_pool(thread_num) # 初始化執行緒池,建立執行緒

   """
      初始化執行緒池
   """
   def __init_thread_pool(self, thread_num):
      for i in range(thread_num):
         # 建立工作執行緒(執行緒池中的物件)
         self.threads.append(Work(self.work_queue))


   """
      初始化工作佇列
   """
   def __init_work_queue(self, jobs_num):
      for i in range(jobs_num):
         self.add_job(do_job, i)

   """
      新增一項工作入隊
   """
   def add_job(self, func, *args):
      self.work_queue.put((func, list(args)))  # 任務入隊,Queue內部實現了同步機制

   """
      等待所有執行緒執行完畢
   """
   def wait_allcomplete(self):
      for item in self.threads:
         if item.isAlive(): item.join()


class Work(threading.Thread):
   def __init__(self, work_queue):
      threading.Thread.__init__(self)
      self.work_queue = work_queue
      self.start()

   def run(self):
      # 死迴圈,從而讓建立的執行緒在一定條件下關閉退出
      while True:
         try:
            do, args = self.work_queue.get(block=False)  # 任務異步出隊,Queue內部實現了同步機制
            do(args)
            self.work_queue.task_done()  # 通知系統任務完成
         except:
            break

# 具體要做的任務
def do_job(args):
   time.sleep(0.1)  # 模擬處理時間
   print(threading.current_thread())
   print(list(args))


if __name__ == '__main__':
   start = time.time()
   work_manager = WorkManager(100, 10)  # 或者work_manager =  WorkManager(10000, 20)
   work_manager.wait_allcomplete()
   end = time.time()
   print("cost all time: %s" % (end - start))

程序石油系統分配資源、執行緒是由CPU排程、協程由使用者控制

5 協程

​ 在python GIL之下,同一時刻只能有一個執行緒在執行,那麼對於CPU計算密集的程式來說,執行緒之間的切換開銷就成了拖累,而以I/O為瓶頸的程式正是協程所擅長的:

Python中的協程經歷了很長的一段發展歷程。其大概經歷瞭如下三個階段:

  1. 最初的生成器變形yield/send
  2. 引入@asyncio.coroutine和yield from
  3. 在最近的Python3.5版本中引入async/await關鍵字

(1)從yield說起

先看一段普通的計算斐波那契續列的程式碼

def fibs(n):
   res = [0] * n
   index = 0
   a = 0
   b = 1
   while index < n:
      res[index] = b
      a, b = b, a + b
      index += 1
   return res


for fib_res in fibs(20):
   print(fib_res)

​ 如果我們僅僅是需要拿到斐波那契序列的第n位,或者僅僅是希望依此產生斐波那契序列,那麼上面這種傳統方式就會比較耗費記憶體。

這時,yield就派上用場了。

def fib(n):
   index = 0
   a = 0
   b = 1
   while index < n:
      yield b
      a, b = b, a + b
      index += 1

for fib_res in fib(20):
   print(fib_res)

​ 當一個函式中包含yield語句時,python會自動將其識別為一個生成器。這時fib(20)並不會真正呼叫函式體,而是以函式體生成了一個生成器物件例項。

​ yield在這裡可以保留fib函式的計算現場,暫停fib的計算並將b返回。而將fib放入for…in迴圈中時,每次迴圈都會呼叫next(fib(20)),喚醒生成器,執行到下一個yield語句處,直到丟擲StopIteration異常。此異常會被for迴圈捕獲,導致跳出迴圈。

(2) Send來了

​ 從上面的程式中可以看到,目前只有資料從fib(20)中通過yield流向外面的for迴圈;如果可以向fib(20)傳送資料,那不是就可以在Python中實現協程了嘛。

​ 於是,Python中的生成器有了send函式,yield表示式也擁有了返回值。

​ 我們用這個特性,模擬一個慢速斐波那契數列的計算:

import time
import random

def stupid_fib(n):
   index = 0
   a = 0
   b = 1
   while index < n:
      sleep_cnt = yield b
      print('let me think {0} secs'.format(sleep_cnt))
      time.sleep(sleep_cnt)
      a, b = b, a + b
      index += 1


print('-' * 10 + 'test yield send' + '-' * 10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib) #第一次必須要執行next()函式,讓程式控制到yield b 位置
while True:
   print(fib_res)
   try:
      fib_res = sfib.send(random.uniform(0, 0.5))
   except StopIteration:
      break

python 進行併發程式設計

​ 在Python 2的時代,高效能的網路程式設計主要是使用Twisted、Tornado和Gevent這三個庫,但是它們的非同步程式碼相互之間既不相容也不能移植。

​ asyncio是Python 3.4版本引入的標準庫,直接內建了對非同步IO的支援。

asyncio的程式設計模型就是一個訊息迴圈。我們從asyncio模組中直接獲取一個EventLoop的引用,然後把需要執行的協程扔到EventLoop中執行,就實現了非同步IO。

​ Python的在3.4中引入了協程的概念,可是這個還是以生成器物件為基礎。

​ Python 3.5添加了async和await這兩個關鍵字,分別用來替換asyncio.coroutineyield from

​ python3.5則確定了協程的語法。下面將簡單介紹asyncio的使用。實現協程的不僅僅是asyncio,tornado和gevent, vloop都實現了類似的功能。

(1)協程定義

asyncio實現Hello world程式碼如下:

import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 非同步呼叫asyncio.sleep(1)-->協程函式:
    r = yield from asyncio.sleep(1)  #此處為另外一個協程,不是休眠
    print("Hello again!")

# 獲取EventLoop(事件迴圈器):
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(hello())
loop.close()

​ @asyncio.coroutine把一個generator標記為coroutine型別,然後,我們就把這個coroutine扔到EventLoop中執行。 hello()會首先打印出Hello world!,然後,yield from語法可以讓我們方便地呼叫另一個generator。由於asyncio.sleep()也是一個coroutine,所以執行緒不會等待asyncio.sleep(),而是直接中斷並執行下一個訊息迴圈。當asyncio.sleep()返回時,執行緒就可以從yield from拿到返回值(此處是None),然後接著執行下一行語句。

​ 把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主執行緒並未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現併發執行。

我們用Task封裝兩個coroutine試試:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

觀察執行過程:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

由列印的當前執行緒名稱可以看出,兩個coroutine是由同一個執行緒併發執行的。

如果把asyncio.sleep()換成真正的IO操作,則多個coroutine就可以由一個執行緒併發執行。

asyncio案例實戰

我們用asyncio的非同步網路連線來獲取sina、sohu和163的網站首頁:

async_wget.py

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80) #等待開啟host:80埠
    reader, writer = yield from connect #開始連結。如果連線成功,則返回Reader和寫writer的操作物件
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

結果資訊如下:

wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...

可見3個連線由一個執行緒通過coroutine併發完成。

(3) 使用async/await

import asyncio
import re

async def browser(host, port=80):
    # 連線host
    reader, writer = await asyncio.open_connection(host, port)
    print(host, port, '連線成功!')

    # 發起 / 主頁請求(HTTP協議)
    # 傳送請求頭必須是兩個空行
    index_get = 'GET {} HTTP/1.1\r\nHost:{}\r\n\r\n'.format('/', host)
    writer.write(index_get.encode())

    await writer.drain()  # 等待向連線寫完資料(請求傳送完成)

    # 開始讀取響應的資料報頭
    while True:
        line = await reader.readline()  # 等待讀取響應資料
        if line == b'\r\n':
            break

        print(host, '<header>', line)

    # 讀取響應的資料body
    body = await reader.read()
    print(encoding)
    print(host, '<content>', body)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()

    tasks = [browser(host) for host in ['www.dushu.com', 'www.sina.com.cn', 'www.baidu.com']]

    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

    print('---over---')

小結

asyncio提供了完善的非同步IO支援;

非同步操作需要在coroutine中通過yield from完成;

多個coroutine可以封裝成一組Task然後併發執行。