1. 程式人生 > >Python multiprocessing模組中Queue的通訊問題

Python multiprocessing模組中Queue的通訊問題

先簡單說一下Queue的常用方法:

put:向佇列插入一個數據

get:從佇列取出並刪除一個數據(先進先出)

empty:判斷佇列是否為空

最近發現了一個有意思的地方,在多程序加入Queue後,如果你不告訴其中通過Queue取東西的程序Queue此時已經空了,那麼該程序不會自動中斷,而且好像也不會繼續做其他的事情,像這樣:

# encoding=utf8

from multiprocessing import Process, Queue
from time import sleep

def get(q):
	while True:
		info = q.get()
		print 'get %s' % info
		print 'is running'
		sleep(1)

def put(q):
	for i in range(5):
		q.put(str(i))
	print 'put is done'

def main():
	print 'main task start'
	q = Queue()
	p1 = Process(target=put, args=(q, ))
	p2 = Process(target=get, args=(q, ))

	p1.start()
	p2.start()
	p1.join()
	p2.join()

	print 'main task done'

if __name__ == '__main__':
	main()

執行結果:

main task start
get 0
is running
put is done
get 1
is running
get 2
is running
get 3
is running
get 4
is running

通過執行結果可以知道,主程序一隻被p2程序阻塞著,因為主程序最後的“main task done”一直沒有被打印出來。而且正常來講,就算p2程序沒有停止,也應該持續列印“is running”,但是結果顯示並沒有,暫時還未搞明白是為什麼。不過阻止上面p2造成主程序阻塞的方法還是有的,最簡單的就是呼叫Queue的empty方法:

# encoding=utf8

from multiprocessing import Process, Queue
from time import sleep

def get(q):
	while True:
		info = q.get()
		print 'get %s' % info
		print 'is running'
		if q.empty():	# 如果佇列空了,就退出迴圈
			break
		sleep(1)

def put(q):
	for i in range(5):
		q.put(str(i))
	print 'put is done'

def main():
	print 'main task start'
	q = Queue()
	p1 = Process(target=put, args=(q, ))
	p2 = Process(target=get, args=(q, ))

	p1.start()
	p2.start()
	p1.join()
	p2.join()

	print 'main task done'

if __name__ == '__main__':
	main()

執行結果:

main task start
get 0
is running
put is done
get 1
is running
get 2
is running
get 3
is running
get 4
is running
main task done

另一種方法就是在put中告訴佇列,我已經空了:

# encoding=utf8

from multiprocessing import Process, Queue
from time import sleep

def get(q):
	while True:
		info = q.get()
		print 'get %s' % info
		print 'is running'
		if not info:
			break
		sleep(1)

def put(q):
	for i in range(5):
		q.put(str(i))
	q.put(None)
	print 'put is done'

def main():
	print 'main task start'
	q = Queue()
	p1 = Process(target=put, args=(q, ))
	p2 = Process(target=get, args=(q, ))

	p1.start()
	p2.start()
	p1.join()
	p2.join()

	print 'main task done'

if __name__ == '__main__':
	main()

執行結果:

main task start
put is done
get 0
is running
get 1
is running
get 2
is running
get 3
is running
get 4
is running
get None
is running
main task done

也就是說,想要佇列知道自己是不是空了,一種是呼叫empty方法,另一種就是自己告訴自己一個“結束標記”。但是這兩種方法只適用於get和put各自只有一個程序的情況,當有多個程序在通過同一個Queue中get和put的時候,上面的方法就不適用了。

JoinableQueue

解決上面問題的一個方法就是使用JoinableQueue,不過自己還沒弄太明白,參照別人的例子自己寫了一個:

# encoding=utf8

import multiprocessing
from multiprocessing import Process, JoinableQueue
from time import sleep
import random

s = '\033[31;42m'
e = '\033[0m'

def get(q):
	name = multiprocessing.current_process().name
	while True:
		info = q.get()
		print '--- %s%s get %s%s' % (s, name, info, e)
		sleep(random.randint(1,2))
		q.task_done()

def put(q):
	name = multiprocessing.current_process().name
	for i in range(3):
		print '%s put %s' % (name, i)
		q.put(str(i))
		sleep(random.randint(1, 2))
	q.join()

def main():
	print 'main task start'
	q = JoinableQueue()
	p1 = Process(name='p1', target=put, args=(q, ))
	p2 = Process(name='p2', target=put, args=(q, ))
	p3 = Process(name='p3', target=put, args=(q, ))

	q1 = Process(name='q1', target=get, args=(q, ))
	q2 = Process(name='q2', target=get, args=(q, ))
	q1.daemon = True
	q2.daemon = True

	for item in [p1, p2, p3, q1, q2]:
		item.start()
	p1.join()
	p2.join()
	p3.join()

	print 'main task done'

if __name__ == '__main__':
	main()

執行結果:


說明:

JoinableQueue是Queue的子類,增加了task_done()和join()方法。

task_done()用來告訴queue一個task完成。一般地在呼叫get()獲得一個task,在task結束後呼叫task_done()來通知Queue當前task完成。

join() 阻塞直到queue中的所有的task都被處理(即task_done方法被呼叫)。

在上面程式碼中,對於呼叫get的兩個子程序應該設定為守護程序(daemon = True),這裡子程序不會直接被結束,可能是因為JoinableQueue自己會協調put和get,這樣當JoinableQueue中的資料全部被取出後,這兩個子程序才會自動結束。