1. 程式人生 > >windows C++多執行緒程式設計高階篇 實現執行緒同步

windows C++多執行緒程式設計高階篇 實現執行緒同步

    上一篇文章windows程式設計 使用C++實現多執行緒類僅僅是介紹了怎樣用類來實現多執行緒,這篇文章則重點介紹多執行緒中資料同步的問題。好了,廢話不多說,進入主題。

    問題場景:這裡我們假設有這樣一個工作流水線(CWorkPipeline),它不斷的生成一個SquareInfo的物件,這個物件包含x和y座標,同時包括一個未得到結果的平方和(squareSum),這些流水線類似於現實世界的工廠不斷產出產品(SquareInfo物件),很多個這樣的流水線把產生的SquareInfo物件彙集給處理中心。

    處理中心(CProcessCenter)是一個多執行緒類,它不斷接收流水線上的資料,並計算每個SquareInfo物件的squareSum,同時把對應的資訊打印出來。

    我們看CWorkPipeline的定義:

/************************************************************************/
/* FileName: WorkPipeline.h
 * Date: 2015-5-13
 * Author: huangtianba
 * Description: 模擬工作流水線類
 */
/************************************************************************/

#pragma once

#include <Windows.h>
#include <functional>

struct SquareInfo
{
    int x;
    int y;
    int squareSum;
};

typedef std::tr1::function<void (const SquareInfo &squareInfo)> FoundItemFn;

class CWorkPipeline
{
public:
    CWorkPipeline();
    ~CWorkPipeline();


    bool Init(const FoundItemFn &foundItemFn);
    bool UnInit();

    static DWORD CALLBACK WorkThread(LPVOID lpParam);
    DWORD WorkProc();

private:
    HANDLE m_hThread;
    FoundItemFn m_foundItemFn;
};

    這裡需要注意的是Init()函式接受一個FoundItemFn的函式物件,這個物件是CProcessCenter出給它的回撥,用來接收流水線產生的SquareInfo物件。

    下面是CWorkPipeline類各個函式的實現:

/************************************************************************/
/* FileName: WorkPipeline.cpp
 * Date: 2015-5-14
 * Author: chenzba
 * Description:
 */
/************************************************************************/

#include "stdafx.h"
#include "WorkPipeline.h"
#include <time.h>

CWorkPipeline::CWorkPipeline(): m_hThread(NULL)
{

}


CWorkPipeline::~CWorkPipeline()
{

}


bool CWorkPipeline::Init(const FoundItemFn &foundItemFn)
{
    srand(unsigned int(time(NULL)));

    // 建立執行緒
    m_hThread = CreateThread(NULL, 0, &CWorkPipeline::WorkThread, this, CREATE_SUSPENDED, NULL);
    if (NULL == m_hThread) {
        return false;
    }

    m_foundItemFn = foundItemFn;
    ResumeThread(m_hThread);
    return true;
}


bool CWorkPipeline::UnInit()
{
    if (m_hThread != NULL) 
    {
        CloseHandle(m_hThread);
        m_hThread = NULL;
    }

    m_foundItemFn = FoundItemFn();
    return true;
}


DWORD CALLBACK CWorkPipeline::WorkThread(LPVOID lpParam)
{
    if (NULL == lpParam) {
        return 0;
    }

    CWorkPipeline *lpThis = reinterpret_cast<CWorkPipeline *> (lpParam);
    return lpThis->WorkProc();
}


// 執行緒處理函式
DWORD CWorkPipeline::WorkProc()
{
    while (true)
    {
        // 宣告一個SquareInfo物件,給x和y隨機賦值
        SquareInfo squareInfo = {};
        squareInfo.x = rand() % 10000;
        squareInfo.y = rand() % 10000;

        // 將squreInfo通知給回撥
        m_foundItemFn(squareInfo);

        // Sleep一段時間
        Sleep(max(20, rand() % 2000));
    }
        
    return 0;
}


    每個流水線物件都有一個執行緒,執行緒的工作就是沒隔一個不確定的時間產生一個SquareInfo物件,並把它拋給外面(CProcessCenter類物件)。這段程式碼就是m_foundItemFn(squareInfo)。

    這裡值得注意的一點是:UnInit()函式中必須將m_foundItemFn物件置為空物件,假如不這樣做,如果CProcessCenter物件已經從棧釋放,而流水線執行緒還沒退出,繼續呼叫m_foundItemFn(squareInfo)將導致不可預料的錯誤。

    下面看看CProcessCenter類的定義:

/************************************************************************/
/* FileName: ProcessCenter.h
 * Date: 2015-5-14
 * Author: chenzba
 * Description: 資料處理彙集中心,處理多個執行緒彙集過來的資料
 */
/************************************************************************/

#pragma once
#include <atlbase.h>
#include <atlsync.h>
#include <vector>
#include <list>

struct SquareInfo;
class CWorkPipeline;

class CProcessCenter
{
public:
    typedef ATL::CComAutoCriticalSection DataLock;

public:
    CProcessCenter();
    ~CProcessCenter();

    bool Init();
    bool UnInit();
    void DoWork();

    static DWORD CALLBACK ProcessThread(LPVOID lpParam);
    DWORD ProcessProc();

    //
    // 被CWorkPileline回撥的函式
    //
    void NotifyFoundItem(const SquareInfo &squareInfo);

    //
    // 通知執行緒退出
    //
    void NotifyExit();

    void ProcessData();
private:
    std::vector<CWorkPipeline *>    m_workLineList;     // 工作流水線執行緒列表
    std::list<SquareInfo>           m_dataList;         // 流水線產生的資料列表
    DataLock                        m_dataLock;         // m_dataList資料鎖
    HANDLE                          m_hThread;
    ATL::CEvent                     m_pipeEvent;        // 通知處理資料事件
    bool                            m_isExit;
};

    首先我們看Init()和UnInit()函式的實現:
/************************************************************************/
/* FileName: ProcessCenter.cpp
 * Date: 2015-5-14
 * Author: chenzba
 * Description: The implement of CProcessCenter class.
 */
/************************************************************************/

#include "stdafx.h"
#include "ProcessCenter.h"
#include "WorkPipeline.h"
#include <functional>
#include <iostream>
#include "../boost/tr1/functional.hpp"

CProcessCenter::CProcessCenter(): m_hThread(NULL), m_isExit(false)
{

}


CProcessCenter::~CProcessCenter()
{

}


bool CProcessCenter::Init()
{
    // 建立事件
    BOOL created = m_pipeEvent.Create(NULL, TRUE, FALSE, NULL);
    if (!created) {
        return false;
    }

    m_hThread = CreateThread(NULL, 0, &CProcessCenter::ProcessThread, this, CREATE_SUSPENDED, NULL);
    if (NULL == m_hThread)
    {
        UnInit();
        return false;
    }

    // 建立10個工作流水線
    for (int i = 0; i < 10; i++)
    {
        CWorkPipeline *pipeLine = new(std::nothrow) CWorkPipeline();
        if (pipeLine != NULL)
        {
            pipeLine->Init(std::tr1::bind(&CProcessCenter::NotifyFoundItem, this, std::tr1::placeholders::_1));
            m_workLineList.push_back(pipeLine);
        }
    }

    m_isExit = false;
    ResumeThread(m_hThread);
   
    return true;
}


bool CProcessCenter::UnInit()
{
    // 釋放流水線資源
    std::vector<CWorkPipeline *>::iterator it;
    for (it = m_workLineList.begin(); it != m_workLineList.end(); it++)
    {
        if ((*it) != NULL)
        {
            (*it)->UnInit();
            delete (*it);
            (*it) = NULL;
        }
    }
    m_workLineList.clear();

    if (m_pipeEvent != NULL) {
        m_pipeEvent.Set();
    }

    if (m_hThread != NULL)
    {
        WaitForSingleObject(m_hThread, 100);
        CloseHandle(m_hThread);
        m_hThread = NULL;
    }
    m_pipeEvent.Close();
    m_isExit = true;

    return true;
}

    這裡模擬資料處理中心擁有10個工作流水線,將它們分配在堆上,並用一個vector的指標列表存放。注意事件物件和執行緒的建立順序,ResumeThread要放到最後,這意味著當所有資源建立完成後才啟動執行緒。

    要說明一點,

std::tr1::bind(&CProcessCenter::NotifyFoundItem, this, std::tr1::placeholders::_1)
    使用bind方法產生一個函式物件,有些人編譯的時候可能找不到bind函式。如果編譯環境支援c++11(vs2012以上),bind函式屬於c++標準庫,直接在std::中就能找到;但是在vs2008或者一下,不支援c++11的編譯環境(也就是我現在寫程式碼的環境),bind函式是屬於boost庫的函式。Boost庫可以跟C++標準庫完美共同工作,並且為其提供擴充套件功能,因此,這裡需要引用Boost庫,相關庫大家可以到網上了解相關資訊。

    下面我們繼續看NotifyFoundItem函式的實現:

void CProcessCenter::NotifyFoundItem(const SquareInfo &squareInfo)
{
    // 資料同步 加鎖
    CCritSecLock locker(m_dataLock.m_sec);
    m_dataList.push_back(squareInfo);
    m_pipeEvent.Set();
}
    我們回到CWorkPipeline中的執行緒函式,m_foundItemFn(squareInfo),這句程式碼就進入NotifyFoundItem函式,在這裡找個函式被10執行緒呼叫,m_dataList是共享資料,這裡給m_dataList加鎖。下面繼續,看看加鎖的原因。
DWORD CALLBACK CProcessCenter::ProcessThread(LPVOID lpParam)
{
    if (NULL == lpParam) {
        return 0;
    }

    CProcessCenter *lpThis = reinterpret_cast<CProcessCenter *> (lpParam);
    return lpThis->ProcessProc();
}


DWORD CProcessCenter::ProcessProc()
{
    while (true)
    {
        DWORD dwResult = WaitForSingleObject(m_pipeEvent, 2000);
        if (dwResult == WAIT_TIMEOUT && (!m_isExit)) {
            continue;
        }

        // 處理資料
        ProcessData();

        if (m_isExit) {
            break;
        }
    }

    return 0;
}

void CProcessCenter::ProcessData()
{
    std::list<SquareInfo> tempList;

    m_dataLock.Lock();
    tempList = m_dataList;
    m_dataList.clear();
    m_pipeEvent.Reset();
    m_dataLock.Unlock();

    int counter = 0;
    std::list<SquareInfo>::iterator it;
    for (it = tempList.begin(); it != tempList.end(); it++)
    {
        counter++;
        (*it).squareSum = (*it).x * (*it).x + (*it).y * (*it).y;
        std::cout <<"x: " <<(*it).x <<" y: " <<(*it).y <<" square sum: " <<(*it).squareSum <<std::endl;
    }

    std::cout <<"-------------------------------------------------------" <<counter <<std::endl;
}

void CProcessCenter::NotifyExit()
{
    m_isExit = true;
}


void CProcessCenter::DoWork()
{
    Sleep(20000);
}
    NotifyItemFound函式接收到有資料加入到m_dataList中,就設定m_pipeEvent事件,通知執行緒處理資料。ProcessProc判斷m_isExit為false並且WaitForSingleObject返回不是WAIT_TIMEOUT,進入ProcessData()函式。

    ProcessData宣告一個tempList臨時物件,然後把m_dataList的值賦給tempList,最後把m_dataList清空。

    首先我們先探討執行緒同步問題,加入不給m_dataList加鎖,當執行緒執行到tempList = m_dataList之後,另外一個CWorkPipeline執行緒獲得cpu時間片,執行m_dataList.push(),這時候m_dataList新增一個未處理的資料,接下來返回到tempList = m_dataList後,執行m_dataList.clear(),這樣新增的未處理的資料就丟失了,因此這裡必須給m_dataList上鎖,保證任何時刻只有一個執行緒獲得m_dataList的使用許可權。

    這裡有人可能會疑問,為什麼要另外宣告一個tempList物件呢?為什麼不直接使用m_dataList用於計算呢?

    原因是,在多執行緒同步中,我們為了提高程式的執行效率,儘可能的讓執行緒獲得共享資料(m_dataList)的使用許可權時間最短,假如我們直接使用m_dataList計算資料,那麼我們必須對一大堆計算邏輯的程式碼進行加鎖或者在每次訪問m_dataList的時候加鎖,這樣效率是相當慢的。我想大家應該很容易理解明白。

    好了,多執行緒就暫時講到這裡,語言冗餘,請大家多多包涵,也希望大家多多指正其中可能的錯誤。