1. 程式人生 > >【rabbitmq】解決SimpleAmqpClient建立連線時阻塞的問題

【rabbitmq】解決SimpleAmqpClient建立連線時阻塞的問題

如果在單執行緒的程式使用SimpleAmqpClient-v2.4,在使用介面Channel::Create()連線到rabbitmq時,如果網路中斷或者ip埠地址不對的時候,程式就會一直阻塞在這個呼叫上,沒有返回值沒有異常提示,這種情況如果你想提示個錯誤什麼的就無能為力了,Panda工作中也遇到這個問題,我想:如果他能提供一個連線超時異常就好了,畢竟SimpleAmqpClient只是對另外一個c語言開源專案rabbitmq-c的封裝,而且我記得rabbitmq-c是支援我所說的功能的。下面請跟隨我一起一步一步完成這個事情吧。

第一步:確定SimpleAmqpClient不支援我要的功能

簡化了channel.h裡有關建立連線的關鍵程式碼

class  Channel : boost::noncopyable
{
public:
    typedef boost::shared_ptr<Channel> ptr_t;

    // 建立連線
    static ptr_t Create(const std::string &host = "127.0.0.1",
                        int port = 5672,
                        const std::string &username = "guest",
                        const std::string &password = "guest",
                        const std::string &vhost = "/",
                        int frame_max = 131072);
    // 建立ssl連線
    static ptr_t CreateSecure(const std::string &path_to_ca_cert="",
                              const std::string &host = "127.0.0.1",
                              const std::string &path_to_client_key="",
                              const std::string &path_to_client_cert="",
                              int port = 5671,
                              const std::string &username = "guest",
                              const std::string &password = "guest",
                              const std::string &vhost = "/",
                              int frame_max = 131072,
                              bool verify_hostname = true);

    // 從uri建立連線
    static ptr_t CreateFromUri(const std::string &uri, int frame_max = 131072);

    // 從uri建立ssl連線
    static ptr_t CreateSecureFromUri(const std::string &uri,
                               const std::string &path_to_ca_cert,
                               const std::string &path_to_client_key="",
                               const std::string &path_to_client_cert="",
                               bool verify_hostname = true,
                               int frame_max = 131072);
);
沒有發現一個引數可以設定超時,然後我上github問了一下庫的作者,證實確實是這樣了 

這樣的話只能自己修改庫的程式碼了

第二步:找出關鍵呼叫

先來看一下Channel::Channel(…) 


然後在rabbitmq-c專案標頭檔案amqp.h中找到建立非阻塞socket的函式 


第三步:程式碼實現
有方向了,終於可以快樂的寫程式碼o(∩_∩)o 。根據設計模式的開閉原則:我們做的事情更好的是擴充套件而不是修改現有的功能,所以比較優雅的方案應該是增加一個工廠函式生成建立一個channel,做法如下: 
在Channel.h增加兩個函式

    /**
     * 以非阻塞的方法建立Channel
     * author: panxianzhan
     * @param timeout 最大等待事件,為NULL時採用阻塞方式開啟
     */
    explicit Channel(const std::string &host,
        int port,
        const std::string &username,
        const std::string &password,
        const std::string &vhost,
        int frame_max,
        timeval*
        );

    /**
     * 工廠方法
     * 以非阻塞的方法建立Channel
     * author: panxianzhan
     * @param timeout 最大等待事件,為NULL時採用阻塞方式開啟
     */
    static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1",
        int port = 5672,
        const std::string &username = "guest",
        const std::string &password = "guest",
        const std::string &vhost = "/",
        int frame_max = 131072,
        timeval* timeout = NULL)
    {
        return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout);
    }

然後在Channel.cpp實現
Channel::Channel(const std::string &host,
                 int port,
                 const std::string &username,
                 const std::string &password,
                 const std::string &vhost,
                 int frame_max,
                 timeval* timeout) :
    m_impl(new Detail::ChannelImpl)
{
    m_impl->m_connection = amqp_new_connection();

    if (NULL == m_impl->m_connection)
    {
        throw std::bad_alloc();
    }

    try
    {
        amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection);
        int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout);
        }

        //如果連線超時,下面這一行就會丟擲異常
        m_impl->CheckForError(sock);

        m_impl->DoLogin(username, password, vhost, frame_max);
    }
    catch (...)
    {
        amqp_destroy_connection(m_impl->m_connection);
        throw;
    }

    m_impl->SetIsConnected(true);
}
這樣就大功告成了。使用例子如下:
int main()
{
    timeval tv = {0};
    tv.tv_usec = 200 * 1000; //等待200毫秒
    try 
    {
        Channel::ptr_t channel = Channel::CreateNoBlock(
        "127.0.0.1", 5567,"guest", "guest", "/", 131072, &tv);
        ...
        ...
    } catch (AmqpLibraryException& ex)
    {
        //提示連線失敗;
    }
    return 0;
}