1. 程式人生 > >WebRTC原始碼探索之旅——多執行緒篇-4

WebRTC原始碼探索之旅——多執行緒篇-4

4 messagequeue

messagequeue.h/messagequeue.cc檔案是多路訊號分離器的重要組成部分。它實現了訊息一個完整地訊息佇列,該佇列包括立即執行訊息佇列、延遲執行訊息佇列和具有優先順序的訊息佇列。其中,talk_base::MessageQueue類也是talk_base::Thread類的基類。所以,所有的WebRTC的執行緒都是支援訊息佇列的。

4.1 talk_base::MessageQueueManager

talk_base::MessageQueueManager類是一個全域性單例類。這個類看似比較複雜,但是功能其實非常簡單——僅僅為了在所有的talk_base::MessagerQueue中刪除與指定的talk_base::MessageHandler相關的訊息。WebRTC的訊息佇列在傳送訊息的時候要指定訊息處理器(talk_base::MessageHandler)。如果某個訊息處理器被析構,那麼與之相關的所有訊息都將無法處理。所以,建立了這個全域性單例類來解決這個問題(見talk_base::MessageHandler解構函式)。

talk_base::MessageQueueManager的程式碼沒有涉及任何跨平臺的API呼叫,而且本身功能也非常簡單。所以我就不討論它如何使用std::vector管理talk_base::MessageQueue。唯一需要注意的就是talk_base::MessageQueueManager如何保證自己在第一個talk_base::Thread類例項化之前完成talk_base::MessageQueueManager全域性單例的例項化。這當中有個有趣的狀況,talk_base::MessageQueueManager在保證了自己必然在第一條子執行緒被建立之前自己被例項化,talk_base::MessageQueueManager:: Instance函式內部沒有使用任何鎖來保護talk_base::MessageQueueManager::instance_例項的建立。

如前面所說,talk_base::MessagerQueue是talk_base::Thread的基類。在建立talk_base::Thread時必然會呼叫talk_base::MessagerQueue的建構函式。在talk_base::MessagerQueue的建構函式中呼叫了talk_base::MessageQueueManager::Add函式,而該函式會使用talk_base::MessageQueueManager::Instance函式建立talk_base::MessagerQueueManager的例項。由於talk_base::ThreadManager保證了在建立第一個子執行緒之前,主執行緒會被包裝成talk_base::Thread物件,所以talk_base::MessageQueueManager必然可以將主執行緒作為第一個talk_base::MessageQueue物件納入管理。

以上的描述可能比較晦澀難懂,這是因為整個流程涉及到了talk_base::Thread和talk_base::ThreadManager等類。而這些都是尚未講解過他們的程式碼。不過即使看不明白也沒關係,我會在講解完所有相關類之後演示2段範例程式碼,並將範例程式碼的呼叫棧完全展開。看過範例程式碼後絕大多數讀者都應該能夠明白talk_base::MessageQueueManager的原理。

talk_base::MessageQueueManager還有最後一個問題,那就是它什麼時候被析構。talk_base::MessageQueue的解構函式會呼叫talk_base::MessageQueueManager::Remove函式,並且“理論上來說”在最後一個talk_base::MessageQueue從佇列中移除之後會析構talk_base::MessageQueueManager。既然,所有的執行緒都被移除,那就意味著talk_base::MessageQueueManager例項在被delete時重新回到了單執行緒的環境,所以也沒有任何鎖的保護。

4.2 MessageData

這一節的內容將包括talk_base::MessageData類以及多個它的子類和幾個工具函式。這些類和函式都很簡單,所以就不介紹程式碼和原理,僅僅羅列一下它們的功能。

4.2.1 talk_base::MessageData

定義了基類,並將解構函式定義為虛擬函式。

4.2.2 talk_base::TypedMessageData

使用模板定義的talk_base::MessageData的一個子類,便於擴充套件。

4.2.3 talk_base::ScopedMessageData

類似於talk_base::TypedMessageData,用於指標型別。在解構函式中,自動對該指標呼叫delete。

4.2.4 talk_base::ScopedRefMessageData

類似於talk_base::TypedMessageData,用於引用計數的指標型別。

4.2.5 talk_base::WrapMessageData函式

模板函式,便於建立talk_base::TypedMessageData

4.2.6 talk_base::UseMessageData函式

模板函式,用於將talk_base::TypedMessageData中的Data取出

4.2.7 talk_base::DisposeData

這是一個很特殊的訊息,用以將某個物件交給訊息引擎銷燬。可能的用途有2個:1. 有些函式不便在當前函式範圍內銷燬物件,見範例talk_base::HttpServer::Connection::~Connection;2.某物件屬於某一執行緒,因此銷燬操作應該交給所有者執行緒(未見範例)。WebRTC使用者不需要自行使用該類,呼叫talk_base::MessageQueue::Dispose函式即可使用它的功能。

以上7個類或函式的實現非常簡單,有C++使用經驗的讀者非常容易就能理解(標準庫中就有相似的類)。

4.3 Message

這一節將簡單介紹一下3個類:talk_base::Message、talk_base::DelayedMessage和talk_base::MessageList。

4.3.1 talk_base::Message

定義了訊息的基本資料結構。

4.3.2 talk_base::DelayedMessage

定義了延遲觸發訊息的資料結構。在talk_base::MessageQueue中,延遲訊息被存放在以talk_base::DelayedMessage::msTrigger_排序(talk_base::DelayedMessage類定義了operator<)的佇列中。如果2個延遲訊息的觸發時間相同,響應順序按先進先出原則。

這裡我將簡單介紹一下各個成員變數的用途:

cmsDelay_:延遲多久觸發訊息,僅作除錯使用

msTrigger_:觸發訊息的時間

num_:新增訊息的時間

msg_:訊息本身

在使用延遲訊息時,不需要自行構建talk_base::DelayedMessage例項。直接呼叫talk_base::MessageQueue::PostDelayed或者talk_base::MessageQueue::PostAt函式即可。

4.3.3 talk_base::MessageList

訊息列表,定義為std::list<talk_base::Message>

4.4 talk_base::MessageQueue

現在我們正式進入多執行緒篇最為激動人心的部分——多路訊號分離器的訊息佇列元件。WebRTC的多路型號分離器由2部分組成:訊息佇列和talk_base::SocketServer(主要實現就是talk_base::PhysicalSocketServer)。訊息佇列負責接受訊息,並使用訊息處理器(talk_base::MessageHandler的子類)處理訊息。在處理完所有訊息後,訊息佇列呼叫talk_base::SocketServer::Wait函式阻塞等待新的IO訊號。如果有新的訊息進入,訊息佇列會呼叫talk_base::SocketServer::WakeUp喚醒talk_base::SocketServer阻塞等待。這就是訊息佇列和talk_base::SocketServer協同工作的基本流程。

讓我們先來看一下,訊息佇列的實現。訊息佇列的主要功能是接收和處理訊息,並作為talk_base::Thread的基類出現。

它的主要部件(成員變數)包括:

ss_:協同工作的talk_base::SocketServer

default_ss_:預設的talk_base::SocketServer。如果在構造的時候提供talb_base::SocketServer,那麼訊息佇列就使用使用者提供的;如果沒有提供,訊息佇列就初始化一個talk_base::PhysicalSocketServer並儲存在default_ss_上,然後賦值給ss_。在訊息佇列析構的時候,如果default_ss_儲存有預設構造talk_base::PhysicalSocketServer,那就銷燬它

msgq_:訊息佇列,佇列內的訊息按照先進先出的原則立即執行

dmsgq_:延遲訊息佇列,佇列內的訊息按照指定的時間延遲執行

接著,我們來看一下talk_base::MessageQueue的主要成員函式:

talk_base::MessageQueue::Get:等待接收訊息。

引數說明:

pmsg:存放訊息的指標,用於返回訊息

cmsWait:等待的時間,kForever表示無限等待

process_io:是否要求talk_base::SocketServer處理IO訊號

talk_base::MessageQueue::PostDelayed:傳送一個延遲訊息(從當前時間計算延遲處理時間)

引數說明:

cmsDelay:延遲毫秒數

phandler:訊息處理器(talk_base::MessageHandler的子類)

id:訊息ID

pdata:MessageData指標

talk_base::MessageQueue::PostAt:傳送一個延遲訊息(直接指定延遲處理時間)

引數說明:

tstamp:訊息觸發的時間

phandler:訊息處理器(talk_base::MessageHandler的子類)

id:訊息ID

pdata:MessageData指標

talk_base::MessageQueue::Clear:通過指定talk_base::MessageHandler和訊息ID刪除訊息

引數說明:

phandler:指定被刪除訊息的talk_base::MessageHandler

id:指定被刪除訊息的ID;如果該id為MQID_ANY所有與phandler相關的訊息都將被刪除

removed:返回所有被刪除訊息的列表

talk_base::MessageQueue::GetDelay:從現在到下一條將要觸發的延遲訊息的觸發時間的毫秒數(無引數)

talk_base::MessageQueue::Dispose:請求訊息引擎刪除一個物件(delete物件指標)

引數說明:

doomed:將要被刪除的物件的指標

talk_base::MessageQueue::SignalQueueDestroyed(signal slot):通知接收者(observer)訊息佇列將要刪除(引數無)

talk_base::MessageQueue類的核心是talk_base::MessageQueue::Get函式。Get函式的主迴圈首先檢查dmsgq_和msgq_是否有立即需要處理的訊息。如果檢查到需要立即處理的訊息,就馬上將該訊息返回。如果沒有檢查到需要立即處理的訊息,那麼執行緒就阻塞等待在talk_base::SocketServer::Wait函式上。如果阻塞等待期間有新的訊息進入佇列或者執行緒需要停止退出,通過talk_base::SocketServer::WakeUp函式喚醒被阻塞的talk_base::SocketServer::Wait函式。從talk_base::SocketServer::Wait函式返回後,talk_base::MessageQueue::Get函式會重新檢查是否有可以返回的訊息,如果沒有則再次阻塞等待在talk_base::SocketServer::Wait函式上。以上流程如下圖所示:

talk_base::MessageQueue的原理大致如此。如果讀者結合程式碼依然無法理解,我會在多執行緒篇的最後給出幾段範例程式碼,並完整地展開所有函式的呼叫序列和呼叫棧。相信到時候大多數讀者都能夠理解WebRTC的執行緒架構是如何工作的。