1. 程式人生 > >使用線程間通信之條件變量

使用線程間通信之條件變量

cond cpp critical start mil 使用 out 輸入 eal

近期用C++寫安卓下的一個通訊程序。作為jni庫給java調用,採用多線程輪詢遇到一個問題描寫敘述例如以下:
A線程收到數據,放入隊列,是生產者。
B、C、D若幹個線輪詢訓消息隊列,假設隊列有數據就取出進行處理,沒數據就Sleep(T)歇息,問題是這個T值取多大合適?取大了消息處理不及時。取小了手機cpu上升電池非常快耗光。



這個問題最佳解法是採用條件變量,能夠比較完美解決這個問題。下面代碼使用C++封裝,用win32 SDK的條件變量舉例,Linux下有全然等價的概念:

// 線程消息通知
class ThreadMsgNotify
{
   // 條件變量和臨界變量
   CONDITION_VARIABLE cv_;
   CRITICAL_SECTION cs_;
public:
   ThreadMsgNotify();
   ~ThreadMsgNotify();

   int Wait(DWORD ms);  // 消費者調用此函數,阻塞等待毫秒數
   void Notify();  //  生產者調用此函數: 發出通知
};

各成員方法實現例如以下:

// ---------------------------------------
ThreadMsgNotify::ThreadMsgNotify()
{
   InitializeConditionVariable(&cv_);
   InitializeCriticalSection(&cs_);
}

ThreadMsgNotify::~ThreadMsgNotify()
{
   WakeAllConditionVariable(&cv_);  // 喚醒所有線程
   Sleep(50);

   DeleteCriticalSection(&cs_);
   DeleteConditionVariable(&cv_);
}

int ThreadMsgNotify::Wait(DWORD ms)  // 消費者,阻塞等待毫秒數
{
   EnterCriticalSection(&cs_);
   int ret = SleepConditionVariableCS(&cv_, &cs_, ms);  // 等待
   LeaveCriticalSection(&cs_);
   return(ret);
}

void ThreadMsgNotify::Notify()  // 生產者: 發出通知
{
   EnterCriticalSection(&cs_);
   WakeConditionVariable(&cv_);  // 喚醒一個等待線程(假設有的話)
   LeaveCriticalSection(&cs_);
}
// --------------

上面的代碼非常easy。差點兒不用解釋。

以下再給出測試代碼:

class TagThreadNotifyTest
{
  private:
     list<string> msgList;
     bool isEnd = false;
     CRITICAL_SECTION cs_;
  public:
     int no;
     ThreadMsgNotify threadNotify;  // 線程通知,接收隊列收到消息時通過該對象喚醒處理線程

     TagThreadNotifyTest();
     ~TagThreadNotifyTest();

     void New(const char* info);  // 生產者添加一條消息
     int Recv(string& msg);  // 消費者讀取一條消息,返回讀取前隊列中的消息數

     // 消費者線程函數
     static LRESULT WINAPI ProcThread(void* lParam);
};

// 輔助函數。獲取當前時間戳
void CurTime(char* timeStr)
{
   SYSTEMTIME st;
   GetLocalTime(&st);
   sprintf(timeStr, "%02d:%02d:%02d.%03d", st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
}

TagThreadNotifyTest::TagThreadNotifyTest()
{
  InitializeCriticalSection(&cs_);

  // 創建3個消費者線程
  for(int i=0; i<3; i++){
     no = i;
     DWORD dwThreadid;
     HANDLE thd = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ProcThread,
                (void*)this, 0, &dwThreadid);
     CloseHandle(thd);
     Sleep(30);
  }
}

TagThreadNotifyTest::~TagThreadNotifyTest()
{
  isEnd = true;
  Sleep(100);
  DeleteCriticalSection(&cs_);
}

// 生產者將新消息放入隊列
void TagThreadNotifyTest::New(const char* info)
{
   // 加鎖後插入隊列
   EnterCriticalSection(&cs_);
      msgList.push_back(info);
   LeaveCriticalSection(&cs_);

   threadNotify.Notify();  // 通知其它線程,去處理數據

   printf("notify...\n");
}

// 消費者讀取消息。假設沒有將返回0
int TagThreadNotifyTest::Recv(string& msg)
{
   EnterCriticalSection(&cs_);
      int n = msgList.size();
      if( n>0 ){
          msg = msgList.front();
          msgList.pop_front();
      }
   LeaveCriticalSection(&cs_);
   return(n);
}

// 消費者線程
LRESULT WINAPI TagThreadNotifyTest::ProcThread(void* lParam)
{
  TagThreadNotifyTest * test = (TagThreadNotifyTest *)lParam;
  int no = test->no;
  printf("Thread start,  no=%d...\n", no);
  char timeStr[80];
  while( !test->isEnd ){
     string msg;
     int ret = test->Recv(msg);  // 讀取一條消息
     CurTime(timeStr);
     if( ret ){  // 假設有就打印出來
        printf(" [%d %s]Recv: %s\n", no, timeStr, msg.c_str());
        Sleep(1000);  // 延時1秒模擬處理較慢的情況
        continue;
     }
     else{  // 沒有收到
        printf(" [%d %s]...\n", no, timeStr);
     }
 
     // 歇息15秒,假設有通知則會隨時結束歇息
     test->threadNotify.Wait(15000);
  }

  printf("Thread End : no=%d.\n", no);
  return(1);
}

int main()
{
   // 控制臺測試程序

   // new一個測試對象,此對象會創建3個消費者線程
   TagThreadNotifyTest* test = new TagThreadNotifyTest();

   // 作為生產者線程。就是接收你的按鍵,回車後產生一條消息
   while(true){
       char s[500];
       memset(s, 0, 500);
       gets(s);
       if( strcmp(s, "exit")==0 ){
           break;
       }
       if( s[0]=='\0' )
           continue;

       // 提交消息
       test->New(s);
   }

   delete test;
   return(0);
}

在windows和Linux下(替換成相應函數)均測試通過。

輸入一個回車後,消費者線程將馬上取到消息並打印出來。假設沒有消息。則消費者線程等待15秒。CPU非常輕松。


使用線程間通信之條件變量