1. 程式人生 > >通過互斥鎖和條件變數實現佇列的無sleep生產與消費

通過互斥鎖和條件變數實現佇列的無sleep生產與消費

昨天轉了一篇文章將條件變數與互斥鎖的結合使用,今天給大家摳出曾經學到的一段高效的處理訊息的demo,通過互斥鎖實現安全佇列,通過條件變數實現佇列的工作與休息。

目錄結構:

簡單介紹:

SignalThread是封裝好的執行緒類,我們所做的核心工作都是在這裡完成,其他類也是服務於這裡;

MessageList是封裝好的安全執行緒類,基於c++ list實現的;

MessageHanderImp是處理訊息的方法,通過繼承MessageHander的介面,實現了方法OnMessage,其實只是printf了一下;

message是訊息的類,封裝了一下我們的訊息;

原始碼解析:

主函式很簡單,顯而易見SignalThread是我們實現的一個最主要的類,在這裡面封裝了執行緒完成了核心功能,主函式只要將它啟動即可,之後往封裝好的類裡面“push”資料,當然這個“push”也是我們封裝好的方法postMessage。MessageHanderImp是處理訊息的類,message是訊息的類。

int main(int argc, char** argv) {

    cout << "----start\n";
    SignalThread signalThread;
    signalThread.start();   //核心執行緒啟動
    
    cout << "----post\n";
    signalThread.postMessage(new MessageHanderImp(), new Message());//往執行緒裡面push訊息
    cout << "----post end\n";
    
    getchar();
    return 0;
}

核心程式碼:

下面是我們實現的核心類SignalThread,註釋寫的蠻清楚了,類的方法在標頭檔案中直接實現了,其中最重要的是MessageData* hander = m_queue.wait();這邊的wait在messageList裡面實現。

struct MessageData {
    MessageHander* pHander;
    Message*pMessage;
};

class SignalThread {
public:
    SignalThread() {
        pthread_cond_init(&cond, NULL);
        pthread_mutex_init(&mutex, NULL);//初始化鎖和條件變數
    }
    virtual ~SignalThread(){
        pthread_mutex_destroy(&mutex);
	    pthread_cond_destroy(&cond);
    }
    
    void start() {
        pthread_t t;
        
        pthread_mutex_lock(&mutex);
        pthread_create(&t, NULL, run, this);
        
        pthread_cond_wait(&cond, &mutex);//這邊等待執行緒開始時會停止等待
        pthread_mutex_unlock(&mutex);
    }
    void postMessage(MessageHander* hander, Message* message = NULL) { 
        
        MessageData * pessageData = new MessageData();
        pessageData->pHander = hander;
        pessageData->pMessage = message;
        m_queue.push_back(pessageData);//實現往佇列推送訊息
    }
    
private:
    void* _start(void * arg) {

        while(1){
            
            pthread_mutex_lock(&mutex);
            pthread_cond_signal(&cond);//這裡是解開主執行緒的wait
            pthread_mutex_unlock(&mutex);
            
            //要改成阻塞式的佇列
            MessageData* hander = m_queue.wait();//如果沒有訊息需要進行wait,釋放cpu
            if(hander){    //一旦有訊息及時處理
                printf("_start SignalThread OnMessage\n");
                hander->pHander->OnMessage(hander->pMessage);//這是一個處理訊息的方法
            }
        }
        return NULL;
    }
        
    static void *run(void* pths) {
        SignalThread* THIS = (SignalThread*)pths;
        return THIS->_start(pths);
    }
        
private:
    
    MessageList<MessageData*> m_queue;//這裡封裝了安全佇列,下面講解
    
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    
};

messageList實現:

這裡邊實現了模板,最主要就是說一下wait這個方法pthread_cond_wait(&cond, &mutex);實現了阻塞,當佇列是空的時候不進行輪詢浪費cpu而是wait,至於為什麼不用 if (m_msglist.empty()) 而是用 while (m_msglist.empty()) 是因為if (在某些情況下可能會出問題,特別是有多個執行緒在wait的時候)這裡推薦看一下這篇篇文章關於互斥鎖和條件變數的:            https://blog.csdn.net/FlayHigherGT/article/details/83790972

還有一個知識點為什麼要先執行pthread_cond_signal(&cond);再解鎖,這裡我就不多解釋了,也有一片文章給我們做了解釋:

https://blog.csdn.net/D_Guco/article/details/78308974?locationNum=1&fps=1

messageList部分程式碼:

value_type wait()
{
    lock();
    while (m_msglist.empty())
    {

//       LOGE("wait ~~~~~~~~~~~~~~~1");
         pthread_cond_wait(&cond, &mutex);//直到有訊號告知有資料來,才結束等待
    }
//  LOGE("wait ~~~~~~~~~~~~~~~2");
    value_type tmp = m_msglist.front();
    m_msglist.pop_front();
    unlock();
    return tmp;
}       

 bool push_back(value_type msg_data)
 {
     lock();
     m_msglist.push_back(msg_data);
     pthread_cond_signal(&cond);//推送完資料之後通知處理資料
     unlock();
     return true;
 }

github完整程式碼以及編譯方法:

git clone https://github.com/gt19930910/threadMessageDemo.git

cd threadMessageDemo

mkdir build

cd build

cmake ..

make

./threadmessage

編譯截圖: