ZMQ Source Code Analysis (7) -- In-Process Communication

The previous two sections analyzed zmq's TCP communication flow. Besides TCP, zmq also supports a number of other transports, such as inproc, ipc, pgm, epgm and tipc. This section continues with inproc, i.e. in-process communication.

Compared with TCP, in-process communication is much simpler: there is no authentication of remote connections and no encoding or decoding of data. zmq simply hooks a pipe up between the two socket_base_t objects and passes data between the threads through that pipe.
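
Before going into the implementation, here is what inproc looks like at the API level: an endpoint is just a name that lives inside one context, so both sockets have to be created from the same ctx. The following is a minimal sketch, assuming the standard libzmq C API and a libzmq build that, like the source analyzed here, supports pending inproc connections; the endpoint name "inproc://demo" is made up for illustration.

#include <zmq.h>
#include <assert.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();

    //  Both sockets must share the same context: inproc endpoints are
    //  registered inside ctx_t and are invisible to any other context.
    void *pull = zmq_socket (ctx, ZMQ_PULL);
    void *push = zmq_socket (ctx, ZMQ_PUSH);

    //  Connecting before the bind works for inproc (more on this below):
    //  the connection is simply pended until the bind shows up.
    int rc = zmq_connect (push, "inproc://demo");
    assert (rc == 0);
    rc = zmq_bind (pull, "inproc://demo");
    assert (rc == 0);

    rc = zmq_send (push, "hello", 5, 0);
    assert (rc == 5);

    char buf [16];
    rc = zmq_recv (pull, buf, sizeof buf, 0);
    assert (rc == 5);

    zmq_close (push);
    zmq_close (pull);
    zmq_ctx_term (ctx);
    return 0;
}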

With inproc the two sockets likewise call bind and connect respectively to establish the connection, and again there is no requirement on the order of the two calls. Let's look at how connect and bind each handle inproc, starting with the connect method:

int zmq::socket_base_t::connect (const char *addr_)
{
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Process pending commands, if any.
    int rc = process_commands (0, false);
    if (unlikely (rc != 0))
        return -1;

    //  Parse addr_ string.
    std::string protocol;
    std::string address;
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
        return -1;

    if (protocol == "inproc") {
        //  TODO: inproc connect is specific with respect to creating pipes
        //  as there's no 'reconnect' functionality implemented. Once that
        //  is in place we should follow generic pipe creation algorithm.

        //  Find the peer endpoint.
        endpoint_t peer = find_endpoint (addr_);

        //  The total HWM for an inproc connection should be the sum of
        //  the binder's HWM and the connector's HWM.
        int sndhwm = 0;
        if (peer.socket == NULL)
            sndhwm = options.sndhwm;
        else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
            sndhwm = options.sndhwm + peer.options.rcvhwm;
        int rcvhwm = 0;
        if (peer.socket == NULL)
            rcvhwm = options.rcvhwm;
        else if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
            rcvhwm = options.rcvhwm + peer.options.sndhwm;

        //  Create a bi-directional pipe to connect the peers.
        object_t *parents [2] = {this, peer.socket == NULL ? this : peer.socket};
        pipe_t *new_pipes [2] = {NULL, NULL};

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
        bool conflates [2] = {conflate, conflate};
        int rc = pipepair (parents, new_pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Attach local end of the pipe to this socket object.
        attach_pipe (new_pipes [0]);

        if (!peer.socket) {
            //  The peer doesn't exist yet so we don't know whether
            //  to send the identity message or not. To resolve this,
            //  we always send our identity and drop it later if
            //  the peer doesn't expect it.
            msg_t id;
            rc = id.init_size (options.identity_size);
            errno_assert (rc == 0);
            memcpy (id.data (), options.identity, options.identity_size);
            id.set_flags (msg_t::identity);
            bool written = new_pipes [0]->write (&id);
            zmq_assert (written);
            new_pipes [0]->flush ();

            const endpoint_t endpoint = {this, options};
            pend_connection (std::string (addr_), endpoint, new_pipes);
        }
        else {
            //  If required, send the identity of the local socket to the peer.
            if (peer.options.recv_identity) {
                msg_t id;
                rc = id.init_size (options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), options.identity, options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [0]->write (&id);
                zmq_assert (written);
                new_pipes [0]->flush ();
            }

            //  If required, send the identity of the peer to the local socket.
            if (options.recv_identity) {
                msg_t id;
                rc = id.init_size (peer.options.identity_size);
                errno_assert (rc == 0);
                memcpy (id.data (), peer.options.identity, peer.options.identity_size);
                id.set_flags (msg_t::identity);
                bool written = new_pipes [1]->write (&id);
                zmq_assert (written);
                new_pipes [1]->flush ();
            }

            //  Attach remote end of the pipe to the peer socket. Note that peer's
            //  seqnum was incremented in find_endpoint function. We don't need it
            //  increased here.
            send_bind (peer.socket, new_pipes [1], false);
        }

        //  Save last endpoint URI
        last_endpoint.assign (addr_);

        //  Remember inproc connections for disconnect
        inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));

        return 0;
    }
}
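
One detail in the listing above is worth spelling out: for an inproc connection the effective high-water mark of the pipe in one direction is the sum of the sender's SNDHWM and the receiver's RCVHWM (a zero on either side means no limit). The following is a rough sketch of what that means from the application's point of view, assuming the standard libzmq C API; the endpoint name and the numbers are made up.

#include <zmq.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();

    void *pull = zmq_socket (ctx, ZMQ_PULL);
    int rcvhwm = 50;
    zmq_setsockopt (pull, ZMQ_RCVHWM, &rcvhwm, sizeof rcvhwm);
    zmq_bind (pull, "inproc://hwm-demo");

    void *push = zmq_socket (ctx, ZMQ_PUSH);
    int sndhwm = 100;
    zmq_setsockopt (push, ZMQ_SNDHWM, &sndhwm, sizeof sndhwm);
    //  The binder already exists, so connect() above computes
    //  sndhwm = options.sndhwm + peer.options.rcvhwm = 100 + 50 = 150:
    //  roughly 150 messages can queue up in the pipe before a blocking
    //  zmq_send() on the PUSH side has to wait.
    zmq_connect (push, "inproc://hwm-demo");

    zmq_close (push);
    zmq_close (pull);
    zmq_ctx_term (ctx);
    return 0;
}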

The connect method first checks whether the address being connected to has already been bound (find_endpoint). It then creates a pipepair and attaches one end to itself; if the peer socket_base_t already exists, it sends a bind command carrying the other end to that peer, and along the way decides, based on whether either side expects identity information, whether to write an identity message into the pipe. If the peer socket_base_t does not exist yet, connect calls pend_connection instead:

void zmq::ctx_t::pend_connection (const std::string &addr_,
        const endpoint_t &endpoint_, pipe_t **pipes_)
{
    const pending_connection_t pending_connection =
        {endpoint_, pipes_ [0], pipes_ [1]};

    endpoints_sync.lock ();

    endpoints_t::iterator it = endpoints.find (addr_);
    if (it == endpoints.end ()) {
        // Still no bind.
        endpoint_.socket->inc_seqnum ();
        pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
    }
    else
        // Bind has happened in the mean time, connect directly
        connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);

    endpoints_sync.unlock ();
}

pend_connection takes the endpoints_sync lock and then checks whether the target address has just been registered in the meantime (by another thread). If it has, connect_inproc_sockets is called right away; if not, the connect request is recorded in pending_connections. connect_inproc_sockets looks like this:

void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
    options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
{
    bind_socket_->inc_seqnum();
    pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());

    if (!bind_options.recv_identity) {
        msg_t msg;
        const bool ok = pending_connection_.bind_pipe->read (&msg);
        zmq_assert (ok);
        const int rc = msg.close ();
        errno_assert (rc == 0);
    }

    int sndhwm = 0;
    if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0)
        sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm;

    int rcvhwm = 0;
    if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0)
        rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm;

    bool conflate = pending_connection_.endpoint.options.conflate &&
            (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
             pending_connection_.endpoint.options.type == ZMQ_PULL ||
             pending_connection_.endpoint.options.type == ZMQ_PUSH ||
             pending_connection_.endpoint.options.type == ZMQ_PUB ||
             pending_connection_.endpoint.options.type == ZMQ_SUB);

    int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
    pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]);
    pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]);

    if (side_ == bind_side) {
        command_t cmd;
        cmd.type = command_t::bind;
        cmd.args.bind.pipe = pending_connection_.bind_pipe;
        bind_socket_->process_command (cmd);
        bind_socket_->send_inproc_connected (pending_connection_.endpoint.socket);
    }
    else
        pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);

    if (pending_connection_.endpoint.options.recv_identity) {
        msg_t id;
        int rc = id.init_size (bind_options.identity_size);
        errno_assert (rc == 0);
        memcpy (id.data (), bind_options.identity, bind_options.identity_size);
        id.set_flags (msg_t::identity);
        bool written = pending_connection_.bind_pipe->write (&id);
        zmq_assert (written);
        pending_connection_.bind_pipe->flush ();
    }
}

connect_inproc_sockets mainly attaches the pipe to the bind-side socket_base_t. Pay attention to the identity handling: as shown above, when connect finds that the peer socket_base_t does not exist yet, it always writes an identity message into the pipe. So once the bind socket comes into existence, if it does not expect an identity, that stale message has to be read out of the pipe and thrown away first.
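
This identity exchange is what makes identity-routing socket types work over inproc. Below is a small sketch, assuming the standard libzmq C API; the endpoint name and the identity string are made up, and the DEALER deliberately connects before the ROUTER binds so that the pended-identity path described above is exercised.

#include <zmq.h>
#include <assert.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();

    void *dealer = zmq_socket (ctx, ZMQ_DEALER);
    zmq_setsockopt (dealer, ZMQ_IDENTITY, "worker-1", 8);
    //  Connect first: the identity message is written into the pipe right
    //  away and handed to the binder once it appears.
    zmq_connect (dealer, "inproc://ident-demo");

    void *router = zmq_socket (ctx, ZMQ_ROUTER);
    zmq_bind (router, "inproc://ident-demo");

    zmq_send (dealer, "hi", 2, 0);

    //  ROUTER prefixes every delivered message with the peer's identity,
    //  which it learned from the identity message exchanged above.
    char ident [16], body [16];
    int n = zmq_recv (router, ident, sizeof ident, 0);   //  8 bytes: "worker-1"
    assert (n == 8);
    n = zmq_recv (router, body, sizeof body, 0);         //  2 bytes: "hi"
    assert (n == 2);

    zmq_close (dealer);
    zmq_close (router);
    zmq_ctx_term (ctx);
    return 0;
}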

Next, let's look at the bind operation:

int zmq::socket_base_t::bind (const char *addr_)
{
    if (unlikely (ctx_terminated)) {
        errno = ETERM;
        return -1;
    }

    //  Process pending commands, if any.
    int rc = process_commands (0, false);
    if (unlikely (rc != 0))
        return -1;

    //  Parse addr_ string.
    std::string protocol;
    std::string address;
    if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
        return -1;

    if (protocol == "inproc") {
        const endpoint_t endpoint = { this, options };
        const int rc = register_endpoint (addr_, endpoint);
        if (rc == 0) {
            connect_pending (addr_, this);
            last_endpoint.assign (addr_);
        }
        return rc;
    }
}

The bind operation first calls register_endpoint:

int zmq::ctx_t::register_endpoint (const char *addr_,
        const endpoint_t &endpoint_)
{
    endpoints_sync.lock ();

    const bool inserted = endpoints.insert (
        endpoints_t::value_type (std::string (addr_), endpoint_)).second;

    endpoints_sync.unlock ();

    if (!inserted) {
        errno = EADDRINUSE;
        return -1;
    }
    return 0;
}
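
The insert-or-fail logic here is directly visible from the application side: binding the same inproc name twice fails with EADDRINUSE. A minimal sketch, assuming the standard libzmq C API; the endpoint name is made up.

#include <zmq.h>
#include <assert.h>
#include <errno.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();
    void *a = zmq_socket (ctx, ZMQ_REP);
    void *b = zmq_socket (ctx, ZMQ_REP);

    int rc = zmq_bind (a, "inproc://unique-name");
    assert (rc == 0);

    //  register_endpoint refuses the duplicate insertion.
    rc = zmq_bind (b, "inproc://unique-name");
    assert (rc == -1 && zmq_errno () == EADDRINUSE);

    zmq_close (a);
    zmq_close (b);
    zmq_ctx_term (ctx);
    return 0;
}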

This method is straightforward: endpoints stores every bound address. After registering the endpoint, socket_base_t calls connect_pending:

void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
{
    endpoints_sync.lock ();

    std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);

    for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
        connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);

    pending_connections.erase(pending.first, pending.second);
    endpoints_sync.unlock ();
}

connect_pending checks whether any socket_base_t previously asked to connect to the address that has just been bound. If so, connect_inproc_sockets is called for each of those pending connections; that method was analyzed above. Note its last argument: it indicates whether the calling thread is the bind socket's own thread, which decides whether the bind command has to be sent to the peer's mailbox or whether the corresponding handler can be invoked directly.

Once the connection is established, the threads can communicate with each other through their socket_base_t objects.
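
Putting the pieces together, the typical use of inproc is signalling between threads that share one context. Below is a sketch of that pattern, assuming the standard libzmq C API plus POSIX threads; the endpoint name is made up.

#include <zmq.h>
#include <pthread.h>
#include <assert.h>

static void *worker (void *ctx)
{
    //  The worker reuses the context created in main(); inproc endpoints
    //  do not exist outside of it.
    void *s = zmq_socket (ctx, ZMQ_PAIR);
    zmq_connect (s, "inproc://pipe");
    zmq_send (s, "ping", 4, 0);
    zmq_close (s);
    return NULL;
}

int main (void)
{
    void *ctx = zmq_ctx_new ();

    void *s = zmq_socket (ctx, ZMQ_PAIR);
    zmq_bind (s, "inproc://pipe");

    pthread_t t;
    pthread_create (&t, NULL, worker, ctx);

    char buf [8];
    int n = zmq_recv (s, buf, sizeof buf, 0);
    assert (n == 4);

    pthread_join (t, NULL);
    zmq_close (s);
    zmq_ctx_term (ctx);
    return 0;
}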

As mentioned before, besides inproc zmq provides several other transports:
ipc: inter-process communication
pgm/epgm: multicast
tipc: communication over the TIPC protocol
These transports are used far less often, so they are not analyzed in detail here (to be honest, I have not looked closely at their implementations either).