1. 程式人生 > >zmq筆記一: 對象關系

zmq筆記一: 對象關系

strong rep 其中 base tex 結束 基本 發送消息 ray

int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0

本文主要是分析代碼,方便自己日後查閱.

=========================================

1.上下文對象以及socket對象創建

void *context = zmq_ctx_new(); //創建上下文對象
void *responder = zmq_socket(context, ZMQ_REP); //創建socket類型的對象 zmq::socket_base_t *s = ctx->create_socket (type_);
int rc = zmq_bind(responder, "tcp://*:6666"); //綁定端口

一般一個進程只有一個context對象,它管理著所有socket對象. context是線程安全的,可以在多線程之間傳遞使用,但是socket對象不是線程安全.

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    slot_sync.lock ();
    if (unlikely (starting)) {

        starting = false;
        //  Initialise the array of mailboxes. Additional three slots are for
        //  zmq_ctx_term thread and reaper thread.
        opt_sync.lock ();
        int mazmq = max_sockets;//默認值
        int ios = io_thread_count; //最大io線程數量,默認只有1個
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2; //slot_count決定了郵箱數量, 2這個數字是指下面的 term_tid + reaper_tid 兩個mailbox的數組占坑位置
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
        alloc_assert (slots);

        //  Initialise the infrastructure for zmq_ctx_term thread.
        slots [term_tid] = &term_mailbox; //zmq終結處理時的唯一的一個郵箱

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid); //收割線程,回收結束使用的socket
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();//把線程的郵箱放在全局郵箱管理
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int i = 2; i != ios + 2; i++) {//除終結處理和回收線程郵箱外,還要開啟一共ios個io線程
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread); //將所有開啟的io線程統一管理
            slots [i] = io_thread->get_mailbox (); //線程的郵箱放在全局郵箱管理
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i); //還沒被使用的郵箱占位
            slots [i] = NULL;
        }
    }

    //  Once zmq_ctx_term() was called, we can‘t create new sockets.
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }

    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
    }

    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back (); //當前新建的socket類型占一個郵箱位置
    empty_slots.pop_back ();

    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1; //原子遞增的socket id

    //  Create the socket and register its mailbox.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid); //創建當前類型socket對象,並初始化它對應的郵箱
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
        return NULL;
    }
    sockets.push_back (s); //所有創建的socket對象統一管理
    slots [slot] = s->get_mailbox (); //對應占坑

    slot_sync.unlock ();
    return s;
}

每一種socket都有一個與之對應的對象類,創建時需要綁定context,郵箱位置,socket id 等.

zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_, int sid_)
{
    socket_base_t *s = NULL;
    switch (type_) {
        case ZMQ_PAIR:
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
      ......
        case ZMQ_GATHER:
            s = new (std::nothrow) gather_t (parent_, tid_, sid_);
            break;
        case ZMQ_SCATTER:
            s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
            break;
        default:
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (s);

    if (s->mailbox == NULL) {
        s->destroyed = true;
        LIBZMQ_DELETE(s);
        return NULL;
    }

    return s;
}

object_t 類對象繼承關系圖:( zmq版本:4.2.0, 生成工具:doxygen,graphviz )

技術分享

2.線程與通信

I / O線程(io_thread_t)是ZMQ異步處理網絡IO的後臺線程。io_thread_t實現繼承object_t ,並實現 i_poll_events 接口,其內部包含一個郵箱(mailbox_t)和一個poller對象(poller_t)。

class io_thread_t : public object_t, public i_poll_events{
 ...
}
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_)
{
    poller = new (std::nothrow) poller_t (*ctx_); //註意,poller_t是根據操作系統來定義的,typedef select_t/poll_t/epoll_t, 本文在windows操作系統分析,用的是select_t,基本邏輯大同小異
    alloc_assert (poller);

    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);
}

void zmq::io_thread_t::start ()
{
    //  Start the underlying I/O thread.
    poller->start ();
}
void zmq::select_t::start ()
{
    ctx.start_thread (worker, worker_routine, this);
}
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
    thread_.start(tfn_, arg_);
    thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
}
void zmq::select_t::worker_routine (void *arg_)
{
    ((select_t*) arg_)->loop ();
}

void zmq::select_t::loop () //io線程循環是在poller_t(typedef select_t poller_t)的loop函數
{
     while (!stopping) {
     ....
   }
}

  

poller_t 是從不同操作系統提供的事件通知機制中抽象出來的概念,用來通知描述符和計時器事件,poller_t 通過 typedef定義為操作系統首選的通知機制(select_t/poll_t/epoll_t 等)。所有運行在 io_thread_t上的對象都繼承自輔助類 io_object_t,該類實現了向io_thread_t註冊/刪除文件描述符 (add_fd/rm_fd)和計時器(add_timer/cancel_timer)事件的功能,同時io_object_t 還繼承了 i_poll_events 接口來實現事件回調功能。i_poll_events 接口定義了文件描述符和計時器事件就緒時的回調處理函數(in_event/out_event/timer_event)。io_thread_t 實現此接口(in_event)來處理來自mailbox的事件。

繼承object_t使得io_thread_t能夠發送和接收command,mailbox_t 用來存儲發送給任何居住在io_thread_t 上的object_t 的命令,每個io_thread_t 上有多個對象,這些對象公用同一個郵箱,郵箱的收件人就是對象。mailbox_t本質是一個具有就緒通知功能的存儲命令的隊列。就緒通知機制由signaler_t提供的文件描述符實現。隊列是由ypipe_t實現的無鎖無溢出隊列。當mailbox_t事件觸發時,io線程從mailbox中獲取命令,並讓命令的接收者進行處理。

io線程之間的通信是通過發命令消息(command_t)到對方的mailbox,而socket_base_t實例與session的消息通信則通過發送消息對象msg_t.

ZMQ內部使用兩種不同類型的線程(擁有郵箱的對象):I/O線程(io_thread_t)和socket(socket_base_t). 其中io線程,像reaper_t、io_thread_t都屬於這一類,這類線程的特點就是內含一個輪詢器poller及mailbox_t,通過poller可以監聽激活mailbox_t的信號 ;另一類是zmq的socket,所有socket_base_t實例化的對象都可以看做一個單獨的線程,這類線程不含poller,但同樣含有一個mailbox_t,可以用於收發命令; 由於不含poller,只能在每次使用socket_base_t實例的時候先處理一下mailbox_t,看是否有命令需要處理(process_commands函數), 例如:

int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    ENTER_MUTEX ();

    ......

    //  Process pending commands, if any.
    int rc = process_commands (0, true); //處理一次郵箱裏的命令
    if (unlikely (rc != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

    //  At this point we impose the flags on the message.
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  Try to send the message using method in each socket class
    rc = xsend (msg_);
    ......
    while (true) {
        if (unlikely (process_commands (timeout, false) != 0)) {//再次處理郵箱裏的命令
            EXIT_MUTEX ();
            return -1;
        }
        rc = xsend (msg_);
        ......
    return 0;
}

i_poll_events 類對象繼承關系圖:( zmq版本:4.2.0, 生成工具:doxygen,graphviz )

技術分享

參考文章:

http://www.cnblogs.com/zengzy/p/5132437.html

http://watter1985.iteye.com/blog/1736023

zmq筆記一: 對象關系