Python 多線程同步隊列模型
阿新 • • 發佈:2017-07-22
並且 highlight 多線程 use lib star 保存 env module
Python 多線程同步隊列模型
我面臨的問題是有個非常慢的處理邏輯(比如分詞、句法),有大量的語料,想用多線程來處理。
這一個過程可以抽象成一個叫“同步隊列”的模型。 具體來講,有一個生產者(Dispatcher)一方面從語料中讀入句子,並且存入隊列中,一方面看有沒有空閑的消費者(Segmentor),如果有,就把句子從隊列中彈出並交給這個空閑的消費者處理。 然後消費者把處理完成的結果交給生產者輸出,生產者要保證輸出與輸入順序一致。
消費者是典型的threading,它需要看見生成者的隊列,從而從隊列中拿一些數據。
對於生產者,python中有一個叫Queue的module,實現了FIFO的同步隊列。 但它只能保證輸入與交付消費者的順序的有序,但不能保障生產者在輸出時有序,所以需要一個buffer來保存輸出順序。 程序的模型大概是這樣的。有一個master(),用來分發任務。有N個多線程的slave用來處理任務。
具體程序如下:
#!/usr/bin/env python
# real 3m0.263s
# user 0m0.016s
# sys 0m0.012s
from time import sleep
from random import random
from Queue import Queue
from threading import Thread, Lock
class Segmentor(Thread):
def __init__(self, dispatcher):
Thread.__init__(self)
self.d = dispatcher
def run(self):
while True:
idx, item = self.d.get()
# segment section
sleep(random() * 5)
# output section
d.output( idx, item )
self.d.task_done()
class Dispatcher(Queue):
def __init__(self):
Queue.__init__(self)
self.idx = 0
self.box = {}
self.lock = Lock()
def output(self, idx, item):
self.lock.acquire()
if idx > self.idx:
self.box[idx] = item
elif idx == self.idx:
self._output(item)
self.idx += 1
while self.idx in self.box:
item = self.box[self.idx]
self._output(item)
self.idx += 1
self.lock.release()
def _output(self, item):
print item
if __name__=="__main__":
d = Dispatcher()
for i in xrange(4):
t = Segmentor(d)
t.daemon = True
t.start()
num = 0