通過互斥鎖和條件變數實現佇列的無sleep生產與消費
昨天轉了一篇文章將條件變數與互斥鎖的結合使用,今天給大家摳出曾經學到的一段高效的處理訊息的demo,通過互斥鎖實現安全佇列,通過條件變數實現佇列的工作與休息。
目錄結構:
簡單介紹:
SignalThread是封裝好的執行緒類,我們所做的核心工作都是在這裡完成,其他類也是服務於這裡;
MessageList是封裝好的安全執行緒類,基於c++ list實現的;
MessageHanderImp是處理訊息的方法,通過繼承MessageHander的介面,實現了方法OnMessage,其實只是printf了一下;
message是訊息的類,封裝了一下我們的訊息;
原始碼解析:
主函式很簡單,顯而易見SignalThread是我們實現的一個最主要的類,在這裡面封裝了執行緒完成了核心功能,主函式只要將它啟動即可,之後往封裝好的類裡面“push”資料,當然這個“push”也是我們封裝好的方法postMessage。MessageHanderImp是處理訊息的類,message是訊息的類。
int main(int argc, char** argv) { cout << "----start\n"; SignalThread signalThread; signalThread.start(); //核心執行緒啟動 cout << "----post\n"; signalThread.postMessage(new MessageHanderImp(), new Message());//往執行緒裡面push訊息 cout << "----post end\n"; getchar(); return 0; }
核心程式碼:
下面是我們實現的核心類SignalThread,註釋寫的蠻清楚了,類的方法在標頭檔案中直接實現了,其中最重要的是MessageData* hander = m_queue.wait();這邊的wait在messageList裡面實現。
struct MessageData { MessageHander* pHander; Message*pMessage; }; class SignalThread { public: SignalThread() { pthread_cond_init(&cond, NULL); pthread_mutex_init(&mutex, NULL);//初始化鎖和條件變數 } virtual ~SignalThread(){ pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); } void start() { pthread_t t; pthread_mutex_lock(&mutex); pthread_create(&t, NULL, run, this); pthread_cond_wait(&cond, &mutex);//這邊等待執行緒開始時會停止等待 pthread_mutex_unlock(&mutex); } void postMessage(MessageHander* hander, Message* message = NULL) { MessageData * pessageData = new MessageData(); pessageData->pHander = hander; pessageData->pMessage = message; m_queue.push_back(pessageData);//實現往佇列推送訊息 } private: void* _start(void * arg) { while(1){ pthread_mutex_lock(&mutex); pthread_cond_signal(&cond);//這裡是解開主執行緒的wait pthread_mutex_unlock(&mutex); //要改成阻塞式的佇列 MessageData* hander = m_queue.wait();//如果沒有訊息需要進行wait,釋放cpu if(hander){ //一旦有訊息及時處理 printf("_start SignalThread OnMessage\n"); hander->pHander->OnMessage(hander->pMessage);//這是一個處理訊息的方法 } } return NULL; } static void *run(void* pths) { SignalThread* THIS = (SignalThread*)pths; return THIS->_start(pths); } private: MessageList<MessageData*> m_queue;//這裡封裝了安全佇列,下面講解 pthread_mutex_t mutex; pthread_cond_t cond; };
messageList實現:
這裡邊實現了模板,最主要就是說一下wait這個方法pthread_cond_wait(&cond, &mutex);實現了阻塞,當佇列是空的時候不進行輪詢浪費cpu而是wait,至於為什麼不用 if (m_msglist.empty()) 而是用 while (m_msglist.empty()) 是因為if (在某些情況下可能會出問題,特別是有多個執行緒在wait的時候)這裡推薦看一下這篇篇文章關於互斥鎖和條件變數的: https://blog.csdn.net/FlayHigherGT/article/details/83790972
還有一個知識點為什麼要先執行pthread_cond_signal(&cond);再解鎖,這裡我就不多解釋了,也有一片文章給我們做了解釋:
https://blog.csdn.net/D_Guco/article/details/78308974?locationNum=1&fps=1
messageList部分程式碼:
value_type wait()
{
lock();
while (m_msglist.empty())
{
// LOGE("wait ~~~~~~~~~~~~~~~1");
pthread_cond_wait(&cond, &mutex);//直到有訊號告知有資料來,才結束等待
}
// LOGE("wait ~~~~~~~~~~~~~~~2");
value_type tmp = m_msglist.front();
m_msglist.pop_front();
unlock();
return tmp;
}
bool push_back(value_type msg_data)
{
lock();
m_msglist.push_back(msg_data);
pthread_cond_signal(&cond);//推送完資料之後通知處理資料
unlock();
return true;
}
github完整程式碼以及編譯方法:
git clone https://github.com/gt19930910/threadMessageDemo.git
cd threadMessageDemo
mkdir build
cd build
cmake ..
make
./threadmessage
編譯截圖: