1. 程式人生 > >多執行緒死鎖及解決辦法

多執行緒死鎖及解決辦法

死鎖是由於不同執行緒按照不同順序進行加鎖而造成的。如:

執行緒Alock a加鎖 => lock b加鎖 => dosth => 釋放lock b => 釋放lock a

執行緒Block b加鎖 => lock a加鎖 => dosth => 釋放lock a => 釋放lock b

這樣兩條執行緒,就可能發生死鎖問題。要避免發生死鎖,應該使用同一個順序進行加鎖。

這點在物件單向呼叫的情況下是很容易達成的。物件單向呼叫的意思是如果物件a的函式呼叫了物件b的函式,則物件b中的函式不會去呼叫物件a的函式(注意:ab也可能同屬於一個類)。

舉個例子吧,假設聊天室(Room)物件room,聊天者(Chatter)物件chatter,假設ChatterRoom的定義如下:

class InnerChatter

{

public:

       void sendMsg(const string& msg)

       {

              boost::mutex::scoped_lock lock(mtx);

              socket->send(msg);

}

private:

       boost::mutex mtx;

       TcpSocket socket;

};

typedef boost::shared_ptr< InnerChatter> Chatter;

class InnerRoom

{

public:

       void sendMsg(const string& user, const string& msg)

       {

              boost::mutex::scoped_lock lock(mtx);

              if (chatters.find(user) != chatters.end())

              {

                     chatters[user]-> sendMsg(user);

              }

       }

private:

       boost::mutex mtx;

       map<string, Chatter> chatters;

};

目前程式碼中只存在Room呼叫Chatter的情況,不存在Chatter呼叫RoomRoom呼叫RoomChatter呼叫Chatter這三種情況。所以總是先獲得room鎖,再獲得chatter鎖,不會發生死鎖。

如果為Chatter加上傳送歷史和以下這個方法之後呢?

vector<string> history;

void sendMsgToChatter(Chatter dst, const string& msg)

{

       boost::mutex::scoped_lock lock(mtx);   // 加鎖當前物件

       history.push_back(msg);

       dsg>sendMsg(msg);      // 注意:次函式呼叫會加鎖dst物件

}

乍看起來似乎沒問題,但如果執行緒A執行chatterA.sendMsgToChatter(chatterB, “sth”)時,執行緒B正好執行chatterB.sendMsgToChatter(chatterA, “sth”),就會發生本文一開頭舉例的死鎖問題。

如果在Chatter中加入函式:

void sendMsgToAll(Room room, const string& msg)

{

       boost::mutex::scoped_lock lock(mtx);

       history.push_back(msg);

       room->sendMsgToAll(msg);

}

Room中加入函式:

void sendMsgToAll(const string& msg)

{

       boost::mutex::scoped_lock lock(mtx);

       for (map<string, Chatter>::iterator it = chatters.begin(); it != chatters.end(); ++it)

       {

              it->second->sendMsg(msg);

       }

}

顯然死鎖問題更嚴重了,也更令人抓狂了。也許有人要問,為什麼要這麼做,不能就保持Room單向呼叫Chatter嗎?大部分時候答案是肯定的,也建議大部分模組尤其是周邊模組如基礎設施模組使用明確清晰的單向呼叫關係,這樣可以減少對死鎖的憂慮,少白一些頭髮。

但有時候保證單向呼叫的代價太高:試想一下,如果被呼叫者b是一個容器類,呼叫者a定義了一些對元素的彙總操作如求和,為了避免回撥(回撥打破了單向呼叫約束),那就只有對b加鎖,複製所有元素,解鎖,遍歷求和。複製所有元素比較耗計算資源,有可能成為效能瓶頸。

另外還有設計方面的考慮。還舉RoomChatter的例子,如果避免Chatter呼叫RoomChatter,則Chatter很難實現啥高階功能,這樣所有程式碼都將堆砌在RoomRoom將成為一個超級類,帶來維護上的難度。此外還有設計上的不妥:因為幾乎全部面向物件的設計模式都可以理解成某種方式的回撥,禁止回撥也就禁掉了設計模式,可能帶來不便。

當物件間的相互呼叫無法避免時,如果只使用傳統的mutex,保證相同順序加鎖需要十分小心,萬一程式設計時失誤,測試時又沒發現(這是很可能的,死鎖很不容易測試出來),如果條件允許還可以手忙腳亂地火線gdb,若無法除錯定位,則伺服器可能要成為重啟帝了,對產品的形象十分有害。

我想出的解決方案是既然mutex要保證相同順序加鎖,就直接讓mutex和一個優先順序掛鉤,使用執行緒專有儲存(TSS)儲存當前執行緒優先順序最低的鎖,當對新的mutex加鎖時,如果mutex的優先順序< 當前優先順序(為什麼=不可以,參考上文說的sendMsgToChatter函式),才允許加鎖,否則記錄當前函式棧資訊,丟擲異常(要仔細設計以免破壞內部資料結構)。程式碼如下:

boost::thread_specific_ptr<global::stack<int>> locks_hold_by_current_thread;

class xrecursive_mutex

{

public:           

       xrecursive_mutex(int pri_level_)

              : recursion_count(0)

              , pri_level(pri_level_){}

       ~xrecursive_mutex(){}        

       class scoped_lock

       {

       public:

              scoped_lock(xrecursive_mutex& mtx_)

                     : mtx(mtx_)

              {

                     mtx.lock();

              }

              ~scoped_lock()

              {

                     mtx.unlock();

              }

       private:          

              xrecursive_mutex& mtx;

       };

private:

       int recursion_count;

       int pri_level;

       boost::recursive_mutex mutex;

       int get_recursion_count()

       {                  

              return recursion_count;

       }

       void lock()

       {

              mutex.lock();

              ++ recursion_count;                   

              if (recursion_count == 1)

              {

                     if (locks_hold_by_current_thread.get() == NULL)

                     {

                            locks_hold_by_current_thread.reset(new std::stack<int>());                           

                     }

                     if (!locks_hold_by_current_thread->empty() &&

                            locks_hold_by_current_thread->top()>= pri_level)

                     {     //     wrong order, lock failed

                            -- recursion_count;

                            mutex.unlock();    

                            XASSERT(false);//記錄棧資訊,拋異常

                     }

                     locks_hold_by_current_thread->push(pri_level);

              }                  

       }

       void unlock()

       {

              bool bad_usage_flag = false;

              if (recursion_count == 1 &&locks_hold_by_current_thread.get() != NULL)

              {

                     if (!locks_hold_by_current_thread->empty()

                            && (locks_hold_by_current_thread->top() == pri_level))

                     {                         

                            locks_hold_by_current_thread->pop();

                     }    

                     else

                     {

                            bad_usage_flag = true;

                     }

              }

              -- recursion_count;

              mutex.unlock();    

              XASSERT(!bad_usage_flag);//      // 記錄棧資訊,拋異常

       }

};

使用:

xrecursive_mutex mtx1(1);

xrecursive_mutex mtx2(2);

xrecursive_mutex mtx3(3);

xrecursive_mutex mtx3_2(3);

{

       xrecursive_mutex::scoped_lock lock1(mtx1);      // pass, 當前執行緒鎖優先順序1

       xrecursive_mutex::scoped_lock lock2(mtx3);      // pass, 當前執行緒鎖優先順序3

       ASSERT_ANY_THROW(xrecursive_mutex::scoped_lock lock2_2(mtx3_2)); // 捕獲異常,因為優先順序3 <= 當前執行緒鎖優先順序

       xrecursive_mutex::scoped_lock lock3(mtx3);      // pass, 可重入鎖

       xrecursive_mutex::scoped_lock lock4(mtx1);      // pass, 可重入鎖

       ASSERT_ANY_THROW(xrecursive_mutex::scoped_lock lock5(mtx2)); // 捕獲異常,因為優先順序 2<= 當前執行緒鎖優先順序3

}