muduo原始碼分析--執行緒池的實現
阿新 • • 發佈:2019-02-09
muduo執行緒池主要有3個類的實現:EventLoop,EventLoopThreadPool,EventLoopThread,和Thread.
1.Thread為對一個執行緒的封裝:
class Thread : boost::noncopyable
{
public:
typedef boost::function<void ()> ThreadFunc; //建立執行緒時,傳遞的函式
explicit Thread(const ThreadFunc&, const string& name = string());
#ifdef __GXX_EXPERIMENTAL_CXX0X__
explicit Thread(ThreadFunc&&, const string& name = string());
#endif
~Thread();
void start(); //呼叫start,新建的執行緒開始執行ThreadFunc函式。
int join(); // return pthread_join()
bool started() const { return started_; }
// pthread_t pthreadId() const { return pthreadId_; }
pid_t tid() const { return *tid_; }
const string& name() const { return name_; }
static int numCreated() { return numCreated_.get(); }
private:
void setDefaultName();
bool started_;
bool joined_;
pthread_t pthreadId_;
boost::shared_ptr<pid_t> tid_;
ThreadFunc func_;
string name_;
};
}
Thread::Thread(ThreadFunc&& func, const string& n)
: started_(false),
joined_(false),
pthreadId_(0),
tid_(new pid_t(0)),
func_(std::move(func)),
name_(n)
{
setDefaultName();
}
Thread::~Thread()
{
if (started_ && !joined_)
{
pthread_detach(pthreadId_);
}
}
void Thread::start() //建立一個執行緒
{
assert(!started_);
started_ = true;
// FIXME: move(func_)
detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_);
if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))
{
started_ = false;
delete data; // or no delete?
LOG_SYSFATAL << "Failed in pthread_create";
}
}
int Thread::join() //建立該執行緒的執行緒可以呼叫join,等待執行緒退出
{
assert(started_);
assert(!joined_);
joined_ = true;
return pthread_join(pthreadId_, NULL);
}
Thred執行緒類的使用:
Thread thread(func,”thread”); //例項化一個執行緒物件,其中func是一個函式指標
thread.start(); //建立一個執行緒,並開始執行
2,EventLoopThread類封裝一個EventLoop執行緒,即對Thread的進一步封裝
class EventLoopThread : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop*)> ThreadInitCallback;//執行緒初始化回撥
EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
const string& name = string());
~EventLoopThread();
EventLoop* startLoop(); //開始
private:
void threadFunc();
EventLoop* loop_;
bool exiting_;
Thread thread_;
MutexLock mutex_;
Condition cond_;
ThreadInitCallback callback_;
};
//EventLoopThread 建構函式會建立一個thread.
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
const string& name)
: loop_(NULL),
exiting_(false),
thread_(boost::bind(&EventLoopThread::threadFunc, this), name),
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
EventLoopThread::~EventLoopThread()
{
exiting_ = true;
if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_.
{
// still a tiny chance to call destructed object, if threadFunc exits just now.
// but when EventLoopThread destructs, usually programming is exiting anyway.
loop_->quit();
thread_.join();
}
}
//呼叫該函式會呼叫thread.start()-》EventLoopThread::threadFunc()ThreadFunc()->例項化一個EventLoop-》return loop_;
EventLoop* EventLoopThread::startLoop()
{
assert(!thread_.started());
thread_.start();
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait();
}
}
return loop_;
}
void EventLoopThread::threadFunc()// thread_.start();會執行該函式,並且線上程內例項化一個新的EventLoop例項,並會停留在loop.loop();
{
EventLoop loop;
if (callback_)
{
callback_(&loop);
}
{
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify();
}
loop.loop();
//assert(exiting_);
loop_ = NULL;
}
EventLoopThread類的使用:
EventLoopThread eventloopthread;//例項化一個EventLoopThread,沒有ThreadInitCallback
EventLoop* ioloop = eventloopthread.startLoop();
//執行該執行緒的執行緒將會建立一個新的執行緒,這個新的執行緒有一個自己的EventLoop,並且會返回他的EventLoop物件的指標。即新執行緒的EventLoop物件是暴露給父執行緒的。
3,EventLoopThreadPool是管理EventLoopThread的池。
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
: baseLoop_(baseLoop), //baseLoop為主執行緒擁有的EventLoop;
name_(nameArg),
started_(false),
numThreads_(0), //該執行緒池需要建立的執行緒個數
next_(0)
{
}
EventLoopThreadPool::~EventLoopThreadPool()
{
// Don't delete loop, it's stack variable
}
//該函式會建立 numThreads_個EventLoopThread物件並執行各個執行緒,並在主執行緒儲存建立的EventLoopThread物件和EventLoopThread執行緒建立的EventLoop物件。如果建立的執行緒為0,則執行cb(baseLoop_);
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
EventLoopThread* t = new EventLoopThread(cb, buf);
threads_.push_back(t);
loops_.push_back(t->startLoop());
}
if (numThreads_ == 0 && cb)
{
cb(baseLoop_);
}
}
//呼叫該函式會按照輪流的順序返回池裡的執行緒的EventLoop物件指標
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_->assertInLoopThread();
assert(started_);
EventLoop* loop = baseLoop_;
if (!loops_.empty())
{
// round-robin
loop = loops_[next_];
++next_;
if (implicit_cast<size_t>(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
void EventLoopThreadPool::setThreadNum(int numThreads) { numThreads_ = numThreads; }
EventLoopThreadPool類的使用:
EventLoop el;
EventLoopThreadPool eltp(el);
eltp.setThreadNum(5);
eltp.start();