1. 程式人生 > >Linux下使用ZMQ實踐“請求-響應”非同步伺服器模型

Linux下使用ZMQ實踐“請求-響應”非同步伺服器模型

一、背景

    上一篇文章《Linux下使用ZMQ實踐“請求-響應”伺服器模型》中使用的是REP-REQ套件,該套件的特點是必須一個請求對應一個響應,如果在應用中不想使用同步處理的方式呢?ZMQ有沒有提供非同步處理的方法?答案是使用DEALER-ROUTER套件。

    另外如何在多執行緒中安全傳遞訊息的方法可以參見《Linux下使用ZMQ實踐“生產者-消費者”模型》ZMQ_PULL、ZMQ_PUSH的實踐。

二、相關知識

1、ZMQ_DEALER

       ZMQ_DEALER
           A socket of type ZMQ_DEALER is an advanced pattern used for extending request/reply sockets.
           Each message sent is round-robined among all connected peers, and each message received is
           fair-queued from all connected peers.

           When a ZMQ_DEALER socket enters the mute state due to having reached the high water mark for
           all peers, or if there are no peers at all, then any zmq_send(3) operations on the socket
           shall block until the mute state ends or at least one peer becomes available for sending;
           messages are not discarded.

           When a ZMQ_DEALER socket is connected to a ZMQ_REP socket each message sent must consist of
           an empty message part, the delimiter, followed by one or more body parts.

    DEALER是一種用於請求/答應模式的更高階的擴充套件Socket,它可以自由的收發訊息,沒有ZMQ_REP/ZMQ_REQ那樣的限制。對於每一個連線,接收訊息也是使用了公平佇列,傳送使用了迴圈佇列(RR)。

    ZMQ_DEALER受ZMQ_RCVHW和ZMQ_SNDHW兩個閥值影響(可通過zmq_setsockopt函式設定),一旦傳送或接收佇列達到閥值,Socket就會進入mute狀態,此時對DEALER的任何zmq_send操作都會阻塞,直到mute狀態結束。如果當前沒有有效的連線,zmq_send操作也會阻塞,直到有新的連線到來為止。DEALER發生阻塞並不會丟棄訊息。

    注意:如果ZMQ_DEALER連線到ZMQ_REP,每一個訊息包必須包含一個空幀,然後再緊跟著資料包體。

2、ZMQ_ROUTER

    ZMQ_ROUTER           
           A socket of type ZMQ_ROUTER is an advanced socket type used for extending request/reply
           sockets. When receiving messages a ZMQ_ROUTER socket shall prepend a message part containing
           the routing id of the originating peer to the message before passing it to the application.
           Messages received are fair-queued from among all connected peers. When sending messages a
           ZMQ_ROUTER socket shall remove the first part of the message and use it to determine the
           routing id of the peer the message shall be routed to. If the peer does not exist anymore,
           or has never existed, the message shall be silently discarded. However, if
           ZMQ_ROUTER_MANDATORY socket option is set to 1, the socket shall fail with EHOSTUNREACH in
           both cases.

    ZMQ_ROUTER是一種用於請求/答應模式的更高階的擴充套件Socket,它可以自由的收發訊息。當ZMQ_ROUTER接收到訊息時,會自動在訊息頂部加入來源地址識別符號,接收訊息使用了公平佇列。

    

    當傳送訊息時,ZMQ_ROUTER又會自動去掉這個識別符號,並且根據這個識別符號路由到相應的端點。

    如果此地址標識的端點不存在,預設會毫無徵兆的丟棄訊息,除非將ZMQ_ROUTER_MANDATORY 選項設定為1。

           When a ZMQ_ROUTER socket enters the mute state due to having reached the high water mark for
           all peers, then any messages sent to the socket shall be dropped until the mute state ends.
           Likewise, any messages routed to a peer for which the individual high water mark has been
           reached shall also be dropped. If, ZMQ_ROUTER_MANDATORY is set to 1, the socket shall block
           or return EAGAIN in both cases.
    當佇列達到閥值時,ZMQ_ROUTER Socket就會進入mute狀態,此時所有後續傳送到此Socket的訊息都會被丟棄,直到mute狀態結束。同樣的,如果對端的接收佇列達到了閥值,訊息也會被丟棄。但是如果設定了ZMQ_ROUTER_MANDATORY 選項,訊息不會丟棄,介面將等待發送完成後返回。

三、實踐

    程式碼基於REQ-REP版本進行修改,讀者可以通過ROUTER_DEALER巨集來看出與原來程式碼的區別;

    服務端主要是配合客戶端實現,程式碼修改不大,仍然保持著一次請求、一次響應的功能;

    需要注意的地方就是Router收到Dealer的訊息是包含標識頭部的(Dealer來源標識),傳送的時候需要使用sendmore把標識先發出去。

int main(int argc, char *argv[])
{
        void *ctx = zmq_ctx_new();
        server_master(ctx);
        zmq_ctx_destroy(ctx);
        exit(EXIT_SUCCESS);
}
static void *server_master(void *ctx)
{
        int ret = 0;
        char id[16] = {0};
        char request[1024];
        char respone[1024];

#ifdef DEALER_ROUTER
        void *server = zmq_socket(ctx, ZMQ_ROUTER);
#else
        void *server = zmq_socket(ctx, ZMQ_REP);
#endif

        s_set_id_ex(server, id, sizeof(id));
        zmq_bind(server, "ipc://server.ipc");
        zmq_pollitem_t items[] = {
                { server, 0, ZMQ_POLLIN, 0 },
        };

        LOGN("Server %s start\n", id);
        while (1) {
                ret = zmq_poll(items, 1 /* size */, 1000 /* ms */);
                assert(ret >= 0);

        if (items[0].revents & ZMQ_POLLIN) {
#ifdef DEALER_ROUTER
                        char peer[16] = {0};
                        s_recv(server, peer);
#endif
                        s_recv(server, request);
                        LOGN("Server %s recv: %s\n", id, request);

                        //TODO something handle
                        sleep(1);

#ifdef DEALER_ROUTER
                        s_sendmore(server, peer);
#endif
                        snprintf(respone, sizeof(respone), "%s-World", request);
                        s_send(server, respone);
                        LOGN("Server %s send: %s\n", id, respone);
                }
        }

        LOGN("Server %s Finish\n", id);
        zmq_close(server);
}

    客戶端的程式碼修改的比較多,主要是使用主執行緒推送請求,子執行緒中對請求進行響應處理,來達到非同步的目的(或者另一種實現就是在一個執行緒中zmq_poll統一處理)

int main(int argc, char *argv[])
{
        void *ctx = zmq_ctx_new();
        srandom(time(NULL));
#ifdef DEALER_ROUTER
        void *thread = zmq_threadstart(client_worker, ctx);
        client_master(ctx);
        zmq_threadclose(thread);
#else
        client_task(ctx);
#endif
        zmq_ctx_destroy(ctx);
        exit(EXIT_SUCCESS);
}
void client_master(void *ctx)
{
        int ix;
        int roll = randof(1000);
        char request[1024];
        void *pusher = zmq_socket(ctx, ZMQ_PUSH);

        zmq_connect(pusher, "inproc://client.inproc");

        for (ix = 0; ix < TEST_TIMES; ix++) {
                snprintf(request, sizeof(request), "Request-%03d-%03d", roll, ix);
                s_send(pusher, request);
        }

        zmq_close(pusher);
}

主執行緒通過 ZMQ_PUSH 與子執行緒的 ZMQ_PULL 進行對接,子執行緒再轉送給 ZMQ_DEALER 傳送出去;

由於子執行緒需要同時監聽兩個socket的收事件,所以使用了 zmq_poll 進行IO複用;

void client_worker(void *ctx)
{
        int ret = 0;
        int cnt = 0;
        char id[16] = {0};
        char request[1024];
        char respone[1024];
        void *puller = zmq_socket(ctx, ZMQ_PULL);
        void *dealer = zmq_socket(ctx, ZMQ_DEALER);
        
        s_set_id_ex(dealer, id, sizeof(id));
        zmq_connect(dealer, "ipc://server.ipc");
        zmq_bind(puller, "inproc://client.inproc");
        zmq_pollitem_t items[] = {
                { puller, 0, ZMQ_POLLIN, 0 },
                { dealer, 0, ZMQ_POLLIN, 0 }
        };
        LOGN("Client %s start\n", id);
        while (cnt < TEST_TIMES) {
                ret = zmq_poll(items, 2 /* size */, 1000 /* ms */);
                assert(ret >= 0);
        if (items[0].revents & ZMQ_POLLIN) {
                        s_recv(puller, request);
                        s_sendmore(puller, ""); // dealer
                        s_send(dealer, request);
                        LOGN("Client %s send: %s\n", id, request);
                }
        if (items[1].revents & ZMQ_POLLIN) {
                        s_recv(dealer, respone);
                        cnt++;
                        LOGN("Client %s recv: %s\n", id, respone);
                        //TODO something handle
                }
        }
        LOGN("Client %s finish\n", id);
        zmq_close(puller);
        zmq_close(dealer);
}

開啟兩個client(00FA與0064)、一個server,執行結果如下:

[ 1520703616.843 ]: Client 00FA start
[ 1520703616.844 ]: Client 00FA send: Request-251-000
[ 1520703616.844 ]: Client 00FA send: Request-251-001
[ 1520703616.844 ]: Client 00FA send: Request-251-002
[ 1520703616.844 ]: Client 00FA send: Request-251-003
[ 1520703616.844 ]: Client 00FA send: Request-251-004
[ 1520703616.844 ]: Client 00FA send: Request-251-005
[ 1520703616.844 ]: Client 00FA send: Request-251-006
[ 1520703616.844 ]: Client 00FA send: Request-251-007
[ 1520703616.844 ]: Client 00FA send: Request-251-008
[ 1520703616.844 ]: Client 00FA send: Request-251-009
[ 1520703622.523 ]: Client 00FA recv: Request-251-000-World
[ 1520703624.523 ]: Client 00FA recv: Request-251-001-World
[ 1520703626.525 ]: Client 00FA recv: Request-251-002-World
[ 1520703628.527 ]: Client 00FA recv: Request-251-003-World
[ 1520703630.528 ]: Client 00FA recv: Request-251-004-World
[ 1520703632.531 ]: Client 00FA recv: Request-251-005-World
[ 1520703634.534 ]: Client 00FA recv: Request-251-006-World
[ 1520703636.535 ]: Client 00FA recv: Request-251-007-World
[ 1520703638.544 ]: Client 00FA recv: Request-251-008-World
[ 1520703639.541 ]: Client 00FA recv: Request-251-009-World
[ 1520703639.541 ]: Client 00FA finish
[ 1520703617.217 ]: Client 0064 start
[ 1520703617.217 ]: Client 0064 send: Request-609-000
[ 1520703617.217 ]: Client 0064 send: Request-609-001
[ 1520703617.217 ]: Client 0064 send: Request-609-002
[ 1520703617.217 ]: Client 0064 send: Request-609-003
[ 1520703617.217 ]: Client 0064 send: Request-609-004
[ 1520703617.217 ]: Client 0064 send: Request-609-005
[ 1520703617.217 ]: Client 0064 send: Request-609-006
[ 1520703617.217 ]: Client 0064 send: Request-609-007
[ 1520703617.217 ]: Client 0064 send: Request-609-008
[ 1520703617.217 ]: Client 0064 send: Request-609-009
[ 1520703620.521 ]: Client 0064 recv: Request-609-000-World
[ 1520703621.521 ]: Client 0064 recv: Request-609-001-World
[ 1520703623.522 ]: Client 0064 recv: Request-609-002-World
[ 1520703625.524 ]: Client 0064 recv: Request-609-003-World
[ 1520703627.526 ]: Client 0064 recv: Request-609-004-World
[ 1520703629.527 ]: Client 0064 recv: Request-609-005-World
[ 1520703631.529 ]: Client 0064 recv: Request-609-006-World
[ 1520703633.531 ]: Client 0064 recv: Request-609-007-World
[ 1520703635.534 ]: Client 0064 recv: Request-609-008-World
[ 1520703637.537 ]: Client 0064 recv: Request-609-009-World
[ 1520703637.537 ]: Client 0064 finish
[ 1520703619.450 ]: Server 00C8 start
[ 1520703619.519 ]: Server 00C8 recv: Request-609-000
[ 1520703620.521 ]: Server 00C8 send: Request-609-000-World
[ 1520703620.521 ]: Server 00C8 recv: Request-609-001
[ 1520703621.521 ]: Server 00C8 send: Request-609-001-World
[ 1520703621.521 ]: Server 00C8 recv: Request-251-000
[ 1520703622.522 ]: Server 00C8 send: Request-251-000-World
[ 1520703622.522 ]: Server 00C8 recv: Request-609-002
[ 1520703623.522 ]: Server 00C8 send: Request-609-002-World
[ 1520703623.523 ]: Server 00C8 recv: Request-251-001
[ 1520703624.523 ]: Server 00C8 send: Request-251-001-World
[ 1520703624.524 ]: Server 00C8 recv: Request-609-003
[ 1520703625.524 ]: Server 00C8 send: Request-609-003-World
[ 1520703625.524 ]: Server 00C8 recv: Request-251-002
[ 1520703626.525 ]: Server 00C8 send: Request-251-002-World
[ 1520703626.525 ]: Server 00C8 recv: Request-609-004
[ 1520703627.526 ]: Server 00C8 send: Request-609-004-World
[ 1520703627.526 ]: Server 00C8 recv: Request-251-003
[ 1520703628.527 ]: Server 00C8 send: Request-251-003-World
[ 1520703628.527 ]: Server 00C8 recv: Request-609-005
[ 1520703629.527 ]: Server 00C8 send: Request-609-005-World
[ 1520703629.528 ]: Server 00C8 recv: Request-251-004
[ 1520703630.528 ]: Server 00C8 send: Request-251-004-World
[ 1520703630.529 ]: Server 00C8 recv: Request-609-006
[ 1520703631.529 ]: Server 00C8 send: Request-609-006-World
[ 1520703631.530 ]: Server 00C8 recv: Request-251-005
[ 1520703632.530 ]: Server 00C8 send: Request-251-005-World
[ 1520703632.531 ]: Server 00C8 recv: Request-609-007
[ 1520703633.531 ]: Server 00C8 send: Request-609-007-World
[ 1520703633.532 ]: Server 00C8 recv: Request-251-006
[ 1520703634.532 ]: Server 00C8 send: Request-251-006-World
[ 1520703634.533 ]: Server 00C8 recv: Request-609-008
[ 1520703635.533 ]: Server 00C8 send: Request-609-008-World
[ 1520703635.534 ]: Server 00C8 recv: Request-251-007
[ 1520703636.535 ]: Server 00C8 send: Request-251-007-World
[ 1520703636.536 ]: Server 00C8 recv: Request-609-009
[ 1520703637.537 ]: Server 00C8 send: Request-609-009-World
[ 1520703637.538 ]: Server 00C8 recv: Request-251-008
[ 1520703638.541 ]: Server 00C8 send: Request-251-008-World
[ 1520703638.541 ]: Server 00C8 recv: Request-251-009
[ 1520703639.541 ]: Server 00C8 send: Request-251-009-World

    從客戶端資訊來看,10個請求快速地就傳送出去了,然後再下來的20秒內,大約每2秒能夠獲取到一次響應;

    從服務端資訊上來看,對於兩個客戶端的處理基本是輪流地處理的;

四、總結

    應用可以通過 ROUTER-DEALER套件來實現非同步的“請求-響應" 處理,在訊息的處理過程中只要注意空幀的傳送、標識頭的處理就行;


參考文章:

[1] http://zguide.zeromq.org/page:all

    ZMQ_DEALER受ZMQ_RCVHW和ZMQ_SNDHW兩個閥值影響(可通過zmq_setsockopt函式設定),一旦傳送或接收佇列達到閥值,Socket就會進入mute狀態,此時對DEALER的任何zmq_send操作都會阻塞,直到mute狀態結束。如果當前沒有有效的連線,zmq_send操作也會阻塞,直到有新的連線到來為止。DEALER發生阻塞並不會丟棄訊息。