
ZMQ Source Code Analysis (5) -- TCP Communication

zmq supports tcp, inproc, ipc, pgm, epgm, tipc and other transports; simply specifying the scheme in the endpoint address is enough to select the corresponding transport transparently. Here we take the most commonly used transport, tcp, as the example to analyse how zmq data travels across the network. This is the most complex and also the heaviest part of zmq. Before analysing the flow, let's look at a short sketch of the transparent transport selection just mentioned, and then at the main classes used for tcp communication.
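
A minimal sketch using the public libzmq C API; the endpoint strings are made up for illustration, and only the scheme prefix changes between transports:

#include <zmq.h>
#include <assert.h>

int main (void)
{
    void *ctx = zmq_ctx_new ();
    void *pub = zmq_socket (ctx, ZMQ_PUB);

    //  Only the endpoint prefix decides which transport is used;
    //  the socket code stays exactly the same.
    assert (zmq_bind (pub, "tcp://*:5555") == 0);         //  TCP
    assert (zmq_bind (pub, "ipc:///tmp/demo.ipc") == 0);  //  UNIX domain socket
    assert (zmq_bind (pub, "inproc://demo") == 0);        //  in-process

    zmq_close (pub);
    zmq_ctx_term (ctx);
    return 0;
}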

tcp_listener & tcp_connecter

First, let's look at the code of tcp_listener_t:

    class tcp_listener_t : public own_t, public io_object_t
    {
    public:

        tcp_listener_t (zmq::io_thread_t *io_thread_,
            zmq::socket_base_t *socket_, const options_t &options_);
        ~tcp_listener_t ();

        //  Set address to listen on.
        int set_address (const char *addr_);

        //  Get the bound address for use with wildcard
        int get_address (std::string &addr_);

    private:

        //  Handlers for incoming commands.
        void process_plug ();
        void process_term (int linger_);

        //  Handlers for I/O events.
        void in_event ();

        //  Close the listening socket.
        void close ();

        //  Accept the new connection. Returns the file descriptor of the
        //  newly created connection. The function may return retired_fd
        //  if the connection was dropped while waiting in the listen backlog
        //  or was denied because of accept filters.
        fd_t accept ();

        //  Address to listen on.
        tcp_address_t address;

        //  Underlying socket.
        fd_t s;

        //  Handle corresponding to the listening socket.
        handle_t handle;

        //  Socket the listerner belongs to.
        zmq::socket_base_t *socket;

        //  String representation of endpoint to bind to
        std::string endpoint;

        tcp_listener_t (const tcp_listener_t&);
        const tcp_listener_t &operator = (const tcp_listener_t&);
    };

tcp_listener_t is mainly used to listen for connections. It inherits from both own_t and io_object_t: it inherits own_t because it has to manage child objects; every session_base_t created through the tcp_listener_t must be destroyed under the tcp_listener_t's responsibility. All objects that live inside an io_thread_t must inherit from io_object_t, including tcp_connecter, session_base and stream_engine, which will be analysed later. When a new tcp_listener_t object is created, an io_thread_t is chosen for it, and the tcp_listener_t then adds its listening descriptor to that io_thread_t's poller to wait for connections (a sketch of this registration step appears a little further below). When a new connection arrives, the in_event method is called:

void zmq::tcp_listener_t::in_event ()
{
    fd_t fd = accept ();

    //  If connection was reset by the peer in the meantime, just ignore it.
    //  TODO: Handle specific errors like ENFILE/EMFILE etc.
    if (fd == retired_fd) {
        socket->event_accept_failed (endpoint, zmq_errno());
        return;
    }

    tune_tcp_socket (fd);
    tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);

    // remember our fd for ZMQ_SRCFD in messages
    socket->set_fd(fd);

    //  Create the engine object for this connection.
    stream_engine_t *engine = new (std::nothrow)
        stream_engine_t (fd, options, endpoint);
    alloc_assert (engine);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create and launch a session object.
    session_base_t *session = session_base_t::create (io_thread, false, socket,
        options, NULL);
    errno_assert (session);
    session->inc_seqnum ();
    launch_child (session);
    send_attach (session, engine, false);
    socket->event_accepted (endpoint, fd);
}

The session_base_t created by tcp_listener_t does not need to live in the same thread as the tcp_listener_t, but the tcp_listener_t is responsible for managing and destroying it.
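
As for registering the listening descriptor with the poller: this happens in the listener's process_plug handler, roughly as follows (paraphrased from the libzmq source; details may vary between versions):

void zmq::tcp_listener_t::process_plug ()
{
    //  Start polling for incoming connections: register the listening fd
    //  with the owning io_thread's poller; in_event will then be called
    //  whenever a connection is waiting to be accepted.
    handle = add_fd (s);
    set_pollin (handle);
}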

Next comes the tcp_connecter class:

    class tcp_connecter_t : public own_t, public io_object_t
    {
    public:

        //  If 'delayed_start' is true connecter first waits for a while,
        //  then starts connection process.
        tcp_connecter_t (zmq::io_thread_t *io_thread_,
            zmq::session_base_t *session_, const options_t &options_,
            address_t *addr_, bool delayed_start_);
        ~tcp_connecter_t ();

    private:

        //  ID of the timer used to delay the reconnection.
        enum {reconnect_timer_id = 1};

        //  Handlers for incoming commands.
        void process_plug ();
        void process_term (int linger_);

        //  Handlers for I/O events.
        void in_event ();
        void out_event ();
        void timer_event (int id_);

        //  Internal function to start the actual connection establishment.
        void start_connecting ();

        //  Internal function to add a reconnect timer
        void add_reconnect_timer();

        //  Internal function to return a reconnect backoff delay.
        //  Will modify the current_reconnect_ivl used for next call
        //  Returns the currently used interval
        int get_new_reconnect_ivl ();

        //  Open TCP connecting socket. Returns -1 in case of error,
        //  0 if connect was successfull immediately. Returns -1 with
        //  EAGAIN errno if async connect was launched.
        int open ();

        //  Close the connecting socket.
        void close ();

        //  Get the file descriptor of newly created connection. Returns
        //  retired_fd if the connection was unsuccessfull.
        fd_t connect ();

        //  Address to connect to. Owned by session_base_t.
        address_t *addr;

        //  Underlying socket.
        fd_t s;

        //  Handle corresponding to the listening socket.
        handle_t handle;

        //  If true file descriptor is registered with the poller and 'handle'
        //  contains valid value.
        bool handle_valid;

        //  If true, connecter is waiting a while before trying to connect.
        const bool delayed_start;

        //  True iff a timer has been started.
        bool timer_started;

        //  Reference to the session we belong to.
        zmq::session_base_t *session;

        //  Current reconnect ivl, updated for backoff strategy
        int current_reconnect_ivl;

        // String representation of endpoint to connect to
        std::string endpoint;

        // Socket
        zmq::socket_base_t *socket;

        tcp_connecter_t (const tcp_connecter_t&);
        const tcp_connecter_t &operator = (const tcp_connecter_t&);
    };

tcp_connecter is the opposite of tcp_listener: it is created by a session_base_t, which is also responsible for managing and destroying it. A tcp_connecter_t does not exist forever either; once the connection succeeds it is destroyed:

void zmq::tcp_connecter_t::out_event ()
{
    rm_fd (handle);
    handle_valid = false;

    const fd_t fd = connect ();
    //  Handle the error condition by attempt to reconnect.
    if (fd == retired_fd) {
        close ();
        add_reconnect_timer ();
        return;
    }

    tune_tcp_socket (fd);
    tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);

    // remember our fd for ZMQ_SRCFD in messages
    socket->set_fd (fd);

    //  Create the engine object for this connection.
    stream_engine_t *engine = new (std::nothrow)
        stream_engine_t (fd, options, endpoint);
    alloc_assert (engine);

    //  Attach the engine to the corresponding session object.
    send_attach (session, engine);

    //  Shut the connecter down.
    terminate ();

    socket->event_connected (endpoint, fd);
}

When the tcp_connecter fails to connect, it sets a timer so that the connection can be retried later. This is what allows a connect issued before the peer's bind to still succeed in zmq; the sketch below shows this behaviour from the application's point of view.
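
A minimal sketch (port number and message content are made up); the connect side comes up first, and the reconnect timer lets the connection succeed once the listener finally binds:

#include <zmq.h>
#include <assert.h>

int main (void)
{
    void *ctx  = zmq_ctx_new ();
    void *push = zmq_socket (ctx, ZMQ_PUSH);
    void *pull = zmq_socket (ctx, ZMQ_PULL);

    //  Connect before anyone is listening: tcp_connecter_t fails, starts the
    //  reconnect timer and keeps retrying in the background.
    assert (zmq_connect (push, "tcp://127.0.0.1:5556") == 0);
    assert (zmq_send (push, "hello", 5, 0) == 5);   //  queued on the pipe

    //  The listener appears later; the pending connection is then established
    //  and the queued message is delivered.
    assert (zmq_bind (pull, "tcp://*:5556") == 0);
    char buf [5];
    assert (zmq_recv (pull, buf, sizeof buf, 0) == 5);

    zmq_close (push);
    zmq_close (pull);
    zmq_ctx_term (ctx);
    return 0;
}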

session_base

Every tcp connection needs a corresponding session_base (inproc connections do not: the socket_base objects connect to each other directly). session_base is the link between stream_engine and socket_base, and it is connected to the socket_base by a pipe_t. When the socket_base wants to send a piece of data it writes the msg into the out pipe, and the session_base then sends it out through the stream_engine; when the stream_engine reads a msg, the session_base writes the data into the session_base's in pipe.

    class session_base_t : public own_t, public io_object_t, public i_pipe_events
    {
    public:

        //  Create a session of the particular type.
        static session_base_t *create (zmq::io_thread_t *io_thread_,
            bool active_, zmq::socket_base_t *socket_,
            const options_t &options_, address_t *addr_);

        //  To be used once only, when creating the session.
        void attach_pipe (zmq::pipe_t *pipe_);

        //  Following functions are the interface exposed towards the engine.
        virtual void reset ();
        void flush ();
        void engine_error (zmq::stream_engine_t::error_reason_t reason);

        //  i_pipe_events interface implementation.
        void read_activated (zmq::pipe_t *pipe_);
        void write_activated (zmq::pipe_t *pipe_);
        void hiccuped (zmq::pipe_t *pipe_);
        void pipe_terminated (zmq::pipe_t *pipe_);

        //  Delivers a message. Returns 0 if successful; -1 otherwise.
        //  The function takes ownership of the message.
        int push_msg (msg_t *msg_);

        int zap_connect ();
        bool zap_enabled ();

        //  Fetches a message. Returns 0 if successful; -1 otherwise.
        //  The caller is responsible for freeing the message when no
        //  longer used.
        int pull_msg (msg_t *msg_);

        //  Receives message from ZAP socket.
        //  Returns 0 on success; -1 otherwise.
        //  The caller is responsible for freeing the message.
        int read_zap_msg (msg_t *msg_);

        //  Sends message to ZAP socket.
        //  Returns 0 on success; -1 otherwise.
        //  The function takes ownership of the message.
        int write_zap_msg (msg_t *msg_);

        socket_base_t *get_socket ();

    protected:

        session_base_t (zmq::io_thread_t *io_thread_, bool active_,
            zmq::socket_base_t *socket_, const options_t &options_,
            address_t *addr_);
        virtual ~session_base_t ();

    private:

        void start_connecting (bool wait_);

        void reconnect ();

        //  Handlers for incoming commands.
        void process_plug ();
        void process_attach (zmq::i_engine *engine_);
        void process_term (int linger_);

        //  i_poll_events handlers.
        void timer_event (int id_);

        //  Remove any half processed messages. Flush unflushed messages.
        //  Call this function when engine disconnect to get rid of leftovers.
        void clean_pipes ();

        //  If true, this session (re)connects to the peer. Otherwise, it's
        //  a transient session created by the listener.
        const bool active;

        //  Pipe connecting the session to its socket.
        zmq::pipe_t *pipe;

        //  Pipe used to exchange messages with ZAP socket.
        zmq::pipe_t *zap_pipe;

        //  This set is added to with pipes we are disconnecting, but haven't yet completed
        std::set <pipe_t *> terminating_pipes;

        //  This flag is true if the remainder of the message being processed
        //  is still in the in pipe.
        bool incomplete_in;

        //  True if termination have been suspended to push the pending
        //  messages to the network.
        bool pending;

        //  The protocol I/O engine connected to the session.
        zmq::i_engine *engine;

        //  The socket the session belongs to.
        zmq::socket_base_t *socket;

        //  I/O thread the session is living in. It will be used to plug in
        //  the engines into the same thread.
        zmq::io_thread_t *io_thread;

        //  ID of the linger timer
        enum {linger_timer_id = 0x20};

        //  True is linger timer is running.
        bool has_linger_timer;

        //  Protocol and address to use when connecting.
        address_t *addr;

        session_base_t (const session_base_t&);
        const session_base_t &operator = (const session_base_t&);
    };

session_base_t has a member variable active, which marks whether the connecting operation should be carried out in process_plug. start_connecting mainly creates a tcp_connecter_t and attaches it as a child object of the session:

void zmq::session_base_t::process_plug ()
{
    if (active)
        start_connecting (false);
}
void zmq::session_base_t::start_connecting (bool wait_)
{
    zmq_assert (active);

    //  Choose I/O thread to run connecter in. Given that we are already
    //  running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    //  Create the connecter object.

    if (addr->protocol == "tcp") {
        if (!options.socks_proxy_address.empty()) {
            address_t *proxy_address = new (std::nothrow)
                address_t ("tcp", options.socks_proxy_address);
            alloc_assert (proxy_address);
            socks_connecter_t *connecter =
                new (std::nothrow) socks_connecter_t (
                    io_thread, this, options, addr, proxy_address, wait_);
            alloc_assert (connecter);
            launch_child (connecter);
        }
        else {
            tcp_connecter_t *connecter = new (std::nothrow)
                tcp_connecter_t (io_thread, this, options, addr, wait_);
            alloc_assert (connecter);
            launch_child (connecter);
        }
        return;
    }
    //  Handling of the other protocols omitted
    .......
}

As mentioned earlier, there is a pipe between session_base and socket_base_t for passing msgs. This pipe is normally created in process_attach; however, if the socket_base performs a connect and the immediate option is set to something other than 1, the pipe is created directly in socket_base_t's connect (a small sketch of this option follows after the code below).

void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    //  Create the pipe if it does not exist yet.
    if (!pipe && !is_terminating ()) {
        object_t *parents [2] = {this, socket};
        pipe_t *pipes [2] = {NULL, NULL};

        bool conflate = options.conflate &&
            (options.type == ZMQ_DEALER ||
             options.type == ZMQ_PULL ||
             options.type == ZMQ_PUSH ||
             options.type == ZMQ_PUB ||
             options.type == ZMQ_SUB);

        int hwms [2] = {conflate? -1 : options.rcvhwm,
            conflate? -1 : options.sndhwm};
        bool conflates [2] = {conflate, conflate};
        int rc = pipepair (parents, pipes, hwms, conflates);
        errno_assert (rc == 0);

        //  Plug the local end of the pipe.
        pipes [0]->set_event_sink (this);

        //  Remember the local end of the pipe.
        zmq_assert (!pipe);
        pipe = pipes [0];

        //  Ask socket to plug into the remote end of the pipe.
        send_bind (socket, pipes [1]);
    }

    //  Plug in the engine.
    zmq_assert (!engine);
    engine = engine_;
    engine->plug (io_thread, this);
}
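
The immediate behaviour mentioned above is controlled from application code by the ZMQ_IMMEDIATE socket option; a minimal sketch (the endpoint and socket type are made up for illustration):

#include <zmq.h>

//  With ZMQ_IMMEDIATE set to 1, messages are only queued to completed
//  connections: the pipe is not created in socket_base_t::connect but
//  in process_attach, once the connection has actually been established.
void connect_with_immediate (void *ctx)
{
    void *sock = zmq_socket (ctx, ZMQ_DEALER);
    int immediate = 1;
    zmq_setsockopt (sock, ZMQ_IMMEDIATE, &immediate, sizeof immediate);
    zmq_connect (sock, "tcp://127.0.0.1:5557");   //  made-up endpoint
    zmq_close (sock);
}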

In attach_pipe, session_base registers itself as the pipe's event sink, so that when the pipe's read/write state changes the session_base_t can notify the corresponding engine.

void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (!pipe);
    zmq_assert (pipe_);
    pipe = pipe_;
    pipe->set_event_sink (this);
}

void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (unlikely (engine == NULL)) {
        pipe->check_read ();
        return;
    }

    if (likely (pipe_ == pipe))
        engine->restart_output ();
    else
        engine->zap_msg_available ();
}
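
The write direction is symmetric: when the socket side drains the pipe and writing becomes possible again, the session restarts the engine's input. Roughly (paraphrased from the libzmq source; details may vary between versions):

void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    //  Skip activating if we're detaching this pipe.
    if (pipe != pipe_) {
        zmq_assert (terminating_pipes.count (pipe_) == 1);
        return;
    }

    //  The pipe can accept msgs again, so let the engine resume decoding.
    if (engine)
        engine->restart_input ();
}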

stream_engine and session_base_t exchange msgs mainly through the following two methods: one reads data from the pipe for the engine to send out, and the other writes a received msg into the pipe.

int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    if (!pipe || !pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    incomplete_in = msg_->flags () & msg_t::more ? true : false;

    return 0;
}

int zmq::session_base_t::push_msg (msg_t *msg_)
{
    if (pipe && pipe->write (msg_)) {
        int rc = msg_->init ();
        errno_assert (rc == 0);
        return 0;
    }

    errno = EAGAIN;
    return -1;
}
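
On the engine side these two methods are reached through thin wrapper functions, which is what the state-machine pointers are eventually set to once the handshake is over; roughly (paraphrased from stream_engine.cpp):

int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
{
    //  Ask the session (and thus the pipe from the socket) for the next msg.
    return session->pull_msg (msg_);
}

int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
{
    //  Hand a decoded msg over to the session, which writes it into the pipe.
    return session->push_msg (msg_);
}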

stream_engine & mechanism_t

stream_engine is the class that actually interacts with the network: it receives data from the network layer and sends data out to it. stream_engine inherits from i_engine and is implemented as a state machine, which makes its logic fairly complex. Below are stream_engine's member variables:

        //  Underlying socket.
        fd_t s;

        //  True iff this is server's engine.
        bool as_server;

        msg_t tx_msg; // msg loaded from session_base

        handle_t handle;

        //  buffer for incoming data and the corresponding decoder
        unsigned char *inpos;
        size_t insize;
        i_decoder *decoder;

        //  buffer for outgoing data and the corresponding encoder
        unsigned char *outpos;
        size_t outsize;
        i_encoder *encoder;

        //  Metadata to be attached to received messages. May be NULL.
        metadata_t *metadata;

        //  When true, we are still trying to determine whether
        //  the peer is using versioned protocol, and if so, which
        //  version.  When false, normal message flow has started.
        bool handshaking; // whether the connection is still in the handshake phase

        static const size_t signature_size = 10; // size of the ZMTP signature

        //  Size of ZMTP/1.0 and ZMTP/2.0 greeting message
        static const size_t v2_greeting_size = 12; // size of the v2 greeting

        //  Size of ZMTP/3.0 greeting message
        static const size_t v3_greeting_size = 64; // size of the v3 greeting

        //  Expected greeting size.
        size_t greeting_size; // currently expected greeting size

        //  Greeting received from, and sent to peer
        unsigned char greeting_recv [v3_greeting_size]; // receive buffer for greeting data
        unsigned char greeting_send [v3_greeting_size]; // send buffer for greeting data

        //  Size of greeting received so far
        unsigned int greeting_bytes_read; // number of greeting bytes read so far

        //  The session this engine is attached to.
        zmq::session_base_t *session;

        options_t options;

        // String representation of endpoint
        std::string endpoint;

        bool plugged;

        int (stream_engine_t::*next_msg) (msg_t *msg_); // state-machine function pointer: fetches the next msg to send

        int (stream_engine_t::*process_msg) (msg_t *msg_); // state-machine function pointer: processes a received msg

        bool io_error;

        //  Indicates whether the engine is to inject a phantom
        //  subscription message into the incoming stream.
        //  Needed to support old peers.
        bool subscription_required;

        mechanism_t *mechanism; // security mechanism

        //  True iff the engine couldn't consume the last decoded message.
        bool input_stopped;

        //  True iff the engine doesn't have any message to encode.
        bool output_stopped;

        //  ID of the handshake timer
        enum {handshake_timer_id = 0x40};

        //  True if the handshake timer is running.
        bool has_handshake_timer;

        // Socket
        zmq::socket_base_t *socket;

        std::string peer_address;

stream_engine needs an encoder, a decoder and a mechanism_t (the security mechanism) to encode/decode the data and, where applicable, authenticate and encrypt it. Before any msg can be transferred, stream_engine has to shake hands with the other end of the tcp connection; this is zmq's own application-level handshake, carried out after the tcp three-way handshake, and the data exchanged consists mainly of two parts, the greeting and the handshake commands:

void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
    session_base_t *session_)
{
    zmq_assert (!plugged);
    plugged = true;

    //  Connect to session object.
    zmq_assert (!session);
    zmq_assert (session_);
    session = session_;
    socket = session-> get_socket ();

    //  Connect to I/O threads poller object.
    io_object_t::plug (io_thread_);
    handle = add_fd (s);
    io_error = false;

    if (options.raw_sock) {
        ........
    }
    else {
        // start optional timer, to prevent handshake hanging on no input
        set_handshake_timer ();

        //  Send the 'length' and 'flags' fields of the identity message.
        //  The 'length' field is encoded in the long format.
        outpos = greeting_send;
        outpos [outsize++] = 0xff;
        put_uint64 (&outpos [outsize], options.identity_size + 1);
        outsize += 8;
        outpos [outsize++] = 0x7f;
    }

    set_pollin (handle);
    set_pollout (handle);
    //  Flush all the data that may have been already received downstream.
    in_event ();
}

First, in stream_engine's plug method, outpos is pointed at greeting_send, the buffer for the handshake data. plug writes ten bytes into greeting_send as the signature and then calls in_event; if there is no data to read at that point it simply waits for the poller callback. In in_event, if the engine determines that it is still in the handshake phase, it goes straight into the handshake method:

bool zmq::stream_engine_t::handshake ()
{
    zmq_assert (handshaking);
    zmq_assert (greeting_bytes_read < greeting_size);
    //  Receive the greeting.
    while (greeting_bytes_read < greeting_size) {
        const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
                                greeting_size - greeting_bytes_read);
        if (n == 0) {
            error (connection_error);
            return false;
        }
        if (n == -1) {
            if (errno != EAGAIN)
                error (connection_error);
            return false;
        }

        greeting_bytes_read += n;

        //  We have received at least one byte from the peer.
        //  If the first byte is not 0xff, we know that the
        //  peer is using unversioned protocol.
        if (greeting_recv [0] != 0xff)
            break;

        if (greeting_bytes_read < signature_size)
            continue;

        //  Inspect the right-most bit of the 10th byte (which coincides
        //  with the 'flags' field if a regular message was sent).
        //  Zero indicates this is a header of identity message
        //  (i.e. the peer is using the unversioned protocol).
        if (!(greeting_recv [9] & 0x01))
            break;

        //  The peer is using versioned protocol.
        //  Send the major version number.
        if (outpos + outsize == greeting_send + signature_size) {
            if (outsize == 0)
                set_pollout (handle);
            outpos [outsize++] = 3;     //  Major version number
        }

        if (greeting_bytes_read > signature_size) {
            if (outpos + outsize == greeting_send + signature_size + 1) {
                if (outsize == 0)
                    set_pollout (handle);

                //  Use ZMTP/2.0 to talk to older peers.
                if (greeting_recv [10] == ZMTP_1_0
                ||  greeting_recv [10] == ZMTP_2_0)
                    outpos [outsize++] = options.type;
                else {
                    outpos [outsize++] = 0; //  Minor version number
                    memset (outpos + outsize, 0, 20);

                    zmq_assert (options.mechanism == ZMQ_NULL
                            ||  options.mechanism == ZMQ_PLAIN
                            ||  options.mechanism == ZMQ_CURVE
                            ||  options.mechanism == ZMQ_GSSAPI);

                    if (options.mechanism == ZMQ_NULL)
                        memcpy (outpos + outsize, "NULL", 4);
                    else
                    if (options.mechanism == ZMQ_PLAIN)
                        memcpy (outpos + outsize, "PLAIN", 5);
                    else
                    if (options.mechanism == ZMQ_GSSAPI)
                        memcpy (outpos + outsize, "GSSAPI", 6);
                    else
                    if (options.mechanism == ZMQ_CURVE)
                        memcpy (outpos + outsize, "CURVE", 5);
                    outsize += 20;
                    memset (outpos + outsize, 0, 32);
                    outsize += 32;
                    greeting_size = v3_greeting_size;
                }
            }
        }
    }

    //  Position of the revision field in the greeting.
    const size_t revision_pos = 10;

    //  Is the peer using ZMTP/1.0 with no revision number?
    //  If so, we send and receive rest of identity message
    if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
        if (session->zap_enabled ()) {
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

        encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
        alloc_assert (decoder);

        //  We have already sent the message header.
        //  Since there is no way to tell the encoder to
        //  skip the message header, we simply throw that
        //  header data away.
        const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
        unsigned char tmp [10], *bufferp = tmp;

        //  Prepare the identity message and load it into encoder.
        //  Then consume bytes we have already sent to the peer.
        const int rc = tx_msg.init_size (options.identity_size);
        zmq_assert (rc == 0);
        memcpy (tx_msg.data (), options.identity, options.identity_size);
        encoder->load_msg (&tx_msg);
        size_t buffer_size = encoder->encode (&bufferp, header_size);
        zmq_assert (buffer_size == header_size);

        //  Make sure the decoder sees the data we have already received.
        inpos = greeting_recv;
        insize = greeting_bytes_read;

        //  To allow for interoperability with peers that do not forward
        //  their subscriptions, we inject a phantom subscription message
        //  message into the incoming message stream.
        if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
            subscription_required = true;

        //  We are sending our identity now and the next message
        //  will come from the socket.
        next_msg = &stream_engine_t::pull_msg_from_session;

        //  We are expecting identity message.
        process_msg = &stream_engine_t::process_identity_msg;
    }
    else
    if (greeting_recv [revision_pos] == ZMTP_1_0) {
        if (session->zap_enabled ()) {
           // reject ZMTP 1.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

        encoder = new (std::nothrow) v1_encoder_t (
            out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) v1_decoder_t (
            in_batch_size, options.maxmsgsize);
        alloc_assert (decoder);
    }
    else
    if (greeting_recv [revision_pos] == ZMTP_2_0) {
        if (session->zap_enabled ()) {
           // reject ZMTP 2.0 connections if ZAP is enabled
           error (protocol_error);
           return false;
        }

        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
            in_batch_size, options.maxmsgsize);
        alloc_assert (decoder);
    }
    else {
        encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
        alloc_assert (encoder);

        decoder = new (std::nothrow) v2_decoder_t (
            in_batch_size, options.maxmsgsize);
        alloc_assert (decoder);

        if (options.mechanism == ZMQ_NULL
        &&  memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
            mechanism = new (std::nothrow)
                null_mechanism_t (session, peer_address, options);
            alloc_assert (mechanism);
        }
        else
        if (options.mechanism == ZMQ_PLAIN
        &&  memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
            if (options.as_server)
                mechanism = new (std::nothrow)
                    plain_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow)
                    plain_client_t (options);
            alloc_assert (mechanism);
        }
#ifdef HAVE_LIBSODIUM
        else
        if (options.mechanism == ZMQ_CURVE
        &&  memcmp (greeting_recv + 12, "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
            if (options.as_server)
                mechanism = new (std::nothrow)
                    curve_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow) curve_client_t (options);
            alloc_assert (mechanism);
        }
#endif
#ifdef HAVE_LIBGSSAPI_KRB5
        else
        if (options.mechanism == ZMQ_GSSAPI
        &&  memcmp (greeting_recv + 12, "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
            if (options.as_server)
                mechanism = new (std::nothrow)
                    gssapi_server_t (session, peer_address, options);
            else
                mechanism = new (std::nothrow) gssapi_client_t (options);
            alloc_assert (mechanism);
        }
#endif
        else {
            error (protocol_error);
            return false;
        }
        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
    }

    // Start polling for output if necessary.
    if (outsize == 0)
        set_pollout (handle);

    //  Handshaking was successful.
    //  Switch into the normal message flow.
    handshaking = false;

    if (has_handshake_timer) {
        cancel_timer (handshake_timer_id);
        has_handshake_timer = false;
    }

    return true;
}

The handshake method first checks whether the first byte is 0xff. If it is, the peer is speaking the zmtp protocol, so the engine waits until the full 10-byte signature has been read and then writes 3 into the output buffer, meaning our side speaks zmtp version 3.0. Next it has to read the peer's version number; zmq is backward compatible here, so old and new versions can talk to each other. If both sides speak zmtp 3.0, the chosen security mechanism is written into the output buffer, and v2_encoder_t and v2_decoder_t are initialised as the encoder and decoder. zmq 4 introduced authentication and encryption; zmq currently offers four mechanisms, ZMQ_NULL, ZMQ_PLAIN, ZMQ_CURVE and ZMQ_GSSAPI. The analysis that follows uses the simplest one, ZMQ_NULL; the flow for the other mechanisms is essentially the same, only the authentication method differs. In zmtp 3.0 the greeting is 64 bytes in total. After the greeting phase, the handshaking flag is set to false, but the handshake is not finished yet: the rest of the handshake, and all subsequent data transfer, is driven by stream_engine's state machine, whose core is two function pointers:

        int (stream_engine_t::*next_msg) (msg_t *msg_);

        int (stream_engine_t::*process_msg) (msg_t *msg_);

These two pointers point respectively to the method that fetches the next msg to send and the method that processes a received msg. How a fetched msg is actually sent out, and how msgs are read from the network, will be analysed in detail later together with the encoder and decoder.
At the end of the handshake method, the two function pointers have been set to:

        next_msg = &stream_engine_t::next_handshake_command;
        process_msg = &stream_engine_t::process_handshake_command;
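
To see how these pointers drive the engine, here is a simplified excerpt of the sending half of out_event (reconstructed from the libzmq source, details differ slightly between versions): whenever the output buffer is empty the engine keeps calling next_msg and feeds each msg to the encoder, until the batch is full or there is nothing left to send.

//  Simplified: refill the output buffer by pulling msgs through next_msg.
if (outsize == 0) {
    outpos = NULL;
    outsize = encoder->encode (&outpos, 0);

    while (outsize < out_batch_size) {
        if ((this->*next_msg) (&tx_msg) == -1)
            break;                              //  nothing more to send now
        encoder->load_msg (&tx_msg);
        unsigned char *bufptr = outpos + outsize;
        size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
        zmq_assert (n > 0);
        if (outpos == NULL)
            outpos = bufptr;
        outsize += n;
    }
}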

The implementation of next_handshake_command is as follows, again using the ZMQ_NULL mechanism as the example:

int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);

    if (mechanism->status () == mechanism_t::ready) {
        mechanism_ready ();
        return pull_and_encode (msg_);
    }
    else
    if (mechanism->status () == mechanism_t::error) {
        errno = EPROTO;
        return -1;
    }
    else {
        const int rc = mechanism->next_handshake_command (msg_);
        if (rc == 0)
            msg_->set_flags (msg_t::command);
        return rc;
    }
}

next_handshake_command first calls the mechanism's next_handshake_command method to obtain the command data:

int zmq::null_mechanism_t::next_handshake_command (msg_t *msg_)
{
    if (ready_command_sent || error_command_sent) {
        errno = EAGAIN;
        return -1;
    }

    //  ZAP handling omitted
    ....

    unsigned char *const command_buffer = (unsigned char *) malloc (512);
    alloc_assert (command_buffer);

    unsigned char *ptr = command_buffer;

    //  Add mechanism string
    memcpy (ptr, "\5READY", 6);
    ptr += 6;

    //  Add socket type property
    const char *socket_type = socket_type_string (options.type);
    ptr += add_property (ptr, "Socket-Type", socket_type, strlen (socket_type));

    //  Add identity property
    if (options.type == ZMQ_REQ
    ||  options.type == ZMQ_DEALER
    ||  options.type == ZMQ_ROUTER)
        ptr += add_property (ptr, "Identity", options.identity, options.identity_size);

    const size_t command_size = ptr - command_buffer;
    const int rc = msg_->init_size (command_size);
    errno_assert (rc == 0);
    memcpy (msg_->data (), command_buffer, command_size);
    free (command_buffer);

    ready_command_sent = true;

    return 0;
}
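
The add_property helper used above packs each property as a 1-byte name length, the name itself, a 4-byte network-order value length and the value. A rough reconstruction (from mechanism.cpp; details may differ between versions):

size_t zmq::mechanism_t::add_property (unsigned char *ptr_, const char *name_,
    const void *value_, size_t value_len_)
{
    //  1-byte name length, followed by the name itself.
    const size_t name_len = strlen (name_);
    zmq_assert (name_len <= 255);
    *ptr_++ = static_cast <unsigned char> (name_len);
    memcpy (ptr_, name_, name_len);
    ptr_ += name_len;

    //  4-byte network-order value length, followed by the value.
    put_uint32 (ptr_, static_cast <uint32_t> (value_len_));
    ptr_ += 4;
    memcpy (ptr_, value_, value_len_);

    return 1 + name_len + 4 + value_len_;
}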

This command-type msg mainly contains a READY string plus two properties, Socket-Type and Identity. Once next_handshake_command has obtained the msg, it hands it to the encoder to be sent out. The next call to next_handshake_command returns -1, marking that the command has been sent and that we are now waiting for the peer's command; at that point out_event has no data to process, so reset_pollout (handle) is called to suspend write events while waiting for the peer. Later, when the decoder receives a msg, process_handshake_command is called:

int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
    zmq_assert (mechanism != NULL);
    const int rc = mechanism->process_handshake_command (msg_);
    if (rc == 0) {
        if (mechanism->status () == mechanism_t::ready)
            mechanism_ready ();
        else
        if (mechanism->status () == mechanism_t::error) {
            errno = EPROTO;
            return -1;
        }

        //  At this point output may have stopped writing while waiting for the
        //  mechanism_t::ready state, and it cannot be re-activated via the session's
        //  read_activated, because the pipe between the session and the socket may
        //  not be asleep; so we have to restart output manually once.
        if (output_stopped)
            restart_output ();
    }

    return rc;
}

process_handshake_command calls the mechanism's process_handshake_command; the ZMQ_NULL mechanism validates the socket_type and stores the peer's identity in this method. Once our handshake command has been sent out and the peer's handshake command has been received and processed, the mechanism is in the ready state. At that point, whichever of process_handshake_command and next_handshake_command runs first will enter the mechanism_ready method; there is no rule about which of the two must be handled first, the handling is stateless.

void zmq::stream_engine_t::mechanism_ready ()
{
    if (options.recv_identity) {
        msg_t identity;
        mechanism->peer_identity (&identity);
        const int rc = session->push_msg (&identity);
        if (rc == -1 && errno == EAGAIN) {
            // If the write is failing at this stage with
            // an EAGAIN the pipe must be being shut down,
            // so we can just bail out of the identity set.
            return;
        }
        errno_assert (rc == 0);
        session->flush ();
    }

    next_msg = &stream_engine_t::pull_and_encode;
    process_msg = &stream_engine