1. 程式人生 > >7. 多執行緒併發相關問題和解決技巧

7. 多執行緒併發相關問題和解決技巧

一. 如何使用多執行緒

實際案例

https://intrinio.com/tutorial/web_api

我們通過上述網站提供的API獲取了股市資訊的CSV資料,
現在要下載大量CSV資料檔案, 並將其轉化為xml檔案

如何使用執行緒來提高下載並處理的效率?

解決方案

  • 使用標準庫 threading.Thread 建立執行緒,在每一個執行緒中下載並轉換一隻股票資料
import requests
import base64
from io import StringIO
import csv
from xml.etree.ElementTree import ElementTree, Element, SubElement

USERNAME = b'7f304a2df40829cd4f1b17d10cda0304'
PASSWORD = b'aff978c42479491f9541ace709081b99'

def download_csv(page_number):
    print('download csv data [page=%s]' % page_number)
    url = "http://api.intrinio.com/prices.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s" % page_number
    auth = b'Basic ' + base64.b64encode(b'%s:%s' % (USERNAME, PASSWORD))
    headers = {'Authorization' : auth}
    response = requests.get(url, headers=headers)

    if response.ok:
        return StringIO(response.text)

def csv_to_xml(csv_file, xml_path):
    print('Convert csv data to %s' % xml_path)
    reader = csv.reader(csv_file)
    headers = next(reader)

    root = Element('Data')
    root.text = '\n\t'
    root.tail = '\n'

    for row in reader:
        book = SubElement(root, 'Row')
        book.text = '\n\t\t'
        book.tail = '\n\t'

        for tag, text in zip(headers, row):
            e = SubElement(book, tag)
            e.text = text
            e.tail = '\n\t\t'
        e.tail = '\n\t'

    ElementTree(root).write(xml_path, encoding='utf8')

def download_and_save(page_number, xml_path):
    # IO
    csv_file = None
    while not csv_file:
        csv_file = download_csv(page_number)
    # CPU
    csv_to_xml(csv_file, 'data%s.xml' % page_number)

from threading import Thread
class MyThread(Thread):
    def __init__(self, page_number, xml_path):
        super().__init__()
        self.page_number = page_number
        self.xml_path = xml_path

    def run(self):
        download_and_save(self.page_number, self.xml_path)

if __name__ == '__main__':
    import time
    t0 = time.time()
    thread_list = []
    for i in range(1, 6):
        t = MyThread(i, 'data%s.xml' % i)
      # t = Thread(target=download_and_save, args=(1, 'data%s.xml' % i))

        t.start()
        thread_list.append(t)

    for t in thread_list:
        t.join()    # 主執行緒 join等待  子執行緒完成

    print(time.time() - t0)
    print('main thread end.')

二. 如何執行緒中通訊

實際案例

在上一節中,  我們從 intrinio.com 下載多隻股票的csv資料, 並將其轉換為 xml 檔案

在python中由於全域性直譯器鎖 (GIL)的存在, 多執行緒CPU密集型操作並不能提高執行效率, 我們修改程式架構:

1. 使用多個 DownloadThread 執行緒進行下載(I/O)
2. 使用多個 ConvertThread  執行緒進行轉換(CPU)
3. 下載執行緒把下載資料安全地傳遞給轉換執行緒

GIL

  • 在每個程序中, 存在一把GIL, 該程序中的執行緒間共享GIL
  • 多執行緒進行時, 只有有GIL的那個執行緒能執行
  • 通過執行緒間快速 傳遞GIL, 達到表象上的多執行緒, 其實同一時間只有一個執行緒在工作

解決方案

  • 使用標準庫中的 queue.Queue, 它是一個執行緒安全的佇列

  1. Download 執行緒把下載資料放入佇列
  2. Convert 執行緒從佇列裡提取資料
import requests
import base64
from io import StringIO
import csv
from xml.etree.ElementTree import ElementTree, Element, SubElement
from threading import Thread

USERNAME = b'7f304a2df40829cd4f1b17d10cda0304'
PASSWORD = b'aff978c42479491f9541ace709081b99'

# class MyThread(Thread):
#     def __init__(self, page_number, xml_path):
#         super().__init__()
#         self.page_number = page_number
#         self.xml_path = xml_path
#
#     def run(self):
#         download_and_save(self.page_number, self.xml_path)

class DownloadThread(Thread):
    def __init__(self, page_number, queue):
        super().__init__()
        self.page_number = page_number
        self.queue = queue

    def run(self):
        csv_file = None
        while not csv_file:
            csv_file = self.download_csv(self.page_number)
        self.queue.put((self.page_number, csv_file))   # 放入佇列

    def download_csv(self, page_number):
        print('download csv data [page=%s]' % page_number)
        url = "http://api.intrinio.com/prices.csv?ticker=AAPL&hide_paging=true&page_size=200&page_number=%s" % page_number
        auth = b'Basic ' + base64.b64encode(b'%s:%s' % (USERNAME, PASSWORD))
        headers = {'Authorization' : auth}
        response = requests.get(url, headers=headers)

        if response.ok:
            return StringIO(response.text)

class ConvertThread(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:  # 無限迴圈地從佇列中  獲取資料
            page_number, csv_file = self.queue.get()
            self.csv_to_xml(csv_file, 'data%s.xml' % page_number)

    def csv_to_xml(self, csv_file, xml_path):
        print('Convert csv data to %s' % xml_path)
        reader = csv.reader(csv_file)
        headers = next(reader)

        root = Element('Data')
        root.text = '\n\t'
        root.tail = '\n'

        for row in reader:
            book = SubElement(root, 'Row')
            book.text = '\n\t\t'
            book.tail = '\n\t'

            for tag, text in zip(headers, row):
                e = SubElement(book, tag)
                e.text = text
                e.tail = '\n\t\t'
            e.tail = '\n\t'

        ElementTree(root).write(xml_path, encoding='utf8')


from queue import Queue   # collections中的queue 不帶鎖, 不是執行緒安全的

if __name__ == '__main__':
    queue = Queue()  # 佇列例項
    thread_list = []
    for i in range(1, 6):
        t = DownloadThread(i, queue)
        t.start()
        thread_list.append(t)

    convert_thread = ConvertThread(queue)
    convert_thread.start()

    for t in thread_list:
        t.join()
    print('main thread end.')

三. 如何線上程間進行事件通知?

實際案例

在上一節中, 我們從 intrinio.com 下載多隻股票的 csv 資料, 並將其轉換為 xml 檔案。


額外需求:
實現一個執行緒 TarThread, 將轉換出的 xml 檔案壓縮打包, 比如轉換執行緒每生產出100個 xml 檔案, 
就通知打包執行緒將它們打包成一個 xxx.tgz 檔案, 並刪除 xml 檔案。 
打包完成後, 打包執行緒反過來通知轉換執行緒, 轉換執行緒繼續轉換。

解決方案

執行緒間的事件通知, 可以使用標準庫中 Threading.Event

  1. 等待事件一端呼叫wait, 等待事件
  2. 通知事件一端呼叫 set, 通知事件

程式碼

import requests
import base64
from io import StringIO
import csv
from xml.etree.ElementTree import ElementTree, Element, SubElement
from threading import Thread
from queue import Queue
import tarfile
import os

USERNAME = b'7f304a2df40829cd4f1b17d10cda0304'
PASSWORD = b'aff978c42479491f9541ace709081b99'

class DownloadThread(Thread):
    def __init__(self, page_number, queue):
        super().__init__()
        self.page_number = page_number
        self.queue = queue

    def run(self):
        csv_file = None
        while not csv_file:
            csv_file = self.download_csv(self.page_number)
        self.queue.put((self.page_number, csv_file))

    def download_csv(self, page_number):
        print('download csv data [page=%s]' % page_number)
        url = "http://api.intrinio.com/prices.csv?ticker=AAPL&hide_paging=true&page_size=100&page_number=%s" % page_number
        auth = b'Basic ' + base64.b64encode(b'%s:%s' % (USERNAME, PASSWORD))
        headers = {'Authorization' : auth}
        response = requests.get(url, headers=headers)

        if response.ok:
            return StringIO(response.text)

class ConvertThread(Thread):
    def __init__(self, queue, c_event, t_event):
        super().__init__()
        self.queue = queue
        self.c_event = c_event
        self.t_event = t_event

    def run(self):
        count = 0
        while True:
            page_number, csv_file = self.queue.get()
            if page_number == -1:
                self.c_event.set()
                self.t_event.wait()
                break

            self.csv_to_xml(csv_file, 'data%s.xml' % page_number)
            count += 1
            if count == 2: # 有了2個xml檔案, 就通知開始打包
                count = 0
                # 通知轉換完成
                self.c_event.set()
                
                # 等待打包完成
                self.t_event.wait()
                self.t_event.clear()
                

    def csv_to_xml(self, csv_file, xml_path):
        print('Convert csv data to %s' % xml_path)
        reader = csv.reader(csv_file)
        headers = next(reader)

        root = Element('Data')
        root.text = '\n\t'
        root.tail = '\n'

        for row in reader:
            book = SubElement(root, 'Row')
            book.text = '\n\t\t'
            book.tail = '\n\t'

            for tag, text in zip(headers, row):
                e = SubElement(book, tag)
                e.text = text
                e.tail = '\n\t\t'
            e.tail = '\n\t'

        ElementTree(root).write(xml_path, encoding='utf8')

class TarThread(Thread):  # 守護執行緒
    def __init__(self, c_event, t_event):
        super().__init__(daemon=True)   # daemon 表示 是 守護執行緒, 主執行緒結束時, 該執行緒自動結束
        self.count = 0
        self.c_event = c_event
        self.t_event = t_event

    def run(self):
        while True:
            # 等待足夠的xml
            self.c_event.wait()   # 等待 轉換完成的 通知
            self.c_event.clear()  # 為了 下次還能使用event 要clear
            
            print('DEBUG')
            # 打包
            self.tar_xml()

            # 通知打包完成
            self.t_event.set()

    def tar_xml(self):
        self.count += 1
        tfname = 'data%s.tgz' % self.count
        print('tar %s...' % tfname)
        tf = tarfile.open(tfname, 'w:gz')
        for fname in os.listdir('.'):
            if fname.endswith('.xml'):
                tf.add(fname)
                os.remove(fname)
        tf.close()

        if not tf.members:
            os.remove(tfname)

from threading import Event

if __name__ == '__main__':
    queue = Queue()
    c_event= Event()  # 轉換事件
    t_event= Event()  # 打包事件
    thread_list = []
    for i in range(1, 15):
        t = DownloadThread(i, queue)
        t.start()
        thread_list.append(t)

    convert_thread = ConvertThread(queue, c_event, t_event)
    convert_thread.start()

    tar_thread = TarThread(c_event, t_event)
    tar_thread.start()
    
    # 等待下載執行緒結束
    for t in thread_list:
        t.join()

    # 通知Convert執行緒退出
    queue.put((-1, None))

    # 等待轉換執行緒結束
    convert_thread.join()
    print('main thread end.')


四.如何使用執行緒本地資料?

實際案例

我們實現了一個 web 視訊監控伺服器, 伺服器端採集攝像頭資料,客戶端使用瀏覽器通過 http 請求接收資料。
伺服器使用推送的方式 (multipart/x-mixed-replace) 一直使用一個 tcp 連線向客戶端傳遞資料。
這種方式將持續佔用一個執行緒, 導致單執行緒伺服器無法處理多客戶端請求。

改寫程式, 在每個執行緒中處理一個客戶端請求, 支援多客戶端訪問。

解決方案

  • threading.local 函式可以建立執行緒本地資料空間, 其下屬性對每個執行緒獨立存在

五. 如何使用執行緒池?

實際案例

上一節實現了一個多執行緒 web 視訊監控伺服器,
由於我們伺服器資源有限(CPU,記憶體, 頻寬), 
需要對請求連線數(執行緒數)做限制, 避免因資源耗盡而癱瘓。

可以使用執行緒池,替代原來的每次請求建立執行緒。
即 提前建立固定數量的執行緒, 用的時候去裡面取, 避免建立過多的執行緒。

解決方法

使用標準庫中concurrent.futures 下的ThreadPoolExecutor 
物件的 submit和map方法可以用來啟動執行緒池中執行緒執行任務

使用方法

import threading, time, random

def f(a, b):
  print(threading.current_thread().name, ":", a, b)
  time.sleep(random.randint(5, 10))
  return a*b

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(3)
future = executor.submit(f, 2, 3)   # 會返回future物件

future.reslut()  # 可以得到6  , 但這是一個阻塞 函式, 他會等的愛   f執行完


# 讓執行緒池中的 每個執行緒都執行 f函式   可以使用map 實現

executor.map(f, range(1,6), range(2,7))  # 即1,2    2,3    3,4  。。。。。為引數

# map返回生成器, 可以用list包裹, 得到每個執行緒的結果
list(executor.map(f, range(1,6), range(2,7)))   # [2,6,12,20,30]

綜合實現web監控伺服器

import os, cv2, time, struct, threading
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import TCPServer, ThreadingTCPServer
from threading import Thread, RLock
from select import select

class JpegStreamer(Thread):
    def __init__(self, camera):
        super().__init__()
        self.cap = cv2.VideoCapture(camera)
        self.lock = RLock()
        self.pipes = {}

    def register(self):
        pr, pw = os.pipe()
        self.lock.acquire()
        self.pipes[pr] = pw
        self.lock.release()
        return pr

    def unregister(self, pr):
        self.lock.acquire()
        pw = self.pipes.pop(pr)
        self.lock.release()
        os.close(pr)
        os.close(pw)

    def capture(self):
        cap = self.cap
        while cap.isOpened():
            ret, frame = cap.read()
            if ret:
                ret, data = cv2.imencode('.jpg', frame, (cv2.IMWRITE_JPEG_QUALITY, 40))
                yield data.tostring()

    def send_frame(self, frame):
        n = struct.pack('l', len(frame))
        self.lock.acquire()
        if len(self.pipes):
            _, pipes, _ = select([], self.pipes.values(), [], 1)
            for pipe in pipes:
                os.write(pipe, n)
                os.write(pipe, frame)
        self.lock.release()

    def run(self):
        for frame in self.capture():
            self.send_frame(frame)

class JpegRetriever:
    def __init__(self, streamer):
        self.streamer = streamer
        self.local = threading.local()

    def retrieve(self):
        while True:
            ns = os.read(self.local.pipe, 8)
            n = struct.unpack('l', ns)[0]
            data = os.read(self.local.pipe, n)
            yield data

    def __enter__(self):
        if hasattr(self.local, 'pipe'):
            raise RuntimeError()

        self.local.pipe = streamer.register()
        return self.retrieve()

    def __exit__(self, *args):
        self.streamer.unregister(self.local.pipe)
        del self.local.pipe
        return True

class WebHandler(BaseHTTPRequestHandler):
    retriever = None

    @staticmethod
    def set_retriever(retriever):
        WebHandler.retriever = retriever

    def do_GET(self):
        if self.retriever is None:
            raise RuntimeError('no retriver')

        if self.path != '/':
            return

        self.send_response(200) 
        self.send_header('Content-type', 'multipart/x-mixed-replace;boundary=jpeg_frame')
        self.end_headers()

        with self.retriever as frames:
            for frame in frames:
                self.send_frame(frame)

    def send_frame(self, frame):
        sh  = b'--jpeg_frame\r\n'
        sh += b'Content-Type: image/jpeg\r\n'
        sh += b'Content-Length: %d\r\n\r\n' % len(frame)
        self.wfile.write(sh)
        self.wfile.write(frame)

from concurrent.futures import ThreadPoolExecutor
class ThreadingPoolTCPServer(ThreadingTCPServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, thread_n=100):
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)

        self.executor = ThreadPoolExecutor(thread_n)

    def process_request(self, request, client_address):
        self.executor.submit(self.process_request_thread, request, client_address)

if __name__ == '__main__':
    # 建立Streamer,開啟攝像頭採集。
    streamer = JpegStreamer(0)
    streamer.start()

    # http服務建立Retriever
    retriever = JpegRetriever(streamer)
    WebHandler.set_retriever(retriever)

    # 開啟http伺服器
    HOST = 'localhost'
    PORT = 9000
    print('Start server... (http://%s:%s)' % (HOST, PORT))
    httpd = ThreadingPoolTCPServer((HOST, PORT), WebHandler, thread_n=3)
    #httpd = ThreadingTCPServer((HOST, PORT), WebHandler)
    httpd.serve_forever()


六. 如何使用多程序

實際案例

由於python中全域性直譯器鎖GIL的存在, 在任意時刻只允許一個執行緒在直譯器中執行。
因此python的多執行緒不適合處理cpu密集型的任務。


想要處理cpu密集型的任務, 可以使用多程序模型。

解決方案

使用標準庫中的'multiprocessing.Process', 它可以啟動子程序執行任務。
操作介面, 程序間通訊, 程序間同步等都與'Threading.Thread'類似。

判斷是否為水仙花數

from threading import Thread
from multiprocessing import Process
from queue import Queue as Thread_Queue   # 執行緒的queue
from multiprocessing import Queue as Process_Queue   # 程序的queue

def is_armstrong(n):
    a, t = [], n
    while t:
        a.append(t % 10)
        t //= 10
    k = len(a)
    return sum(x ** k for x in a) == n

def find_armstrong(a, b, q=None):
    res = [x for x in range(a, b) if is_armstrong(x)]
    if q:
        q.put(res)
    return res

def find_by_thread(*ranges):
    q = Thread_Queue()
    workers = []
    for r in ranges:
        a, b = r
        t = Thread(target=find_armstrong, args=(a, b, q))
        t.start()
        workers.append(t)

    res = []
    for _ in range(len(ranges)):
        res.extend(q.get())

    return res

def find_by_process(*ranges):
    q = Process_Queue()
    workers = []
    for r in ranges:
        a, b = r
        t = Process(target=find_armstrong, args=(a, b, q))
        t.start()
        workers.append(t)

    res = []
    for _ in range(len(ranges)):
        res.extend(q.get())

    return res


    
if __name__ == '__main__':
    import time
    t0 = time.time()
    #res = find_by_thread([10000000, 15000000], [15000000, 20000000], 
    #                     [20000000, 25000000], [25000000, 30000000]) 
    res = find_by_process([10000000, 15000000], [15000000, 20000000], 
                         [20000000, 25000000], [25000000, 30000000]) 
    print(res)
    print(time.time() - t0)