1. 程式人生 > >C++網路程式設計實戰專案--Sinetlib網路庫(3)——事件迴圈與跨執行緒呼叫

C++網路程式設計實戰專案--Sinetlib網路庫(3)——事件迴圈與跨執行緒呼叫

上一篇文章講了Reactor模式的關鍵結構I/O複用和事件分發,現在我們來關注一下它們的使用。

事件迴圈

我們已經實現了一個Epoller類來實現I/O複用,具體的使用方法就是Epoller::Poll()函式等待事件的發生,該函式有一個超時時間,超過這個時間即使沒有事件發生也會返回,那麼我們如何讓它一直工作呢?很明顯就是使用while迴圈。 在這裡插入圖片描述

一個事件迴圈的大概邏輯如上圖,就是迴圈反覆地呼叫Poll(),現在我們抽象出一個類Looper來管理這個迴圈。

下面是部分Looper的程式碼,在這個類中我們持有唯一的一個Epoller指標,通過AddEventBase()等函式往Epoller上註冊或修改事件,注意,我們傳入的事件是經過包裝後的EventBase類,裡面已經有了事件處理函數了。我們可以通過Start()開始事件迴圈,所有的處理都在該迴圈內完成。

class Looper
{
public:
    Looper();
    ~Looper();
    
    // 開始事件迴圈
    void Start();

    // 註冊事件
    void AddEventBase(std::shared_ptr<EventBase> eventbase) { epoller_->Add(eventbase); }
    void ModEventBase(std::shared_ptr<EventBase> eventbase) { epoller_->Mod(eventbase); }
    void
DelEventBase(std::shared_ptr<EventBase> eventbase) { epoller_->Del(eventbase); } private: // 底層的I/O複用類 std::unique_ptr<Epoller> epoller_; };

執行緒間呼叫

一個執行緒只能有一個Looper。現在讓我們考慮一下多個執行緒下Looper之間如何分配任務,為後面打下基礎。 在這裡插入圖片描述

如上,現在這個程序中有三個執行緒,他們共享了程序的地址空間,每個執行緒各有一個事件迴圈Looper,如何實現在Looper_1上讓Looper_2執行某個函式呢? 在這裡插入圖片描述

這裡我們可以在每個事件迴圈中都增加一個任務佇列,並暴露出相應的介面,Looper_1通過Looper_2的介面往後者的任務佇列裡投放任務,Looper_2再從中取出執行即可,為此這裡需要修改一下事件迴圈的邏輯。 在這裡插入圖片描述

在每次發生事件後,處理完事件後不再是直接等待下一次事件發生,而是先檢查任務佇列是否有任務,處理完任務佇列後再繼續等待,這樣就可以實現執行緒間的任務分配了。

但是這裡又出現了一個問題,就是Looper_1把任務丟到Looper_2的任務佇列後,Looper_2並不能及時的執行該任務,它只有在發生事件或者Poll()超時之後才能得到機會執行,顯然這是無法容忍的。解決問題的辦法就是在Looper_2上註冊一個用來喚醒的檔案描述符,在把任務放入Looper_2的任務佇列後,往該檔案描述符上寫一個字元,那麼Looper_2就會監測到該檔案描述符發生了可讀事件,也就從Poll()返回,開始處理這個可讀事件和任務隊列了。

下面就是關於執行緒間任務分配和喚醒處理的相關介面,其他執行緒通過AddTask()把任務交給該Looper的任務佇列,然後可通過WakeUp()喚醒該Looper去處理任務。因為任務佇列可由多個執行緒訪問,所有需要加上互斥鎖進行保護。

class Looper
{
public:
    using Task = std::function<void()>;
    
    // 喚醒迴圈以處理Task,配合Handle使用
    void WakeUp();
    void HandleWakeUp();

    void RunTask(Task&& task);
    void AddTask(Task&& task);
    void HandleTask();

private:
    // 用來喚醒的描述符以及關注該描述符的事件基類
    int wakeup_fd_;
    std::shared_ptr<EventBase> wakeup_eventbase_;

    // 任務佇列以及保護該佇列的互斥鎖
    std::mutex mutex_;
    std::vector<Task> task_queue_;
};

定時器和定時任務

有的時候我們需要在一段時間後在Looper上執行某個任務,而不是立刻執行,這時就需要定時器了。

首先看Timer,我們稱之為定時器,裡面封裝了發生的時間以及對應要執行的函式。

有了多個定時器,就需要一個TimerQueue佇列來管理定時器,管理的策略也很簡單,即一個優先佇列,發生時間先的定時器排在前面: 在這裡插入圖片描述

要將定時器整合到Looper上,我們使用的方法是把定時器當做一個事件,具體來說就是申請一個定時器檔案描述符,我們可以設定該檔案描述符的超時時間為定時器佇列隊頭的時間,當到達這個時間時該檔案描述符上就會發生可讀事件,這時我們就從定時器佇列上把隊頭的定時器執行掉,接著更新該檔案描述符的超時時間為新的隊頭的時間,然後等待下一次超時,如此迴圈反覆。

在Looper上我們暴露出來一個介面,傳入一個任務和一段時間間隔,Looper會把該任務變成一個定時器放入定時器佇列中,並檢查是否需要更新定時器檔案描述符的超時時間。當該定時器時間到期後便會執行該任務。

void RunTaskAfter(Task&& task, Nanosecond interval);

關於時間,這裡我使用的是c++11的std::chrono類,定義在base/timestamp.h,具體的使用讀者可以去查詢相關資料,這裡不做贅述。