1. 程式人生 > >python 中生產者與消費者 模式

python 中生產者與消費者 模式

生產者與消費者 模式

摘要: 最近一段時間,寫了一些生產者與消費者模型, 特此此文 作為總結. 文中總結了不同的 生產者與消費者的情況. 一個生產者, 多個消費者,與一個生產者,多個消費者 的程式設計模式.

一. 生產者與消費者

在軟體開發的過程中,經常碰到這樣的場景: 某些模組負責生產資料,這些資料由其他模組來負責處理(此處的模組可能是:函式、執行緒、程序等). 產生資料的模組稱為生產者,而處理資料的模組稱為消費者

二. 生產者與消費者模型的優點:

  • 解耦耦合
  • 併發
  • 閒忙不均
一.多執行緒實現生產者與消費者模式 例項
  1. 一個生產者與一個消費者
# !/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank
@contact: 
[email protected]
@file: 18.py @time: 2018/7/10 下午8:59 一個簡單的demon 生產者與消費者 """ import time import queue import threading import random class Producer(threading.Thread): """ 只負責產生資料 """ FINISHED = True def __init__(self, name, queue): # python3的寫法 super().__init__(name=name) self.queue = queue def run(self): for i in range(10): print("%s is producing %d to the queue!" % (self.getName(), i)) self.queue.put(i) time.sleep(random.randint(1, 10) * 0.1) # 設定完成的標誌位 self.queue.put(self.FINISHED) print("%s finished!" % self.getName()) class Consumer(threading.Thread): """ 資料處理, 寫入到資料庫裡面 """ FINISHED = True def __init__(self, name, queue): super().__init__(name=name) self.queue = queue def run(self): while True: value = self.queue.get() # 用來退出執行緒 if value is self.FINISHED: break print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value)) print("%s finished!" % self.getName()) if __name__ == '__main__': queue = queue.Queue() producer = Producer('producer', queue) consumer = Consumer('consumer', queue) producer.start() consumer.start() consumer.join() producer.join() print('All threads done')

結果如下:

producer is producing 0 to the queue!
consumer is consuming. 0 in the queue is consumed!
producer is producing 1 to the queue!
consumer is consuming. 1 in the queue is consumed!
producer is producing 2 to the queue!
consumer is consuming. 2 in the queue is consumed!
producer is producing 3 to the queue!
consumer is consuming. 3 in the queue is consumed!
producer is producing 4 to the queue!
consumer is consuming. 4 in the queue is consumed!
producer is producing 5 to the queue!
consumer is consuming. 5 in the queue is consumed!
producer is producing 6 to the queue!
consumer is consuming. 6 in the queue is consumed!
producer is producing 7 to the queue!
consumer is consuming. 7 in the queue is consumed!
producer is producing 8 to the queue!
consumer is consuming. 8 in the queue is consumed!
producer is producing 9 to the queue!
consumer is consuming. 9 in the queue is consumed!
producer finished!
consumer finished!
All threads  done

Process finished with exit code 0
FINISHED 通過這個屬性來通知消費者生產者已經完成生產. 相當於一個標誌,告訴消費者 生產者已經生產完成.
  1. 但是當有多個消費者就會出現問題. 看下面演示程式碼, 消費者就會阻塞在這裡 .
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank 
@contact: [email protected]
@file: 18.py
@time: 2018/7/10 下午8:59

一個簡單的demon   生產者與消費者

"""
import time
from datetime import datetime, timedelta
import queue

import threading
import random

# 哨兵
_sentinel = object()


class Producer(threading.Thread):
    """
    只負責產生資料
    """

    def __init__(self, name, queue):
        # python3的寫法
        super().__init__(name=name)
        self.queue = queue

    def run(self):
        for i in range(5):
            print("%s is producing %d to the queue!" % (self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randint(1, 10) * 0.1)

        # 設定完成的標誌位
        self.queue.put(_sentinel)
        print("%s finished!" % self.getName())


class Consumer(threading.Thread):
    """
    資料處理,
    寫入到資料庫裡面
    """

    def __init__(self, name, queue):
        super().__init__(name=name)
        self.queue = queue

    def run(self):
        while True:
            # 預設是阻塞的等待.
            value = self.queue.get()
            # 用來退出執行緒
            if value is _sentinel:
                break
            print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))

        print("%s finished!" % self.getName())


if __name__ == '__main__':
    queue = queue.Queue()
    producer = Producer('producer', queue)

    consumer = Consumer('consumer', queue)
    consumer2 = Consumer('consumer2', queue)

    producer.start()

    consumer.start()
    consumer2.start()

    producer.join()
    consumer.join()
    consumer2.join()

    print('All threads  done')
producer is producing 0 to the queue!
consumer is consuming. 0 in the queue is consumed!
producer is producing 1 to the queue!
consumer is consuming. 1 in the queue is consumed!
producer is producing 2 to the queue!
consumer2 is consuming. 2 in the queue is consumed!
producer is producing 3 to the queue!
consumer is consuming. 3 in the queue is consumed!
producer is producing 4 to the queue!
consumer2 is consuming. 4 in the queue is consumed!
producer finished!
consumer finished!


上面就就阻塞在第二個消費者那裡了, 因為第一個消費者把 標誌位拿走啦,並且退出了執行緒. 消費者2 發現佇列空了,就在那裡等待, 所以 就這樣阻塞在這裡.

  1. 當出現一個生產者和多個消費者 的時候.

解決方法,每個執行緒消費完成後, 在把標誌位放回佇列中, 這樣其他的消費者執行緒就可以退出來了. 來看下例子:

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank 
@contact: [email protected]
@file: 18.py
@time: 2018/7/10 下午8:59

一個簡單的demon   生產者與消費者

"""
import time
import queue

import threading
import random

# 哨兵
_sentinel = object()


class Producer(threading.Thread):
    """
    只負責產生資料
    """

    def __init__(self, name, queue):
        # python3的寫法
        super().__init__(name=name)
        self.queue = queue

    def run(self):
        for i in range(15):
            print("%s is producing %d to the queue!" % (self.getName(), i))
            self.queue.put(i)
            time.sleep(random.randint(1, 20) * 0.1)

        # 設定完成的標誌位
        self.queue.put(_sentinel)
        print("%s finished!" % self.getName())


class Consumer(threading.Thread):
    """
    資料處理,對資料進行消費.
    """

    def __init__(self, name, queue):
        super().__init__(name=name)
        self.queue = queue

    def run(self):
        while True:
            value = self.queue.get()
            # 用來退出執行緒
            if value is _sentinel:
                # 新增哨兵,讓其他執行緒有機會退出來
                self.queue.put(value)
                break
            print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))

        print("%s finished!" % self.getName())


if __name__ == '__main__':
    queue = queue.Queue()
    producer = Producer('producer', queue)

    producer.start()

    consumer_threads = []
    for i in range(5):
        consumer = Consumer('consumer_' + str(i), queue)
        consumer_threads.append(consumer)
        consumer.start()

    producer.join()
    for consumer in consumer_threads:
        consumer.join()

    producer.join()

    print('All threads  done')
結果如下:

producer is producing 0 to the queue!
consumer_0 is consuming. 0 in the queue is consumed!
producer is producing 1 to the queue!
consumer_0 is consuming. 1 in the queue is consumed!
producer is producing 2 to the queue!
consumer_1 is consuming. 2 in the queue is consumed!
producer is producing 3 to the queue!
consumer_2 is consuming. 3 in the queue is consumed!
producer is producing 4 to the queue!
consumer_3 is consuming. 4 in the queue is consumed!
producer is producing 5 to the queue!
consumer_4 is consuming. 5 in the queue is consumed!
producer is producing 6 to the queue!
consumer_0 is consuming. 6 in the queue is consumed!
producer is producing 7 to the queue!
consumer_1 is consuming. 7 in the queue is consumed!
producer is producing 8 to the queue!
consumer_2 is consuming. 8 in the queue is consumed!
producer is producing 9 to the queue!
consumer_3 is consuming. 9 in the queue is consumed!
producer is producing 10 to the queue!
consumer_4 is consuming. 10 in the queue is consumed!
producer is producing 11 to the queue!
consumer_0 is consuming. 11 in the queue is consumed!
producer is producing 12 to the queue!
consumer_1 is consuming. 12 in the queue is consumed!
producer is producing 13 to the queue!
consumer_2 is consuming. 13 in the queue is consumed!
producer is producing 14 to the queue!
consumer_3 is consuming. 14 in the queue is consumed!
producer finished!
consumer_4 finished!
consumer_0 finished!
consumer_1 finished!
consumer_2 finished!
consumer_3 finished!
All threads  done

Process finished with exit code 0

當出現 多個生產和 多個消費者模型

二. 多個生產者與多個消費者 模型
  1. 第一種 實現方式
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank
@contact: [email protected]
@file: test_producer_consumer.py
@time: 2018/8/31 上午9:55

一個簡單的demon   生產者與消費者模型
多個生產者 與多個消費者

2個生產者 與 多個消費者 模型.

"""

import time
import queue

import threading
import random

from collections import deque

# 哨兵
_sentinel = object()
_sentinel2 = object()


class Producer(threading.Thread):
    """
    只負責產生資料
    """

    def __init__(self, name, queue):
        # python3的寫法
        super().__init__(name=name)
        self._queue = queue

    def run(self):
        for i in range(5):
            print("{} is producing {} to the queue!".format(self.getName(), i))

            self._queue.put(i)
            time.sleep(random.randint(1, 20) * 0.1)

        # 設定完成的標誌位
        self._queue.put(_sentinel)
        print("%s finished!" % self.getName())


class Producer2(Producer):

    def run(self):
        for i in range(65, 70):
            item = chr(i)

            print("{} is producing {} to the queue!".format(self.getName(), item))
            self._queue.put(item)
            time.sleep(random.randint(1, 20) * 0.8)

        # 設定完成的標誌位
        self._queue.put(_sentinel2)
        print("%s finished!" % self.getName())


class Consumer(threading.Thread):
    """
    資料處理
    """

    _deque = deque()

    def __init__(self, name, queue, lock):
        super().__init__(name=name)
        self._queue = queue
        self._lock = lock

    def run(self):
        while True:
            value = self._queue.get(block=True, timeout=10)
            # 用來退出執行緒
            if value in (_sentinel, _sentinel2):
                with self._lock:
                    if value not in Consumer._deque:
                        Consumer._deque.append(value)

                self._queue.put(value)
                if len(Consumer._deque) == 2:
                    print('Consumer._deque  ==2  break')
                    break
            else:
                print("{} is consuming. {} in the queue is consumed!".format(self.getName(), value))

        print("{} finished!".format(self.getName()))


if __name__ == '__main__':

    q = queue.Queue()

    lock = threading.Lock()

    sentienl_queue = queue.Queue()

    producer = Producer('producer111', q)
    producer2 = Producer2('producer222', q)

    producer2.start()
    producer.start()

    consumer_threads = []
    for i in range(5):
        consumer = Consumer('consumer_' + str(i), q, lock)
        consumer_threads.append(consumer)
        consumer.start()

    for consumer in consumer_threads:
        consumer.join()

    producer.join()
    producer2.join()

    print('All threads  done')

結果如下:

producer222 is producing A to the queue!
producer111 is producing 0 to the queue!
consumer_0 is consuming. A in the queue is consumed!
consumer_0 is consuming. 0 in the queue is consumed!
producer111 is producing 1 to the queue!
consumer_0 is consuming. 1 in the queue is consumed!
producer111 is producing 2 to the queue!
consumer_1 is consuming. 2 in the queue is consumed!
producer111 is producing 3 to the queue!
consumer_2 is consuming. 3 in the queue is consumed!
producer111 is producing 4 to the queue!
consumer_3 is consuming. 4 in the queue is consumed!
producer111 finished!
producer222 is producing B to the queue!
consumer_4 is consuming. B in the queue is consumed!
producer222 is producing C to the queue!
consumer_4 is consuming. C in the queue is consumed!
producer222 is producing D to the queue!
consumer_1 is consuming. D in the queue is consumed!
producer222 is producing E to the queue!
consumer_0 is consuming. E in the queue is consumed!
producer222 finished!
Consumer._deque  ==2  break
consumer_1 finished!
Consumer._deque  ==2  break
consumer_2 finished!
Consumer._deque  ==2  break
consumer_3 finished!
Consumer._deque  ==2  break
consumer_4 finished!
Consumer._deque  ==2  break
consumer_0 finished!
All threads  done

Process finished with exit code 0

  1. 第二種 實現方式
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
@author: Frank
@contact: [email protected]
@file: test_producer_consumer.py
@time: 2018/8/31 上午9:55

一個簡單的demon   生產者與消費者模型
多個生產者 與多個消費者


2個生產者 與 多個消費者 模型.

"""
import time
import queue

import threading
import random
from collections import deque

# 兩個哨兵  作為結束標誌位
_sentinel = object()
_sentinel2 = object()


class Producer(threading.Thread):
    """
    只負責產生資料
    """

    def __init__(self, name, queue):
        # python3的寫法
        super().__init__(name=name)
        self._queue = queue

    def run(self):
        for i in range(5):
            print("{} is producing {} to the queue!".format(self.getName(), i))

            self._queue.put(i)
            time.sleep(0.1)

        # 設定完成的標誌位
        self._queue.put(_sentinel)
        print("%s finished!" % self.getName())


class Producer2(Producer):

    def run(self):
        for i in range(65, 70):
            item = chr(i)

            print("{} is producing {} to the queue!".format(self.getName(), item))
            self._queue.put(item)
            time.sleep(random.randint(1, 20) * 0.8)

        # 設定完成的標誌位
        self._queue.put(_sentinel2)
        print("%s finished!" % self.getName())


class Consumer(threading.Thread):
    """
    資料處理
    """

    _deque = deque()

    def __init__(self, name, queue):
        super().__init__(name=name)
        self._queue = queue

    @staticmethod
    def consume_data(datas):
        """
        消費資料 的方法 ,具體 怎麼消費資料, 看如何實現. 比如寫入檔案, 寫入資料庫等.
        :param datas:
        :return: int
        """

        return 0

    def run(self):
        while True:
            try:
                datas = self._queue.get(block=True, timeout=10)
            except queue.Empty:
                # print(f"queue  is  empty. ")
                datas = None

            # 用來退出執行緒
            if datas in (_sentinel, _sentinel2):
                if datas not in Consumer._deque:
                    print(f'put {datas} into the  Consumer._deque')
                    Consumer._deque.append(datas)
                if len(Consumer._deque) == 2:
                    print('Consumer._deque length == 2 ,break, current_thread:{} has finished'.format(self.getName()))
                    break
            else:
                # 消費資料
                if datas:
                    # 消費資料
                    sucess_number = self.consume_data(datas)
                    print("{} is consuming.  datas: {} in the queue is consumed! sucess_number:{}".format(self.getName(), datas, sucess_number))
                else:
                    print('datas is  None...')
                    if len(Consumer._deque) == 2:
                        print('Consumer._deque length == 2 ,break, current_thread:{} has finished'.format(self.getName()))
                        break


if __name__ == '__main__':

    q = queue.Queue()

    producer = Producer('producer111', q)
    producer2 = Producer2('producer222', q)

    producer.start()
    producer2.start()

    consumer_threads = []
    for i in range(5):
        consumer = Consumer('consumer_' + str(i), q)
        consumer_threads.append(consumer)
        consumer.start()

    for consumer in consumer_threads:
        consumer.join()

    producer2.join()
    producer.join()

    print('All threads  done')

執行結果的一部分如下圖:


producer222 is producing D to the queue!
consumer_0 is consuming.  datas: D in the queue is consumed! sucess_number:0
datas is  None...
datas is  None...
datas is  None...
datas is  None...
datas is  None...
datas is  None...
datas is  None...
datas is  None...
datas is  None...
producer222 is producing E to the queue!
consumer_0 is consuming.  datas: E in the queue is consumed! sucess_number:0
producer222 finished!
put <object object at 0x102720110> into the  Consumer._deque
Consumer._deque length == 2 ,break, current_thread:consumer_1 has finished
datas is  None...
Consumer._deque length == 2 ,break, current_thread:consumer_2 has finished
datas is  None...
Consumer._deque length == 2 ,break, current_thread:consumer_3 has finished
datas is  None...
Consumer._deque length == 2 ,break, current_thread:consumer_4 has finished
datas is  None...
Consumer._deque length == 2 ,break, current_thread:consumer_0 has finished
All threads  done

總結: 本文簡單總結 的 Python中 消費者與生產者模式這種程式設計模式的寫法,通過多執行緒來實現生產者與消費者模型.

分享快樂,留住感動.2018-09-30 22:46:42 --frank