1. 程式人生 > >python ZeroMQ實現1:N,非同步收發訊息(也可向指定客戶端傳送訊息)

python ZeroMQ實現1:N,非同步收發訊息(也可向指定客戶端傳送訊息)

ZeroMQ的python版本和C/C++版本的介面差不多,要實現一個server對N個client,非同步方式,而且可以對指定的client傳送訊息,可以這樣: server採用ROUTER方式,client採用DEALER方式,而且要自己制定client的zmq.IDENTITY(如果不指定,zmq就會自動生成一個,不好控制對制定的client傳送訊息。) server.py

#!/usr/bin/python
#-*-coding:utf-8-*-
import time
import zmq
import chardet

#import zhelpers

context = zmq.Context(
) socket = context.socket(zmq.ROUTER) #server不需要指定 #socket.setsockopt_string(zmq.IDENTITY, u"desktop") socket.bind("tcp://*:5555") while True: #zhelpers.dump(socket) #這裡可以打印出幀的具體內容 [address,contents]=socket.recv_multipart() print("[%s]%s\n"%(address,contents)) reply = "[get server reply:"
+ contents + "]" socket.send_multipart([address, reply]) #這裡的address就可以指定客戶端發訊息
#!/usr/bin/python
#-*-coding:utf-8-*-
import zmq
import sys
import os

import threading
import ctypes
import inspect

class ZmqClientThread(threading.Thread):

    def __init__(self, func, serverIp, port, identity)
: threading.Thread.__init__(self) self.context = zmq.Context() self.socket = self.context.socket(zmq.DEALER) self.serverIp = serverIp self.identity = identity self.port = port self.func = func self.socket.setsockopt_string(zmq.IDENTITY, identity) #預設使用utf-8編碼 #先設定IDENTITY,再connect,順序不能顛倒 self.socket.connect( "tcp://{0}:{1}".format(serverIp, port) ) #("tcp://localhost:5555")或者("tcp://127.0.0.1:5555")都可以 #向server傳送字串 def sendMsg(self, data): if not self.socket.closed: self.socket.send(data) else: print "sock is closed,can't send message..." def run(self): self.func(self.socket) #收從server發來的字串 def loop(socket): while True: if not socket.closed: message = socket.recv() print message else: print "sock is closed,can't receive any message..." break def main(): serverIp = "127.0.0.1" port = 5555 identity = u"client1" zmqThread = ZmqClientThread(loop, serverIp, port, identity) zmqThread.start() while(True): data = raw_input("input your data:") if data == 'q': print "data == q" os._exit(1) else: zmqThread.sendMsg(data) #這種方式發字串 if __name__=='__main__': main()

知識點: Context使用完,在C/C++中需要手動關閉,而python中會在垃圾回收的時候自動呼叫term關閉。

Close or terminate the context. This can be called to close the context by hand. If this is not called, the context will automatically be closed when it is garbage collected.

Socket也是一樣,使用完後,在C/C++中需要手動關閉,而python中會在垃圾回收的時候自動呼叫term關閉。

Close the socket. If linger is specified, LINGER sockopt will be set prior to closing. This can be called to close the socket by hand. If this is not called, the socket will automatically be closed when it is garbage collected.

具體檢視官方文件:https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket 還有需要注意的是Router <->Dealer模式,需要客戶端先發一個數據幀(空白的也可以)到伺服器端,之後伺服器端才能指定客戶端發訊息(這是一個建立路由的過程)

zhelper.py (這個檔案是從github的一個開源專案上下載下來的,用dump可以打印出幀的具體內容,方便除錯)

# encoding: utf-8
"""
Helper module for example applications. Mimics ZeroMQ Guide's zhelpers.h.
"""
from __future__ import print_function

import binascii
import os
from random import randint

import zmq

def socket_set_hwm(socket, hwm=-1):
    """libzmq 2/3/4 compatible sethwm"""
    try:
        socket.sndhwm = socket.rcvhwm = hwm
    except AttributeError:
        socket.hwm = hwm


def dump(msg_or_socket):
    """Receives all message parts from socket, printing each frame neatly"""
    if isinstance(msg_or_socket, zmq.Socket):
        # it's a socket, call on current message
        msg = msg_or_socket.recv_multipart()
    else:
        msg = msg_or_socket
    print("----------------------------------------")
    for part in msg:
        print("[%03d]" % len(part), end=' ')
        is_text = True
        try:
            print(part.decode('ascii'))
        except UnicodeDecodeError:
            print(r"0x%s" % (binascii.hexlify(part).decode('ascii')))


def set_id(zsocket):
    """Set simple random printable identity on socket"""
    identity = u"%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000))
    zsocket.setsockopt_string(zmq.IDENTITY, identity)


def zpipe(ctx):
    """build inproc pipe for talking to threads
    mimic pipe used in czmq zthread_fork.
    Returns a pair of PAIRs connected via inproc
    """
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.linger = b.linger = 0
    a.hwm = b.hwm = 1
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8))
    a.bind(iface)
    b.connect(iface)
    return a,b