1. 程式人生 > >Python 多線程同步隊列模型

Python 多線程同步隊列模型

並且 highlight 多線程 use lib star 保存 env module

Python 多線程同步隊列模型


我面臨的問題是有個非常慢的處理邏輯(比如分詞、句法),有大量的語料,想用多線程來處理。

這一個過程可以抽象成一個叫“同步隊列”的模型。 具體來講,有一個生產者(Dispatcher)一方面從語料中讀入句子,並且存入隊列中,一方面看有沒有空閑的消費者(Segmentor),如果有,就把句子從隊列中彈出並交給這個空閑的消費者處理。 然後消費者把處理完成的結果交給生產者輸出,生產者要保證輸出與輸入順序一致。

消費者是典型的threading,它需要看見生成者的隊列,從而從隊列中拿一些數據。

對於生產者,python中有一個叫Queue的module,實現了FIFO的同步隊列。 但它只能保證輸入與交付消費者的順序的有序,但不能保障生產者在輸出時有序,所以需要一個buffer來保存輸出順序。 程序的模型大概是這樣的。有一個master(),用來分發任務。有N個多線程的slave用來處理任務。

具體程序如下:

#!/usr/bin/env python
# real    3m0.263s
# user    0m0.016s
# sys     0m0.012s

from time import sleep
from random import random
from Queue import Queue
from threading import Thread, Lock

class Segmentor(Thread):
    def __init__(self, dispatcher):
        Thread.__init__(self)
        self.d = dispatcher

    def run(self):
        while True:
            idx, item = self.d.get()
            # segment section
            sleep(random() * 5)
            # output section
            d.output( idx, item )
            self.d.task_done()

class Dispatcher(Queue):
    def __init__(self):
        Queue.__init__(self)
        self.idx = 0
        self.box = {}
        self.lock = Lock()

    def output(self, idx, item):
        self.lock.acquire()
        if idx > self.idx:
            self.box[idx] = item
        elif idx == self.idx:
            self._output(item)
            self.idx += 1
            while self.idx in self.box:
                item = self.box[self.idx]
                self._output(item)
                self.idx += 1

        self.lock.release()

    def _output(self, item):
        print item

if __name__=="__main__":
    d = Dispatcher()
    for i in xrange(4):
        t = Segmentor(d)
        t.daemon = True
        t.start()

    num = 0
    for line in open("data", "r"):
        d.put( (num, line.strip()) )
        num += 1

    d.join()

在300句的條件下,單線程的處理速度約為750s=12m,開4個線程後3m可以處理完成,並且輸出是有序的。

其他語言應該可以仿照這個方式編寫程序,對於沒有同步隊列的語言,實現時可以參考這個http://hg.python.org/cpython/file/2.7/Lib/Queue.py

Python 多線程同步隊列模型