使用線程間通信之條件變量
阿新 • • 發佈:2017-07-01
cond cpp critical start mil 使用 out 輸入 eal
近期用C++寫安卓下的一個通訊程序。作為jni庫給java調用,採用多線程輪詢遇到一個問題描寫敘述例如以下:
A線程收到數據,放入隊列,是生產者。
B、C、D若幹個線輪詢訓消息隊列,假設隊列有數據就取出進行處理,沒數據就Sleep(T)歇息,問題是這個T值取多大合適?取大了消息處理不及時。取小了手機cpu上升電池非常快耗光。
各成員方法實現例如以下:
上面的代碼非常easy。差點兒不用解釋。
在windows和Linux下(替換成相應函數)均測試通過。
A線程收到數據,放入隊列,是生產者。
B、C、D若幹個線輪詢訓消息隊列,假設隊列有數據就取出進行處理,沒數據就Sleep(T)歇息,問題是這個T值取多大合適?取大了消息處理不及時。取小了手機cpu上升電池非常快耗光。
這個問題最佳解法是採用條件變量,能夠比較完美解決這個問題。下面代碼使用C++封裝,用win32 SDK的條件變量舉例,Linux下有全然等價的概念:
// 線程消息通知 class ThreadMsgNotify { // 條件變量和臨界變量 CONDITION_VARIABLE cv_; CRITICAL_SECTION cs_; public: ThreadMsgNotify(); ~ThreadMsgNotify(); int Wait(DWORD ms); // 消費者調用此函數,阻塞等待毫秒數 void Notify(); // 生產者調用此函數: 發出通知 };
各成員方法實現例如以下:
// --------------------------------------- ThreadMsgNotify::ThreadMsgNotify() { InitializeConditionVariable(&cv_); InitializeCriticalSection(&cs_); } ThreadMsgNotify::~ThreadMsgNotify() { WakeAllConditionVariable(&cv_); // 喚醒所有線程 Sleep(50); DeleteCriticalSection(&cs_); DeleteConditionVariable(&cv_); } int ThreadMsgNotify::Wait(DWORD ms) // 消費者,阻塞等待毫秒數 { EnterCriticalSection(&cs_); int ret = SleepConditionVariableCS(&cv_, &cs_, ms); // 等待 LeaveCriticalSection(&cs_); return(ret); } void ThreadMsgNotify::Notify() // 生產者: 發出通知 { EnterCriticalSection(&cs_); WakeConditionVariable(&cv_); // 喚醒一個等待線程(假設有的話) LeaveCriticalSection(&cs_); } // --------------
上面的代碼非常easy。差點兒不用解釋。
以下再給出測試代碼:
class TagThreadNotifyTest { private: list<string> msgList; bool isEnd = false; CRITICAL_SECTION cs_; public: int no; ThreadMsgNotify threadNotify; // 線程通知,接收隊列收到消息時通過該對象喚醒處理線程 TagThreadNotifyTest(); ~TagThreadNotifyTest(); void New(const char* info); // 生產者添加一條消息 int Recv(string& msg); // 消費者讀取一條消息,返回讀取前隊列中的消息數 // 消費者線程函數 static LRESULT WINAPI ProcThread(void* lParam); }; // 輔助函數。獲取當前時間戳 void CurTime(char* timeStr) { SYSTEMTIME st; GetLocalTime(&st); sprintf(timeStr, "%02d:%02d:%02d.%03d", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds); } TagThreadNotifyTest::TagThreadNotifyTest() { InitializeCriticalSection(&cs_); // 創建3個消費者線程 for(int i=0; i<3; i++){ no = i; DWORD dwThreadid; HANDLE thd = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcThread, (void*)this, 0, &dwThreadid); CloseHandle(thd); Sleep(30); } } TagThreadNotifyTest::~TagThreadNotifyTest() { isEnd = true; Sleep(100); DeleteCriticalSection(&cs_); } // 生產者將新消息放入隊列 void TagThreadNotifyTest::New(const char* info) { // 加鎖後插入隊列 EnterCriticalSection(&cs_); msgList.push_back(info); LeaveCriticalSection(&cs_); threadNotify.Notify(); // 通知其它線程,去處理數據 printf("notify...\n"); } // 消費者讀取消息。假設沒有將返回0 int TagThreadNotifyTest::Recv(string& msg) { EnterCriticalSection(&cs_); int n = msgList.size(); if( n>0 ){ msg = msgList.front(); msgList.pop_front(); } LeaveCriticalSection(&cs_); return(n); } // 消費者線程 LRESULT WINAPI TagThreadNotifyTest::ProcThread(void* lParam) { TagThreadNotifyTest * test = (TagThreadNotifyTest *)lParam; int no = test->no; printf("Thread start, no=%d...\n", no); char timeStr[80]; while( !test->isEnd ){ string msg; int ret = test->Recv(msg); // 讀取一條消息 CurTime(timeStr); if( ret ){ // 假設有就打印出來 printf(" [%d %s]Recv: %s\n", no, timeStr, msg.c_str()); Sleep(1000); // 延時1秒模擬處理較慢的情況 continue; } else{ // 沒有收到 printf(" [%d %s]...\n", no, timeStr); } // 歇息15秒,假設有通知則會隨時結束歇息 test->threadNotify.Wait(15000); } printf("Thread End : no=%d.\n", no); return(1); } int main() { // 控制臺測試程序 // new一個測試對象,此對象會創建3個消費者線程 TagThreadNotifyTest* test = new TagThreadNotifyTest(); // 作為生產者線程。就是接收你的按鍵,回車後產生一條消息 while(true){ char s[500]; memset(s, 0, 500); gets(s); if( strcmp(s, "exit")==0 ){ break; } if( s[0]=='\0' ) continue; // 提交消息 test->New(s); } delete test; return(0); }
在windows和Linux下(替換成相應函數)均測試通過。
輸入一個回車後,消費者線程將馬上取到消息並打印出來。假設沒有消息。則消費者線程等待15秒。CPU非常輕松。
使用線程間通信之條件變量