1. 程式人生 > >Python消息隊列

Python消息隊列

python 消息隊列 queue multiprocessing

消息中間件 --->就是消息隊列

異步方式:不需要立馬得到結果,需要排隊

同步方式:需要實時獲得數據,堅決不能排隊

例子:

#多進程模塊multiprocessing

from multiprocessing import Process

from multiprocessing import Queue

def write(q):

for i in ["a", "b", "c", "d"]:

q.put(i)

print ("put {0} to queue".format(i))

def read(q):

while 1:

result = q.get()

print ("get {0} from queue".format(result))

#寫一個主函數

def main():

q = Queue()

pw = Process(target=write, args=(q,))

pr = Process(target=read, args=(q,))

pw.start()

pr.start()

pw.join()

#終止pr線程

pr.terminate()

if __name__ == '__main__':

#調用主函數

main()

輸出:

put a to queue

put b to queue

put c to queue

put d to queue

多進程模塊multiprocessing中pipe方法實現消息隊列

例子:

from multiprocessing import Pipe, Process

import time

def proce1(pipe):

for i in xrange(1, 10):

pipe.send(i)

print ("send {0} to pipe".format(i))

time.sleep(1)

def proce2(pipe):

n = 9

while n > 0 :

result = pipe.recv()

print ("recv {0} from pipe".format(result))

def main():

pipe = Pipe(duplex=False)

print (type(pipe))

p1 = Process(target=proce1, args=(pipe[1],))

p2 = Process(target=proce2, args=(pipe[0],))

p1.start()

p2.start()

p1.join()

p2.join()

pipe[0].close()

pipe[1].close()

if __name__ == '__main__':

main()

輸出:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

recv 2 from pipe

send 2 to pipe

recv 3 from pipe

send 3 to pipe

recv 4 from pipe

send 4 to pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

send 9 to pipe

recv 9 from pipe

模仿生產者和消費者的多線程消息隊列練習

例子:

from threading import Thread

from multiprocessing import Queue

import time

class Proceduer(Thread):

def __init__(self, queue):

super(Proceduer, self).__init__()

self.queue = queue

def run(self):

try:

for i in xrange(1, 10):

print ("put data is {0} to queue".format(i))

self.queue.put(i)

except Exception as e:

print ("put data error")

raise e

class Consumer_odd(Thread):

def __init__(self, queue):

super(Consumer_odd, self).__init__()

self.queue = queue

def run(self):

try:

while not self.queue.empty:

number = self.queue.get()

if number%2 != 0:

print ("get {0} from queue odd. thread name is {1}".format(number, self.getName()))

else:

self.queue.put(number)

time.sleep(1)

except Exception as e:

raise e

class Consumer_even(Thread):

def __init__(self, queue):

super(Consumer_even, self).__init__()

self.queue = queue

def run(self):

try:

while not self.queue.empty:

number = self.queue.get()

if number%2 == 0:

print ("get {0} from queue even.thread name is{1}".format(number, self.getName()))

else:

self.queue.put(number)

time.sleep(1)

except Exception as e:

raise e

def main():

queue = Queue()

p = Proceduer(queue=queue)

p.start()

p.join()

time.sleep(1)

c1 = Consumer_odd(queue=queue)

c2 = Consumer_even(queue=queue)

c1.start()

c2.start()

c1.join()

c2.join()

print ("ALL thread terminate")

if __name__ == '__main__':

main()


Python消息隊列