1. 程式人生 > >libuv封裝任務執行緒池

libuv封裝任務執行緒池

Task.h

#ifndef __CTASK__H_
#define __CTASK__H_
#include "RcObject.h"
class CTask: public CRcObject
{
public:
    virtual int TaskInit() = 0;
    virtual int TaskExcute() = 0;
    virtual int TaskQuit() = 0;
};

#endif
#ifndef __CUVTASKTHREAD__H_
#define __CUVTASKTHREAD__H_
#include "UvThread.h"
#include "Task.h"
class CUvTaskThread : public CUvThread
{
public:
    CUvTaskThread();
    ~CUvTaskThread();

public:
    int SetTask(CTask* pTask);

protected:
    int OnThreadRun();
    CTask* mpTask;
};

#endif


#include "UvTaskThread.h"
#include "UvTaskPool.h"

CUvTaskThread::CUvTaskThread(){
    mpTask = nullptr;
}

CUvTaskThread::~CUvTaskThread(){
}

int CUvTaskThread::OnThreadRun() {
    for (;;) {
        if (nullptr == mpTask) {
            sUvTaskPool->PushTaskThread(this);
            Wait();
            continue;
        }
        
        mpTask->TaskInit();
        mpTask->TaskExcute();
        mpTask->TaskQuit();
        UNREF(mpTask);
        mpTask = nullptr;
    }

    return 0;
}

int CUvTaskThread::SetTask(CTask* pTask) {
    ASSERT_RET_VALUE(nullptr != pTask && nullptr == mpTask, 1);
    mpTask = pTask;
    REF(mpTask);
    return 0;
}

 

#ifndef __CUVTASKPOOL__H_
#define __CUVTASKPOOL__H_
#include "singleton.h"
#include "UvTaskThread.h"
#include "UvMutex.h"
#include <set>
#include <queue>
class CUvTaskPool : public CSingleton<CUvTaskPool>, public CUvThread
{
    SINGLE_CLASS_INITIAL(CUvTaskPool);
public:
    ~CUvTaskPool();

public:
    int PushTask(CTask* pTask);
    int PushTaskThread(CUvTaskThread* pTaskThread);

private:
    CTask* PopTask();
    int DispatchTask(CTask* pTask);

protected:
    int OnThreadRun();

private:
    std::queue<CTask*> mqueTasks;
    CUvMutex mcQueTasksMutex;

    std::set<CUvTaskThread*> msetTaskThreads;
    CUvMutex mcTaskThreadsMutex;
};

#define sUvTaskPool CUvTaskPool::Instance()
#endif

#include "UvTaskPool.h"

CUvTaskPool::CUvTaskPool(){
}

CUvTaskPool::~CUvTaskPool(){
}

int CUvTaskPool::OnThreadRun() {
    for (;;) {
        CTask* pTask = PopTask();
        if (nullptr == pTask) {
            Wait();
            continue;
        }

        DispatchTask(pTask);
    }
    return 0;
}

CTask* CUvTaskPool::PopTask() {
    CTask* pTask = nullptr;
    mcQueTasksMutex.Lock();
    if (!mqueTasks.empty()) {
        pTask = mqueTasks.front();
        mqueTasks.pop();
    }
    mcQueTasksMutex.UnLock();

    return pTask;
}

int CUvTaskPool::PushTask(CTask* pTask) {
    ASSERT_RET_VALUE(nullptr != pTask, 1);
    mcQueTasksMutex.Lock();
    mqueTasks.push(pTask);
    mcQueTasksMutex.UnLock();

    Activate();
    return 0;
}

int CUvTaskPool::PushTaskThread(CUvTaskThread* pTaskThread) {
    ASSERT_RET_VALUE(nullptr != pTaskThread, 1);
    mcTaskThreadsMutex.Lock();
    msetTaskThreads.insert(pTaskThread);
    mcTaskThreadsMutex.UnLock();
    return 0;
}

int CUvTaskPool::DispatchTask(CTask* pTask) {
    ASSERT_RET_VALUE(nullptr != pTask, 1);
    CUvTaskThread* pTaskThread = nullptr;
    mcTaskThreadsMutex.Lock();
    std::set<CUvTaskThread*>::iterator iter = msetTaskThreads.begin();
    if (iter != msetTaskThreads.end()) {
        pTaskThread = (CUvTaskThread*)*iter;
        if (nullptr != pTaskThread) {
            pTaskThread->SetTask(pTask);
            pTaskThread->Activate();
        }

        msetTaskThreads.erase(iter);
    }
    mcTaskThreadsMutex.UnLock();

    if (nullptr == pTaskThread) {
        pTaskThread = new CUvTaskThread();
        ASSERT_RET_VALUE(nullptr != pTaskThread, 1);
        pTaskThread->SetTask(pTask);
        pTaskThread->Start();
    }

    return 0;
}