RabbitMQ C++客戶端 RabbitMQ Client for Windows 超簡單接收發送介面(多執行緒版)-最近更新2018-05-21
阿新 • • 發佈:2019-01-05
該庫特點:
該介面本身不支援多執行緒,也就是說多個執行緒不能同時訪問一個介面物件。
支援多執行緒:
但是,由於該介面是值語義,所以在多執行緒場景中使用沒有任何問題。
下面的例子就展示了多執行緒環境下程式的Demo和結果。
要完成的功能:
1 A執行緒傳送0-99這一百個整數到RabbitMQ佇列;
2 同時B執行緒傳送0-99這一百個整數到RabbitMQ佇列;
3 C執行緒(一秒鐘之後)一次性取回來A B傳送的所有資料;
4 D執行緒(兩秒之後)檢查C執行緒取回的資料是否完整
如有需要:掃碼我的部落格頭像加我即可(還是要先說明一下,該資源時付費資源)
#include <iostream> #include <string> #include <vector> #include "MessageQueue/RabbitMQ.h" #include "MessageQueue/MessageBody.h" #include <ctime> #include <set> //#include <vld.h> using namespace std; #include <Windows.h> //--------------------------需要開發者自己填寫的資訊begin-------------------------------------- //服務端的一個Exchange,客戶端傳送的時候,如果沒有則會建立;客戶端取的時候,如果沒有則取不到資料 CExchange my_test_exchange("MyTestExchange"); //服務端的一個訊息佇列,客戶端傳送的時候,如果沒有則會建立;客戶端取的時候,如果沒有則取不到資料 CQueue my_test_queue("MyTestQueue"); //服務端Exchange與訊息佇列之間的繫結key,客戶端傳送的時候,如果沒有則不能傳送;客戶端取的時候,如果沒有則取不到資料 string my_test_bind_key("MyTestBindKey"); //RabbitMQ服務端IP string rabbitmq_server_ip = "localhost"; //RabbitMQ服務端埠 int rabbitmq_server_port = 5672; //RabbitMQ服務端登入使用者名稱 string rabbitmq_server_user = "guest"; //RabbitMQ服務端登入密碼 string rabbitmq_server_psw = "guest"; //--------------------------需要開發者自己填寫的資訊end-------------------------------------- //--------------------------建議開發者設定批量操作begin-------------------------------------- //建議將此值設定成500,或者1000.這樣可以節省時間和網路資源。沒有必要一條一條的傳送或者取 int test_send_number = 100; int test_get_number = 200; //--------------------------建議開發者設定批量操作end-------------------------------------- //引用全域性變數 std::multiset<int> MySet; //傳送資料到RabbitMQ服務的全部過程如下: //!!!每次publish之前必須呼叫Connect,publish之後呼叫Disconnect!!! DWORD WINAPI send_message_to_mq(LPVOID lpParameter) { vector<CMessage> message_array; CMessage message(""); //製作test_send_number個訊息,用於傳送到RabbitMQ伺服器 for (int i=0;i<test_send_number;++i) { //這裡建立你要傳送的資料,放到CMessage的m_data成員變數中,從服務端取的時候也是一樣從這個成員中取string message.m_data = std::to_string((long long)i); message_array.push_back(message); } string& err = GetErr(); //RabbitMQ伺服器的連線資訊,這裡改成你自己的IP/PORT CRabbitMQ pro(rabbitmq_server_ip,rabbitmq_server_port,rabbitmq_server_user,rabbitmq_server_psw); if(pro.Connect(err)<0) { cout<<"連線失敗!"<<endl; return -1; } else { cout<<"連線成功!"<<endl; } //宣告一個交換機,宣告一次,下次不需要再次宣告 if(pro.exchange_declare(my_test_exchange, err) < 0) { cout<<"宣告交換機失敗!"<<endl; return -1; } else { cout<<"宣告交換機成功!"<<endl; } //宣告一個佇列,宣告一次,下次不需要再次宣告 if( (pro.queue_declare(my_test_queue, err) < 0) ) { cout<<"宣告佇列失敗!"<<endl; return -1; } else { cout<<"宣告佇列成功!"<<endl; } //將交換機繫結到佇列, 繫結一次,後面不需要再繫結 if( (pro.queue_bind(my_test_queue,my_test_exchange,my_test_bind_key, err)<0) ) { cout<<"繫結佇列失敗!"<<endl; return -1; } else { cout<<"繫結佇列成功!"<<endl; } string my_test_root_key=my_test_bind_key; //傳送資料到RabbitMQ伺服器,可以反覆呼叫從而實現連續傳送資料 int flag = pro.publish(message_array,my_test_root_key,err); if(flag<0) { cout<<"投遞訊息到MQ失敗"<<endl; } else { cout<<"投遞訊息到MQ成功:"<<endl; for (vector<CMessage>::iterator itr = message_array.begin(); itr != message_array.end(); ++itr) { cout<<(*itr).m_data<<" "; } cout<<endl; } pro.__sleep(10); pro.Disconnect(); return 0; } //從RabbitMQ取資料的全部過程如下 //!!!每次consumer之前必須呼叫Connect,publish之後呼叫Disconnect!!! DWORD WINAPI get_message_from_mq(LPVOID lpParameter) { vector<CMessage> message_array; string& err = GetErr(); //RabbitMQ伺服器的連線資訊,這裡改成你自己的IP/PORT CRabbitMQ pro(rabbitmq_server_ip,rabbitmq_server_port,rabbitmq_server_user,rabbitmq_server_psw); pro.__sleep(1000);//等待生產者傳送完畢再取 if(pro.Connect(err)<0) { cout<<"取訊息時連線失敗!"<<endl; return -1; } else { cout<<"取訊息時連線成功!"<<endl; } message_array.clear(); //一次性取1000個訊息,不足1000也無妨,有多少取到多少 int get_number= test_get_number; ::timeval tvb={0,10}; //從RabbitMQ伺服器取訊息 if(pro.consumer(my_test_queue,message_array,get_number,&tvb,err)<0) { cout<<"取訊息失敗!"<<endl; } else { cout<<"取訊息成功!取到了"<<message_array.size()<<"個訊息:"<<endl; for (int i=0;i<message_array.size();i++) { cout<<message_array[i].m_data<<" "; MySet.insert(std::stoi(message_array[i].m_data)); } cout<<endl; } pro.__sleep(10); pro.Disconnect(); } DWORD WINAPI test_assert_success(LPVOID lpParameter) { CRabbitMQ pro; pro.__sleep(2000); cout<<"全部資料(驗證是否丟失):"<<endl; for (auto itr = MySet.begin(); itr != MySet.end(); ++itr) { cout<<*itr<<" "; } cout<<endl; return 0; } int main(int ,char**) { cout<<"Hello UseMQ!"<<endl; //建立執行緒 HANDLE t1 = CreateThread(NULL, 0, send_message_to_mq, 0, 0, NULL); HANDLE t2 = CreateThread(NULL, 0, send_message_to_mq, 0, 0, NULL); HANDLE t3 = CreateThread(NULL, 0, get_message_from_mq, 0, 0, NULL); HANDLE t4 = CreateThread(NULL, 0, test_assert_success, 0, 0, NULL); //傳送訊息 //send_message_to_mq(); //取訊息 //get_message_from_mq(); //等待執行緒退出 WaitForSingleObject(t1, INFINITE); WaitForSingleObject(t2, INFINITE); WaitForSingleObject(t3, INFINITE); WaitForSingleObject(t4, INFINITE); cout<<"Hello UseMQ end!"; return 0; };
執行驗證結果: