【rabbitmq】解決SimpleAmqpClient建立連線時阻塞的問題
阿新 • • 發佈:2019-01-26
如果在單執行緒的程式使用SimpleAmqpClient-v2.4,在使用介面Channel::Create()連線到rabbitmq時,如果網路中斷或者ip埠地址不對的時候,程式就會一直阻塞在這個呼叫上,沒有返回值沒有異常提示,這種情況如果你想提示個錯誤什麼的就無能為力了,Panda工作中也遇到這個問題,我想:如果他能提供一個連線超時異常就好了,畢竟SimpleAmqpClient只是對另外一個c語言開源專案rabbitmq-c的封裝,而且我記得rabbitmq-c是支援我所說的功能的。下面請跟隨我一起一步一步完成這個事情吧。
第一步:確定SimpleAmqpClient不支援我要的功能
簡化了channel.h裡有關建立連線的關鍵程式碼
沒有發現一個引數可以設定超時,然後我上github問了一下庫的作者,證實確實是這樣了class Channel : boost::noncopyable { public: typedef boost::shared_ptr<Channel> ptr_t; // 建立連線 static ptr_t Create(const std::string &host = "127.0.0.1", int port = 5672, const std::string &username = "guest", const std::string &password = "guest", const std::string &vhost = "/", int frame_max = 131072); // 建立ssl連線 static ptr_t CreateSecure(const std::string &path_to_ca_cert="", const std::string &host = "127.0.0.1", const std::string &path_to_client_key="", const std::string &path_to_client_cert="", int port = 5671, const std::string &username = "guest", const std::string &password = "guest", const std::string &vhost = "/", int frame_max = 131072, bool verify_hostname = true); // 從uri建立連線 static ptr_t CreateFromUri(const std::string &uri, int frame_max = 131072); // 從uri建立ssl連線 static ptr_t CreateSecureFromUri(const std::string &uri, const std::string &path_to_ca_cert, const std::string &path_to_client_key="", const std::string &path_to_client_cert="", bool verify_hostname = true, int frame_max = 131072); );
這樣的話只能自己修改庫的程式碼了
第二步:找出關鍵呼叫
先來看一下Channel::Channel(…)
然後在rabbitmq-c專案標頭檔案amqp.h中找到建立非阻塞socket的函式
第三步:程式碼實現
有方向了,終於可以快樂的寫程式碼o(∩_∩)o 。根據設計模式的開閉原則:我們做的事情更好的是擴充套件而不是修改現有的功能,所以比較優雅的方案應該是增加一個工廠函式生成建立一個channel,做法如下:
在Channel.h增加兩個函式
/** * 以非阻塞的方法建立Channel * author: panxianzhan * @param timeout 最大等待事件,為NULL時採用阻塞方式開啟 */ explicit Channel(const std::string &host, int port, const std::string &username, const std::string &password, const std::string &vhost, int frame_max, timeval* ); /** * 工廠方法 * 以非阻塞的方法建立Channel * author: panxianzhan * @param timeout 最大等待事件,為NULL時採用阻塞方式開啟 */ static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1", int port = 5672, const std::string &username = "guest", const std::string &password = "guest", const std::string &vhost = "/", int frame_max = 131072, timeval* timeout = NULL) { return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout); }
然後在Channel.cpp實現
Channel::Channel(const std::string &host,
int port,
const std::string &username,
const std::string &password,
const std::string &vhost,
int frame_max,
timeval* timeout) :
m_impl(new Detail::ChannelImpl)
{
m_impl->m_connection = amqp_new_connection();
if (NULL == m_impl->m_connection)
{
throw std::bad_alloc();
}
try
{
amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection);
int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout);
}
//如果連線超時,下面這一行就會丟擲異常
m_impl->CheckForError(sock);
m_impl->DoLogin(username, password, vhost, frame_max);
}
catch (...)
{
amqp_destroy_connection(m_impl->m_connection);
throw;
}
m_impl->SetIsConnected(true);
}
這樣就大功告成了。使用例子如下:
int main()
{
timeval tv = {0};
tv.tv_usec = 200 * 1000; //等待200毫秒
try
{
Channel::ptr_t channel = Channel::CreateNoBlock(
"127.0.0.1", 5567,"guest", "guest", "/", 131072, &tv);
...
...
} catch (AmqpLibraryException& ex)
{
//提示連線失敗;
}
return 0;
}