1. 程式人生 > >RabbitMQ C++客戶端 RabbitMQ Client for Windows 超簡單接收發送介面(多執行緒版)-最近更新2018-05-21

RabbitMQ C++客戶端 RabbitMQ Client for Windows 超簡單接收發送介面(多執行緒版)-最近更新2018-05-21

該庫特點:

該介面本身不支援多執行緒,也就是說多個執行緒不能同時訪問一個介面物件。

支援多執行緒:

但是,由於該介面是值語義,所以在多執行緒場景中使用沒有任何問題。

下面的例子就展示了多執行緒環境下程式的Demo和結果。

要完成的功能:

1 A執行緒傳送0-99這一百個整數到RabbitMQ佇列;

2 同時B執行緒傳送0-99這一百個整數到RabbitMQ佇列;

3 C執行緒(一秒鐘之後)一次性取回來A B傳送的所有資料;

4 D執行緒(兩秒之後)檢查C執行緒取回的資料是否完整

如有需要:掃碼我的部落格頭像加我即可(還是要先說明一下,該資源時付費資源)

#include <iostream>
#include <string>
#include <vector>
#include "MessageQueue/RabbitMQ.h"
#include "MessageQueue/MessageBody.h"
#include <ctime>
#include <set>
//#include <vld.h>
using namespace std;
#include <Windows.h>

//--------------------------需要開發者自己填寫的資訊begin--------------------------------------
//服務端的一個Exchange,客戶端傳送的時候,如果沒有則會建立;客戶端取的時候,如果沒有則取不到資料
CExchange my_test_exchange("MyTestExchange");
//服務端的一個訊息佇列,客戶端傳送的時候,如果沒有則會建立;客戶端取的時候,如果沒有則取不到資料
CQueue my_test_queue("MyTestQueue");
//服務端Exchange與訊息佇列之間的繫結key,客戶端傳送的時候,如果沒有則不能傳送;客戶端取的時候,如果沒有則取不到資料
string my_test_bind_key("MyTestBindKey");
//RabbitMQ服務端IP
string rabbitmq_server_ip = "localhost";
//RabbitMQ服務端埠
int rabbitmq_server_port = 5672;
//RabbitMQ服務端登入使用者名稱
string rabbitmq_server_user = "guest";
//RabbitMQ服務端登入密碼
string rabbitmq_server_psw = "guest";
//--------------------------需要開發者自己填寫的資訊end--------------------------------------


//--------------------------建議開發者設定批量操作begin--------------------------------------
//建議將此值設定成500,或者1000.這樣可以節省時間和網路資源。沒有必要一條一條的傳送或者取
int test_send_number = 100;
int test_get_number = 200;
//--------------------------建議開發者設定批量操作end--------------------------------------
//引用全域性變數
std::multiset<int> MySet;
//傳送資料到RabbitMQ服務的全部過程如下:
//!!!每次publish之前必須呼叫Connect,publish之後呼叫Disconnect!!!
DWORD WINAPI  send_message_to_mq(LPVOID lpParameter)
{
	vector<CMessage> message_array;
	CMessage message("");
	//製作test_send_number個訊息,用於傳送到RabbitMQ伺服器
	for (int i=0;i<test_send_number;++i)
	{
		//這裡建立你要傳送的資料,放到CMessage的m_data成員變數中,從服務端取的時候也是一樣從這個成員中取string
		message.m_data = std::to_string((long long)i);
		message_array.push_back(message);
	}
	string& err = GetErr();    
	//RabbitMQ伺服器的連線資訊,這裡改成你自己的IP/PORT
	CRabbitMQ pro(rabbitmq_server_ip,rabbitmq_server_port,rabbitmq_server_user,rabbitmq_server_psw);

	if(pro.Connect(err)<0)
	{
		cout<<"連線失敗!"<<endl;
		return -1;
	}
	else
	{
		cout<<"連線成功!"<<endl;
	}

	//宣告一個交換機,宣告一次,下次不需要再次宣告
	if(pro.exchange_declare(my_test_exchange, err) < 0)
	{
		cout<<"宣告交換機失敗!"<<endl;
		return -1;
	}
	else
	{
		cout<<"宣告交換機成功!"<<endl;
	}
	//宣告一個佇列,宣告一次,下次不需要再次宣告
	if( (pro.queue_declare(my_test_queue, err) < 0) ) 
	{
		cout<<"宣告佇列失敗!"<<endl;
		return -1;
	}
	else
	{
		cout<<"宣告佇列成功!"<<endl;
	}
	//將交換機繫結到佇列, 繫結一次,後面不需要再繫結
	if( (pro.queue_bind(my_test_queue,my_test_exchange,my_test_bind_key, err)<0) ) 
	{
		cout<<"繫結佇列失敗!"<<endl;
		return -1;
	}
	else
	{
		cout<<"繫結佇列成功!"<<endl;
	}
	string my_test_root_key=my_test_bind_key;
	//傳送資料到RabbitMQ伺服器,可以反覆呼叫從而實現連續傳送資料
	int flag = pro.publish(message_array,my_test_root_key,err);

	if(flag<0)
	{
		cout<<"投遞訊息到MQ失敗"<<endl;
	}
	else
	{
		cout<<"投遞訊息到MQ成功:"<<endl;
		for (vector<CMessage>::iterator itr = message_array.begin(); itr != message_array.end(); ++itr)
		{
			cout<<(*itr).m_data<<" ";
		}
		cout<<endl;
	}
	pro.__sleep(10);
	pro.Disconnect();

	return 0;
}
//從RabbitMQ取資料的全部過程如下
//!!!每次consumer之前必須呼叫Connect,publish之後呼叫Disconnect!!!
DWORD WINAPI   get_message_from_mq(LPVOID lpParameter)
{
	vector<CMessage> message_array;
	string& err = GetErr();    

	//RabbitMQ伺服器的連線資訊,這裡改成你自己的IP/PORT
	CRabbitMQ pro(rabbitmq_server_ip,rabbitmq_server_port,rabbitmq_server_user,rabbitmq_server_psw);
	pro.__sleep(1000);//等待生產者傳送完畢再取
	if(pro.Connect(err)<0)
	{
		cout<<"取訊息時連線失敗!"<<endl;
		return -1;
	}
	else
	{
		cout<<"取訊息時連線成功!"<<endl;
	}
	message_array.clear();
	//一次性取1000個訊息,不足1000也無妨,有多少取到多少
	int get_number= test_get_number;
	::timeval tvb={0,10};
	//從RabbitMQ伺服器取訊息
	if(pro.consumer(my_test_queue,message_array,get_number,&tvb,err)<0)
	{
		cout<<"取訊息失敗!"<<endl;
	}
	else
	{
		cout<<"取訊息成功!取到了"<<message_array.size()<<"個訊息:"<<endl;
		for (int i=0;i<message_array.size();i++)
		{
			cout<<message_array[i].m_data<<" ";
			MySet.insert(std::stoi(message_array[i].m_data));
		}
		cout<<endl;
	}
	pro.__sleep(10);
	pro.Disconnect();
}
DWORD WINAPI   test_assert_success(LPVOID lpParameter)
{
	CRabbitMQ pro;
	pro.__sleep(2000);
	cout<<"全部資料(驗證是否丟失):"<<endl;
	for (auto itr = MySet.begin(); itr != MySet.end(); ++itr)
	{
		cout<<*itr<<" ";
	}
	cout<<endl;
	return 0;
}
int main(int ,char**)
{
	cout<<"Hello UseMQ!"<<endl;

	//建立執行緒  
	HANDLE t1 = CreateThread(NULL, 0, send_message_to_mq, 0, 0, NULL);  
	HANDLE t2 = CreateThread(NULL, 0, send_message_to_mq, 0, 0, NULL); 

	HANDLE t3 = CreateThread(NULL, 0, get_message_from_mq, 0, 0, NULL); 

	HANDLE t4 = CreateThread(NULL, 0, test_assert_success, 0, 0, NULL); 
	//傳送訊息
	//send_message_to_mq();
	//取訊息
	//get_message_from_mq();

	//等待執行緒退出  
	WaitForSingleObject(t1, INFINITE);  
	WaitForSingleObject(t2, INFINITE);  
	WaitForSingleObject(t3, INFINITE);  
	WaitForSingleObject(t4, INFINITE);  

	cout<<"Hello UseMQ end!";
	return 0;
};

執行驗證結果: