windows C++多執行緒程式設計高階篇 實現執行緒同步
上一篇文章windows程式設計 使用C++實現多執行緒類僅僅是介紹了怎樣用類來實現多執行緒,這篇文章則重點介紹多執行緒中資料同步的問題。好了,廢話不多說,進入主題。
問題場景:這裡我們假設有這樣一個工作流水線(CWorkPipeline),它不斷的生成一個SquareInfo的物件,這個物件包含x和y座標,同時包括一個未得到結果的平方和(squareSum),這些流水線類似於現實世界的工廠不斷產出產品(SquareInfo物件),很多個這樣的流水線把產生的SquareInfo物件彙集給處理中心。
處理中心(CProcessCenter)是一個多執行緒類,它不斷接收流水線上的資料,並計算每個SquareInfo物件的squareSum,同時把對應的資訊打印出來。
我們看CWorkPipeline的定義:
/************************************************************************/ /* FileName: WorkPipeline.h * Date: 2015-5-13 * Author: huangtianba * Description: 模擬工作流水線類 */ /************************************************************************/ #pragma once #include <Windows.h> #include <functional> struct SquareInfo { int x; int y; int squareSum; }; typedef std::tr1::function<void (const SquareInfo &squareInfo)> FoundItemFn; class CWorkPipeline { public: CWorkPipeline(); ~CWorkPipeline(); bool Init(const FoundItemFn &foundItemFn); bool UnInit(); static DWORD CALLBACK WorkThread(LPVOID lpParam); DWORD WorkProc(); private: HANDLE m_hThread; FoundItemFn m_foundItemFn; };
這裡需要注意的是Init()函式接受一個FoundItemFn的函式物件,這個物件是CProcessCenter出給它的回撥,用來接收流水線產生的SquareInfo物件。
下面是CWorkPipeline類各個函式的實現:
/************************************************************************/ /* FileName: WorkPipeline.cpp * Date: 2015-5-14 * Author: chenzba * Description: */ /************************************************************************/ #include "stdafx.h" #include "WorkPipeline.h" #include <time.h> CWorkPipeline::CWorkPipeline(): m_hThread(NULL) { } CWorkPipeline::~CWorkPipeline() { } bool CWorkPipeline::Init(const FoundItemFn &foundItemFn) { srand(unsigned int(time(NULL))); // 建立執行緒 m_hThread = CreateThread(NULL, 0, &CWorkPipeline::WorkThread, this, CREATE_SUSPENDED, NULL); if (NULL == m_hThread) { return false; } m_foundItemFn = foundItemFn; ResumeThread(m_hThread); return true; } bool CWorkPipeline::UnInit() { if (m_hThread != NULL) { CloseHandle(m_hThread); m_hThread = NULL; } m_foundItemFn = FoundItemFn(); return true; } DWORD CALLBACK CWorkPipeline::WorkThread(LPVOID lpParam) { if (NULL == lpParam) { return 0; } CWorkPipeline *lpThis = reinterpret_cast<CWorkPipeline *> (lpParam); return lpThis->WorkProc(); } // 執行緒處理函式 DWORD CWorkPipeline::WorkProc() { while (true) { // 宣告一個SquareInfo物件,給x和y隨機賦值 SquareInfo squareInfo = {}; squareInfo.x = rand() % 10000; squareInfo.y = rand() % 10000; // 將squreInfo通知給回撥 m_foundItemFn(squareInfo); // Sleep一段時間 Sleep(max(20, rand() % 2000)); } return 0; }
每個流水線物件都有一個執行緒,執行緒的工作就是沒隔一個不確定的時間產生一個SquareInfo物件,並把它拋給外面(CProcessCenter類物件)。這段程式碼就是m_foundItemFn(squareInfo)。
這裡值得注意的一點是:UnInit()函式中必須將m_foundItemFn物件置為空物件,假如不這樣做,如果CProcessCenter物件已經從棧釋放,而流水線執行緒還沒退出,繼續呼叫m_foundItemFn(squareInfo)將導致不可預料的錯誤。
下面看看CProcessCenter類的定義:
/************************************************************************/
/* FileName: ProcessCenter.h
* Date: 2015-5-14
* Author: chenzba
* Description: 資料處理彙集中心,處理多個執行緒彙集過來的資料
*/
/************************************************************************/
#pragma once
#include <atlbase.h>
#include <atlsync.h>
#include <vector>
#include <list>
struct SquareInfo;
class CWorkPipeline;
class CProcessCenter
{
public:
typedef ATL::CComAutoCriticalSection DataLock;
public:
CProcessCenter();
~CProcessCenter();
bool Init();
bool UnInit();
void DoWork();
static DWORD CALLBACK ProcessThread(LPVOID lpParam);
DWORD ProcessProc();
//
// 被CWorkPileline回撥的函式
//
void NotifyFoundItem(const SquareInfo &squareInfo);
//
// 通知執行緒退出
//
void NotifyExit();
void ProcessData();
private:
std::vector<CWorkPipeline *> m_workLineList; // 工作流水線執行緒列表
std::list<SquareInfo> m_dataList; // 流水線產生的資料列表
DataLock m_dataLock; // m_dataList資料鎖
HANDLE m_hThread;
ATL::CEvent m_pipeEvent; // 通知處理資料事件
bool m_isExit;
};
首先我們看Init()和UnInit()函式的實現:
/************************************************************************/
/* FileName: ProcessCenter.cpp
* Date: 2015-5-14
* Author: chenzba
* Description: The implement of CProcessCenter class.
*/
/************************************************************************/
#include "stdafx.h"
#include "ProcessCenter.h"
#include "WorkPipeline.h"
#include <functional>
#include <iostream>
#include "../boost/tr1/functional.hpp"
CProcessCenter::CProcessCenter(): m_hThread(NULL), m_isExit(false)
{
}
CProcessCenter::~CProcessCenter()
{
}
bool CProcessCenter::Init()
{
// 建立事件
BOOL created = m_pipeEvent.Create(NULL, TRUE, FALSE, NULL);
if (!created) {
return false;
}
m_hThread = CreateThread(NULL, 0, &CProcessCenter::ProcessThread, this, CREATE_SUSPENDED, NULL);
if (NULL == m_hThread)
{
UnInit();
return false;
}
// 建立10個工作流水線
for (int i = 0; i < 10; i++)
{
CWorkPipeline *pipeLine = new(std::nothrow) CWorkPipeline();
if (pipeLine != NULL)
{
pipeLine->Init(std::tr1::bind(&CProcessCenter::NotifyFoundItem, this, std::tr1::placeholders::_1));
m_workLineList.push_back(pipeLine);
}
}
m_isExit = false;
ResumeThread(m_hThread);
return true;
}
bool CProcessCenter::UnInit()
{
// 釋放流水線資源
std::vector<CWorkPipeline *>::iterator it;
for (it = m_workLineList.begin(); it != m_workLineList.end(); it++)
{
if ((*it) != NULL)
{
(*it)->UnInit();
delete (*it);
(*it) = NULL;
}
}
m_workLineList.clear();
if (m_pipeEvent != NULL) {
m_pipeEvent.Set();
}
if (m_hThread != NULL)
{
WaitForSingleObject(m_hThread, 100);
CloseHandle(m_hThread);
m_hThread = NULL;
}
m_pipeEvent.Close();
m_isExit = true;
return true;
}
這裡模擬資料處理中心擁有10個工作流水線,將它們分配在堆上,並用一個vector的指標列表存放。注意事件物件和執行緒的建立順序,ResumeThread要放到最後,這意味著當所有資源建立完成後才啟動執行緒。
要說明一點,
std::tr1::bind(&CProcessCenter::NotifyFoundItem, this, std::tr1::placeholders::_1)
使用bind方法產生一個函式物件,有些人編譯的時候可能找不到bind函式。如果編譯環境支援c++11(vs2012以上),bind函式屬於c++標準庫,直接在std::中就能找到;但是在vs2008或者一下,不支援c++11的編譯環境(也就是我現在寫程式碼的環境),bind函式是屬於boost庫的函式。Boost庫可以跟C++標準庫完美共同工作,並且為其提供擴充套件功能,因此,這裡需要引用Boost庫,相關庫大家可以到網上了解相關資訊。
下面我們繼續看NotifyFoundItem函式的實現:
void CProcessCenter::NotifyFoundItem(const SquareInfo &squareInfo)
{
// 資料同步 加鎖
CCritSecLock locker(m_dataLock.m_sec);
m_dataList.push_back(squareInfo);
m_pipeEvent.Set();
}
我們回到CWorkPipeline中的執行緒函式,m_foundItemFn(squareInfo),這句程式碼就進入NotifyFoundItem函式,在這裡找個函式被10執行緒呼叫,m_dataList是共享資料,這裡給m_dataList加鎖。下面繼續,看看加鎖的原因。
DWORD CALLBACK CProcessCenter::ProcessThread(LPVOID lpParam)
{
if (NULL == lpParam) {
return 0;
}
CProcessCenter *lpThis = reinterpret_cast<CProcessCenter *> (lpParam);
return lpThis->ProcessProc();
}
DWORD CProcessCenter::ProcessProc()
{
while (true)
{
DWORD dwResult = WaitForSingleObject(m_pipeEvent, 2000);
if (dwResult == WAIT_TIMEOUT && (!m_isExit)) {
continue;
}
// 處理資料
ProcessData();
if (m_isExit) {
break;
}
}
return 0;
}
void CProcessCenter::ProcessData()
{
std::list<SquareInfo> tempList;
m_dataLock.Lock();
tempList = m_dataList;
m_dataList.clear();
m_pipeEvent.Reset();
m_dataLock.Unlock();
int counter = 0;
std::list<SquareInfo>::iterator it;
for (it = tempList.begin(); it != tempList.end(); it++)
{
counter++;
(*it).squareSum = (*it).x * (*it).x + (*it).y * (*it).y;
std::cout <<"x: " <<(*it).x <<" y: " <<(*it).y <<" square sum: " <<(*it).squareSum <<std::endl;
}
std::cout <<"-------------------------------------------------------" <<counter <<std::endl;
}
void CProcessCenter::NotifyExit()
{
m_isExit = true;
}
void CProcessCenter::DoWork()
{
Sleep(20000);
}
NotifyItemFound函式接收到有資料加入到m_dataList中,就設定m_pipeEvent事件,通知執行緒處理資料。ProcessProc判斷m_isExit為false並且WaitForSingleObject返回不是WAIT_TIMEOUT,進入ProcessData()函式。
ProcessData宣告一個tempList臨時物件,然後把m_dataList的值賦給tempList,最後把m_dataList清空。
首先我們先探討執行緒同步問題,加入不給m_dataList加鎖,當執行緒執行到tempList = m_dataList之後,另外一個CWorkPipeline執行緒獲得cpu時間片,執行m_dataList.push(),這時候m_dataList新增一個未處理的資料,接下來返回到tempList = m_dataList後,執行m_dataList.clear(),這樣新增的未處理的資料就丟失了,因此這裡必須給m_dataList上鎖,保證任何時刻只有一個執行緒獲得m_dataList的使用許可權。
這裡有人可能會疑問,為什麼要另外宣告一個tempList物件呢?為什麼不直接使用m_dataList用於計算呢?
原因是,在多執行緒同步中,我們為了提高程式的執行效率,儘可能的讓執行緒獲得共享資料(m_dataList)的使用許可權時間最短,假如我們直接使用m_dataList計算資料,那麼我們必須對一大堆計算邏輯的程式碼進行加鎖或者在每次訪問m_dataList的時候加鎖,這樣效率是相當慢的。我想大家應該很容易理解明白。
好了,多執行緒就暫時講到這裡,語言冗餘,請大家多多包涵,也希望大家多多指正其中可能的錯誤。