1. 程式人生 > >ZeroMQ的訂閱釋出(publish-subscribe)模式

ZeroMQ的訂閱釋出(publish-subscribe)模式

轉 https://blog.csdn.net/cjf_wei/article/details/80036372

ZeroMQ的訂閱釋出模式是一種單向的資料釋出,當客戶端向服務端訂閱訊息之後,服務端便會將產生的訊息源源不斷的推送給訂閱者,本文的示例程式碼來源於文獻[1]示例程式碼的修改。

釋出-訂閱圖示 
這裡寫圖片描述 
釋出者使用PUB套接字將訊息傳送到佇列中,訂閱者使用SUB套接字從佇列中源源不斷的接收訊息。新的訂閱者可以隨時加入,但之前的訊息是無法接收到的;已有的訂閱者可以隨時退出;訂閱者還可以新增“過濾器”用來有選擇性的接收訊息。

使用方法簡介 
首先要建立一個上下文環境,然後使用它建立套接字:

void *context = zmq_ctx_new ();
  • 1

對於服務端來說,使用”ZMQ_PUB”建立socket,並且繫結到一個周知的埠,然後便可以不斷的廣播訊息了:

void *publisher = zmq_socket (context, ZMQ_PUB);
int rc = zmq_bind (publisher, "tcp://*:5556");
  • 1
  • 2

如果使用TCP連線並且訂閱者是慢速的,那麼訊息將在釋出方排隊;可以使用高水位標記(High-Water Marks,HWM)來定義緩衝區的大小,在ZeroMQ v2.x版本中HWM預設是無限制的,而在v3.x中預設情況下它是1000。對於PUB套接字,當到達HWM時,將丟棄資料。設定HWM引數:

zmq_setsockopt(publisher, ZMQ_SNDHWM, &nMaxNum, sizeof(nMaxNum));
  • 1

對於客戶端來說,要使用”ZMQ_SUB”建立socket,並且連結(zmq_connect)到待訂閱的服務端;此外,要想接收到服務推送的訊息,還必須使用zmq_setsockoptZMQ_SUBSCRIBE來配置該訂閱。zmq_setsockopt的ZMQ_SUBSCRIBE選項可以帶有一個”過濾器“,用以選擇性的接收來自服務端的訊息。該”過濾器”為空,則接收全部的訊息;”過濾器”還可以有多個,它們之間是”or”的關係,即接收滿足任一條件的訊息。當然也可以使用zmq_setsockopt配置選項ZMQ_UNSUBSCRIBE來取消訂閱,示例如下:

void *context = zmq_ctx_new ();
void *subscriber = zmq_socket (context, ZMQ_SUB);
int rc = zmq_connect (subscriber, "tcp://localhost:5556");

char filter1[] = "10001 "; 
char filter2[] = "20002 ";
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1)); //接收訊息的字首為filter1的訊息
rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2)); //接收訊息的字首為filter2的訊息
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

接收和傳送訊息:此處使用的方法是zmq_recv()和zmq_send(),相對於zmq_msg_send()和zmq_msg_recv(),它們會自己呼叫訊息傳送和接收的初始化方法等。

int zmq_recv (void *s, void *buf, size_t len, int flags);
int zmq_send (void *s, const void *buf, size_t len, int flags);
  • 1
  • 2

示例程式碼

//服務端:
#include <iostream>
#include <string>
#include <sstream>
#include <cstring>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <assert.h>
#include <zmq.h>

using namespace std;

int main ()
{
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://*:5556");
    assert (rc == 0);

    //  Initialize random number generator
    srand(time(0));
    while (1) {
        int zipcode, temperature, relhumidity;
        zipcode     = rand() % 100000;
        temperature = rand() % 215 - 80;
        relhumidity = rand() % 50 + 10;

        ostringstream os;
        os << setw(5) << setfill('0')<< zipcode <<" "
           << temperature <<" "<< relhumidity << "\n";

        zmq_send(publisher, os.str().c_str(), strlen(os.str().c_str()), 0);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

//客戶端:
#include <iostream>
#include <string>
#include <cstring>
#include <assert.h>
#include <zmq.h>

using namespace std;

int main (int argc, char *argv [])
{
    //  Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    char filter1[] = "10001 ";
    char filter2[] = "20002 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1));
    assert (rc == 0);
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2));
    assert (rc == 0);

    //  Process 100 updates
    int size;
    char buffer [256];
    for (int update_nbr = 0; update_nbr < 100; update_nbr++) {

        memset(buffer, 0, 256*sizeof(char));
        size = zmq_recv (subscriber, buffer, 255, 0);
        if (size == -1){
            cout<< "receiver error , skip this message"<<endl;
            continue;
        }
        buffer[size] = '\0';
        cout << buffer <<endl;
    }

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

NOTE: 
在文獻[1]中指出:

  • 在ZMQ_SUB套接字上執行相對於zmq_msg_send()和在ZMQ_PUB套接字上執行相對於和zmq_msg_recv()同樣都是錯誤的;
  • PUB-SUB具有”slow joiner”症狀。”slow joiner”的症狀是:即使先啟動訂閱者,稍等片刻再啟動釋出者,訂閱者也可能錯過釋出者傳送的第一條訊息。建立TCP連線需要花費時間,具體取決於網路狀況,以及主機到服務端的路由,所以即使多個訂閱者同時啟動,它們可能也不會收到同樣的訊息;
  • 訂閱者可以使用zmq_connect()同時連線到多個釋出者。不同釋出者推送的訊息將交錯到達;
  • 如果一個釋出者沒有任何訂閱者,那麼它會簡單地丟棄所有的訊息;
  • 從ZMQ v3.x開始,在使用連線的協議是tcp或者ipc時,過濾發生在釋出方。使用epgm協議,過濾發生在訂閱方。但在ZMQ v2.x版本中,所有過濾都發生在訂閱方。

[1].《ZeroMQ雲時代極速訊息通訊庫》.電子工業出版社,2015.