1. 程式人生 > >python與zmq系列(5)

python與zmq系列(5)

我們已經瞭解REQ/REP,PUB/SUB,PULL/PUSH這三種模式,也曾提到過,一個上下文可以建立多個socket套接字,那麼如何管理這些套接字呢?


假設我們的一個客戶端既有pull又有sub,他們兩個都需要接收訊息,該如何協調呢,畢竟,當一個socket要收訊息的時候,函式recv是阻塞的,所以,我們第一個思路是不讓它阻塞,看示例程式碼:


  
  1. #coding=utf-8
  2. '''
  3. Created on 2015-10-13
  4. 在這裡,同時處理多個套接字,那麼接收訊息的時候,就需要設定noblock
  5. 不然會在第一個接收訊息的地方堵塞
  6. @author: kwsy2015
  7. '''
  8. import zmq
  9. import time
  10. # Prepare our context and sockets
  11. context = zmq.Context()
  12. # Connect to task ventilator
  13. receiver = context.socket(zmq.PULL)
  14. receiver.connect( "tcp://localhost:5557")
  15. # Connect to weather server
  16. subscriber = context.socket(zmq.SUB)
  17. subscriber.connect( "tcp://localhost:5556")
  18. subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
  19. # Process messages from both sockets
  20. # We prioritize traffic from the task ventilator
  21. while True:
  22. # Process any waiting tasks
  23. while True:
  24. try:
  25. #用了NOBLOCK,就意味著得不到訊息時不會堵塞在這裡
  26. msg = receiver.recv(zmq.NOBLOCK)
  27. except zmq.ZMQError:
  28. break
  29. # process task
  30. # Process any waiting weather updates
  31. while True:
  32. try:
  33. msg = subscriber.recv(zmq.NOBLOCK)
  34. except zmq.ZMQError:
  35. break
  36. # process weather update
  37. # No activity, so sleep for 1 msec
  38. time.sleep( 0.001)

      通過設定zmq.NOBLOCK,我們可以讓recv不再阻塞,但是呢,要捕捉zmq.ZMQError這個異常,這樣一來,兩個套接字就可以不發生衝突了

       但是明顯,你可以感受得到,這種做法的醜陋,看起來不是那麼的優雅,所以我們換一種做法

       


  
  1. #coding=utf-8
  2. '''
  3. Created on 2015-10-13
  4. 這種方式比msreader要更好一些
  5. @author: kwsy2015
  6. '''
  7. import zmq
  8. # Prepare our context and sockets
  9. context = zmq.Context()
  10. # Connect to task ventilator
  11. receiver = context.socket(zmq.PULL)
  12. receiver.connect( "tcp://localhost:5557")
  13. # Connect to weather server
  14. subscriber = context.socket(zmq.SUB)
  15. subscriber.connect( "tcp://localhost:5556")
  16. subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
  17. # Initialize poll set
  18. poller = zmq.Poller()
  19. poller.register(receiver, zmq.POLLIN)
  20. poller.register(subscriber, zmq.POLLIN)
  21. # Process messages from both sockets
  22. while True:
  23. try:
  24. socks = dict(poller.poll())
  25. except KeyboardInterrupt:
  26. break
  27. if receiver in socks:
  28. message = receiver.recv()
  29. # process task
  30. if subscriber in socks:
  31. message = subscriber.recv()
  32. # process weather update

這種做法就很想socket的select模式,大家誰也別爭,誰也別搶,只要有訊息達到,我就通知你們,然後你們各自檢查是不是自己的訊息。我們在客戶端建立多個socket套接字可能是合理的,但是服務端就最好別這麼做了,REQ,PUSH,PUB,道理其實也很簡單,服務就是服務,多個員工可以擠在一個辦公司裡辦公,哪有多個老闆擠在一起辦公的。