1. 程式人生 > >zmq筆記三:socket和mailbox

zmq筆記三:socket和mailbox

它的 如何 like and targe 多個 pri commands png

// Query the libzmq version; the three components are written into
// major, minor and patch.
int major, minor, patch;
zmq_version(&major, &minor, &patch); // 4.2.0 is the version these notes are based on

本文主要是分析代碼,方便自己日後查閱.

=========================================

1. socket類型

每個socket類型有一個類與之對應. 所有的這些類都繼承於socket_base_t.各子類的繼承關系圖請查看筆記一.

   //  Abridged declaration of socket_base_t, the base class every socket
   //  type derives from. Parts left out of the excerpt are marked "......".
   //  The public send/recv/has_in/has_out entry points live here, while the
   //  virtual x-prefixed hooks (xsend, xrecv, xhas_in, xhas_out) are what
   //  the concrete socket types override.
   class socket_base_t :
        public own_t,
        public array_item_t <>,
        public i_poll_events,
        public i_pipe_events
    {
        friend class reaper_t;

    public:

        ......
        //  Message I/O and signaler registration exposed to users of the socket.
        int send (zmq::msg_t *msg_, int flags_);
        int recv (zmq::msg_t *msg_, int flags_);
        int add_signaler (signaler_t *s);
        int remove_signaler (signaler_t *s);
        int close ();

        //  These functions are used by the polling mechanism to determine
        //  which events are to be reported from this socket.
        bool has_in ();
        bool has_out ();
        ......
        //  i_poll_events implementation. This interface is used when socket
        //  is handled by the poller in the reaper thread.
        void in_event ();
        void out_event ();
        void timer_event (int id_);
        ......
    protected:
        //  Construction and destruction are restricted to derived classes
        //  and friends; thread_safe_ defaults to false.
        socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_ = false);
        virtual ~socket_base_t ();
        .....

        //  The default implementation assumes that send is not supported.
        virtual bool xhas_out ();
        virtual int xsend (zmq::msg_t *msg_);

        //  The default implementation assumes that recv is not supported.
        virtual bool xhas_in ();
        virtual int xrecv (zmq::msg_t *msg_);
        ......
    private:
        //  Creates new endpoint ID and adds the endpoint to the map.
        void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);

        //  Map of open endpoints.
        typedef std::pair <own_t *, pipe_t*> endpoint_pipe_t;
        typedef std::multimap <std::string, endpoint_pipe_t> endpoints_t;
        endpoints_t endpoints;

        //  Map of open inproc endpoints.
        typedef std::multimap <std::string, pipe_t *> inprocs_t;
        inprocs_t inprocs;

        //  Moves the flags from the message to local variables,
        //  to be later retrieved by getsockopt.
        void extract_flags (msg_t *msg_);
        ......
        //  Processes commands from the mailbox.
        //  NOTE(review): the exact meaning of timeout_ and throttle_ is not
        //  visible in this excerpt -- confirm against the libzmq sources.
        int process_commands (int timeout_, bool throttle_);

        //  Socket's mailbox object.
        i_mailbox *mailbox;

        //  List of attached pipes.
        typedef array_t <pipe_t, 3> pipes_t;
        pipes_t pipes;

        //  Reaper's poller and handle of this socket within it.
        poller_t *poller;
        poller_t::handle_t handle;
        ......
    };

socket_base_t這個父類做了大部分邏輯,子類再按需實現函數重載. 拿req_t為例, req_t繼承dealer_t,dealer_t繼承socket_base_t. 子類以實現xsend/xrecv等帶x前綴的重載函數為主,而父類socket_base_t對外暴露的是不帶前綴x的函數.

   //  Abridged declaration of req_t, which derives from dealer_t.
   //  Parts left out of the excerpt are marked "......". The class
   //  supplies the x-prefixed hooks declared below.
   class req_t : public dealer_t
    {
    public:

        req_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
        ~req_t ();

        //  Overrides of functions from socket_base_t.
        int xsend (zmq::msg_t *msg_);
        int xrecv (zmq::msg_t *msg_);
        bool xhas_in ();
        bool xhas_out ();
        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
        void xpipe_terminated (zmq::pipe_t *pipe_);

    protected:
        ......
    private:
        ......
        //  The pipe the request was sent to and where the reply is expected.
        zmq::pipe_t *reply_pipe;
        ......
        //  Copy constructor and copy assignment are declared private, so
        //  code outside the class cannot copy a req_t.
        req_t (const req_t&);
        const req_t &operator = (const req_t&);
    };

2.mailbox

基類socket_base_t有一個成員變量 i_mailbox *mailbox. 這就是socket的郵箱了,所有投遞給socket的命令消息command_t都會放到這個郵箱的隊列裏.

//  Constructor (abridged: part of the initialiser list is elided as "......").
//  Copies a few context-level settings into the socket options and creates
//  the mailbox that matches the requested thread-safety mode.
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
    own_t (parent_, tid_),
    ......
    thread_safe (thread_safe_),
    reaper_signaler (NULL)
{
    options.socket_id = sid_;
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
    //  Linger is -1 when the context's ZMQ_BLOCKY setting is non-zero, 0 otherwise.
    options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;

    if (thread_safe)
        //  Thread-safe sockets get a mailbox_safe_t that is handed a pointer
        //  to the socket's own sync object.
        mailbox = new mailbox_safe_t(&sync);
    else {
        //  Otherwise a plain mailbox_t is used. It is kept only when its
        //  get_fd () differs from retired_fd.
        //  NOTE(review): presumably retired_fd means the signaler's file
        //  descriptors could not be created -- confirm.
        mailbox_t *m = new mailbox_t();
        if (m->get_fd () != retired_fd)
            mailbox = m;
        else {
            LIBZMQ_DELETE (m);
            mailbox = NULL;
        }
    }
}

由構造函數可知,mailbox有線程安全與非線程安全之分, mailbox_safe_t和mailbox_t都是以mutex_t sync作為訪問互斥. 這是因為mailbox的消息隊列 ypipe_t是無鎖鏈表,讀寫需要同步,ypipe_t更詳細的實現和分析可參考這篇博客.

        //  The pipe to store actual commands: a ypipe_t of command_t,
        //  parameterised with command_pipe_granularity.
        typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t;
        cpipe_t cpipe;

郵箱的sync在mailbox_safe_t是以socket_base_t的sync指針來初始化的,而mailbox_t則是獨立於socket本身的.

對於mailbox_t來說,任意時刻只能有一個線程去讀它的命令消息隊列,讀消息不用加鎖,並只需要一個signaler去通知讀線程; 而寫入消息隊列時,卻可能有多個線程寫,所以需要在寫入隊列時加鎖互斥.

        //  Signaler to pass signals from writer thread to reader thread.
        //  Per the accompanying notes, mailbox_t has one reader at a time,
        //  so a single signaler is enough to wake it.
        signaler_t signaler;

對於mailbox_safe_t則是根據對socket本身的互斥訪問來讀寫它的命令消息隊列,並且有多個signaler來通知可讀狀態.

// Signalers registered with the thread-safe mailbox; mailbox_safe_t::send ()
// calls send () on each of them when flushing the command pipe returns false.
std::vector <zmq::signaler_t* > signalers;

實際上讀的時候它使用的是pthread_cond_wait和pthread_cond_broadcast的組合來獲得鎖.

//  Queues cmd_ in the command pipe while holding *sync and, when the flush
//  returns false, wakes up the readers.
//  NOTE(review): presumably a false result from flush () means the reader
//  side is asleep -- confirm against ypipe_t.
void zmq::mailbox_safe_t::send (const command_t &cmd_)
{
    sync->lock ();
    cpipe.write (cmd_, false);
    const bool ok = cpipe.flush ();

    if (!ok) {
        cond_var.broadcast (); // Wakes the reader threads blocked in cond_var.wait (); per the accompanying notes this maps to pthread_cond_broadcast / pthread_cond_wait
        for (std::vector<signaler_t*>::iterator it = signalers.begin(); it != signalers.end(); ++it){
            (*it)->send(); // Tell each registered signaler that a command is ready to be read
        }
    }

    sync->unlock ();
}

//  Fetches one command into *cmd_.
//  Returns 0 on success. Returns -1 when the wait fails (errno is EAGAIN or
//  EINTR) or when no command is available after waking up (errno is set to
//  EAGAIN).
int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
{
    //  Try to get the command straight away.
    if (cpipe.read (cmd_)) // Author's note: the queue is lock-free, so a given command is taken by exactly one thread (atomic compare-and-swap) -- NOTE(review): confirm against ypipe_t
        return 0;
    
    //  Wait for signal from the command sender.
    int rc = cond_var.wait (sync, timeout_); // Sleeps until a broadcast arrives or timeout_ expires. Author's note: this wraps pthread_cond_wait, so sync is held again once the call returns -- NOTE(review): confirm that the caller already holds sync on entry
    if (rc == -1) {
        errno_assert (errno == EAGAIN || errno == EINTR);
        return -1;
    }

    //  Another thread may already fetch the command
    const bool ok = cpipe.read (cmd_);

    if (!ok) {
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

筆者的分析是基於mailbox_t而不是mailbox_safe_t,所以對mailbox_safe_t的使用場合並沒有經驗研究.

3.signaler

郵箱是否有可待讀取的命令消息,依靠signaler來通知.先來看一下這個類結構:

    //  Abridged declaration of signaler_t (elided parts are marked "......").
    //  It owns a write/read pair of file descriptors used to pass signals
    //  from one thread to another.
    class signaler_t
    {
    public:

        signaler_t ();
        ~signaler_t ();

        //  Accessor for the signaler's file descriptor.
        //  NOTE(review): the accompanying notes say this is the read end (r) -- confirm.
        fd_t get_fd () const;
        //  Signalling primitives: send () raises a signal; wait () takes a
        //  timeout; recv () and recv_failable () consume a signal.
        //  NOTE(review): the differences between recv () and recv_failable ()
        //  are not visible in this excerpt.
        void send ();
        int wait (int timeout_);
        void recv ();
        int recv_failable ();
        ......
    private:

        //  Creates a pair of file descriptors that will be used
        //  to pass the signals.
        static int make_fdpair (fd_t *r_, fd_t *w_);

        //  Underlying write & read file descriptor
        //  Will be -1 if we exceeded number of available handles
        fd_t w;
        fd_t r;

        ......
    };

signaler類主要是提供一對socket句柄(w/r).在支持socketpair的平臺下(*nix),可直接調用返回;而在windows平臺下,是通過打通w/r兩個socket句柄的通信.當有寫線程給mailbox發送命令消息時,判斷如果持有mailbox的讀線程掛起了,就調用mailbox的signaler->send():

//  Raises the signal by writing one dummy byte to the write descriptor w.
//  The eventfd variant is elided ("......") in this excerpt.
void zmq::signaler_t::send ()
{
#if defined HAVE_FORK
    //  If the current process id differs from the stored pid we are in a
    //  forked child: return without sending anything.
    if (unlikely (pid != getpid ())) {
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
#if defined ZMQ_HAVE_EVENTFD
    ......
#elif defined ZMQ_HAVE_WINDOWS
    //  Windows: a single ::send call; both a socket error and a short
    //  write are treated as assertion failures.
    unsigned char dummy = 0;
    int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    //  Other platforms: retry the ::send call for as long as it is
    //  interrupted (EINTR).
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0); 
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
#if defined(HAVE_FORK)
        //  Re-check for a forked child after the call; in that case give
        //  up with errno set to EINTR instead of asserting on nbytes.
        if (unlikely (pid != getpid ())) {
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
        zmq_assert (nbytes == sizeof dummy);
        break;
    }
#endif
}

給w發送消息,這樣r變成可讀狀態,掛起的select阻塞調用立即返回. mailbox.get_fd()返回的其實就是mailbox.signaler.r. *請註意*, signaler的w/r套接字句柄是可阻塞的.

對於非線程安全的mailbox_t,對於socket類對象,它們本身並沒有I/O線程的loop()輪詢函數,那麽它的mailbox的可讀消息狀態是由signaler的r句柄通知,由signaler.wait()函數對r進行select調用,而signaler.wait()一般是通過socket_base_t::process_commands() -> mailbox_t::recv () -> signaler_t::wait () 調用鏈被調用.當mailbox的命令隊列為空,r也沒可讀狀態時,signaler_t::wait (int timeout) ,傳入的timeout=-1,由於signaler的w/r是可阻塞的,這時調用process_commands()的線程將會阻塞在wait()的select調用.當然,context的I/O線程依然會繼續loop()輪詢.

那麽阻塞了socket的線程如何被喚醒? 答案是通過給socket的mailbox發送消息.

//  Abridged excerpt of ctx_t::create_socket (elided parts are marked
//  "......"). The visible part creates the socket and stores its mailbox in
//  the context's slots array, which is how other threads can address
//  commands to the socket.
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_){
......
    //  Create the socket and register its mailbox.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
    if (!s) {
        //  Creation failed: hand the slot back and release slot_sync
        //  before reporting the failure to the caller.
        empty_slots.push_back (slot);
        slot_sync.unlock ();
        return NULL;
    }
    sockets.push_back (s);
    slots [slot] = s->get_mailbox ();
......
}

在create_socket這個函數裏,為context新增一個socket時,socket的mailbox就加入了slots的數組管理器裏. 當I/O線程(或其他知道該socket的mailbox對應的slot id的實例)給對應的mailbox發送消息,就會喚醒正在阻塞的socket了.

zmq筆記三:socket和mailbox