1. 程式人生 > >執行緒池的坑

執行緒池的坑

最近實現執行緒池踩了不少坑…記錄如下:

問題描述

執行緒池建立了兩個執行緒,放兩個任務進去,發現任務沒執行.
例如:


void task()
{
    std::cout<<"hello world"<<std::endl;
}
void task2(int i)
{
    std::cout<<"hello world "<<i<<std::endl;
}
int main()
{
    threadpool p(10,2);//10 queuesize 2 threads

    p.start();
    p.addtask(task);
    p.addtask(boost::bind(task2,3
)); return 0; }

程式執行之後沒有任何列印在終端,但是gdb 和strace都是有輸出的.

解決

一共發現兩個問題最終解決之:
先看執行緒池的實現

threadpool::task threadpool::gettask()
{
    mutexlockguard lock(mutex_);
    while(queue_.empty()&&isstarted_)
    {
        notempty_.wait();
    }
    task t;
    if(!queue_.empty())//這裡非常重要
    //之前我寫的是  if(isstarted_)
{ assert(!queue_.empty()); t=queue_.front(); queue_.pop(); notfull_.notify(); } return t; } //這是執行緒執行體 //我們可以看到決定執行緒生命週期的是isstarted_這個變數 //當isstarted_變為false之後執行緒退出,pthread_join返回 void threadpool::runinthread() { while(isstarted_) { task t(gettask());//will blocked until "add" ops
if(t) { t();//run } } }

這裡解釋了執行緒池的執行邏輯…我們有一個內部的方法gettask將從佇列中取任務,如果執行緒還沒啟動或者沒有任務,這個函式將阻塞…

到這裡都沒什麼問題,因為我們會線上程池析構的時候,對等待的條件變數notify,並且將isstarted賦值為false,此時gettask()將返回一個空的task,迴圈將結束,pthread_join將順利執行….

//析夠函式
threadpool::~threadpool()
{
    if(isstarted_)
    {
    stop();
    }
}


void threadpool::stop()
{   
    {
    mutexlockguard lock(mutex_);
    isstarted_ = false;
    notempty_.notifyall(); //啟用所有的執行緒

    }
    for_each(threads_.begin(),
        threads_.end(),
        boost::bind(&thread::join, _1));

}

一切都看起來那麼的優雅美好,但是卻忽略一個問題:
如果主執行緒執行得太快,那麼很有可能工作執行緒還沒有拿到任務,isstarted_就為false了 條件變數也喚醒了,執行緒拿到一個空任務,什麼都不做就退出了,pthread_join也就返回了,執行緒沒有做任何事情就退出了….

試著發現這一切發生的原因: isstarted_的賦值不是執行緒安全的….但是上面的程式碼明明是用mutexlock進行保護了?

wait呼叫自動釋放鎖,在主執行緒中add一個任務,notify執行緒,wait返回又會加上鎖,析夠函式不應該拿到鎖修改臨界區變數…

問題二

同樣是執行緒沒執行就退出了,這個比較簡單,發現執行緒還沒有start就被析夠了,簡單的sleep就能解決,不過sleep不能做同步原語,因為最好的解決辦法還是用countdownlatch…

據我瞭解countdownlatch用法有兩個:

1.每個任務進行countdown操作,初始化時count為任務數

countdownlatch latch(2);
void task()
{
    std::cout<<"hello world"<<std::endl;
    latch.countdown();
}
void task2(int i)
{
    std::cout<<"hello world "<<i<<std::endl;

    latch.countdown();
}
int main()
{
    threadpool p(10,2);//10 queuesize 2 threads

    p.start();

    p.addtask(task);
    p.addtask(boost::bind(task2,3));
    p.addtask(boost::bind(&countdownlatch::countdown,&latch));

    latch.wait();

    return 0;
}

2.count初始化為1,在程式結束前wait,左右類似於一個barrier

void task()
{
    std::cout<<"hello world"<<std::endl;
}
void task2(int i)
{
    std::cout<<"hello world "<<i<<std::endl;

}
int main()
{
    threadpool p(10,2);//10 queuesize 2 threads

    countdownlatch latch(1);
    p.start();

    p.addtask(task);
    p.addtask(boost::bind(task2,3));

    p.addtask(boost::bind(&countdownlatch::countdown,&latch));

    latch.wait();

    return 0;
}