C++任務佇列與多執行緒
摘要:
很多場合之所以使用C++,一方面是由於C++編譯後的native code的高效效能,另一方面是由於C++優秀的併發能力。並行方式有多程序 和多執行緒之分,本章暫且只討論多執行緒,多程序方面的知識會在其他章節具體討論。多執行緒是開發C++伺服器程式非常重要的基礎,如何根據需求具體的設計、分配執行緒以及執行緒間的通訊,也是伺服器程式非常重要的部分,除了能夠帶來程式的效能提高外,若設計失誤,則可能導致程式複雜而又混亂,變成bug滋生的溫床。所以設計、開發優秀的執行緒元件以供重用,無論如何都是值得的。
執行緒相關的api並不複雜,然而無論是linux還是windows系統,都是c風格的介面,我們只需簡單的封裝成物件,方便易用即可。任務佇列是設計成用來進行執行緒間通訊,使用任務佇列進行執行緒間通訊設計到一些模式,原理並不難理解,我們需要做到是弄清楚,在什麼場景下選用什麼樣的模式即可。
任務佇列的定義:
任務佇列對執行緒間通訊進行了抽象,限定了執行緒間只能通過傳遞任務,而相關的資料及操作則被任務儲存。任務佇列這個名詞可能在其他場景定義過其他意義,這裡討論的任務佇列定義為:能夠把封裝了資料和操作的任務在多執行緒間傳遞的執行緒安全的先入先出的佇列。其與執行緒關係示意圖如下:
注:兩個虛線框分別表示執行緒A和執行緒B恩能夠訪問的資料邊界,由此可見 任務佇列是執行緒間通訊的媒介。
任務佇列的實現:
任務的定義
生產者消費者模型在軟體設計中是極其常見的模型,常常被用來實現對各個元件或系統解耦合。大到分散式的系統互動,小到網路層物件和應用層物件的通訊,都會應用到生產者消費者模型,在任務佇列中,生產和消費的物件為“任務”。這裡把任務定義為組合了資料和操作的物件,或者簡單理解成包含了void (void*) 型別的函式指標和void* 資料指標的結構。我們把任務定義成類task_t,下面來分析一下task_t的實現。
插入程式碼:
class task_impl_i { public: virtual ~task_impl_i(){} virtual void run() = 0; virtual task_impl_i* fork() = 0; }; class task_impl_t: public task_impl_i { public: task_impl_t(task_func_t func_, void* arg_): m_func(func_), m_arg(arg_) {} virtualvoid run() { m_func(m_arg); } virtual task_impl_i* fork() { return new task_impl_t(m_func, m_arg); } protected: task_func_t m_func; void* m_arg; }; struct task_t { static void dumy(void*){} task_t(task_func_t f_, void* d_): task_impl(new task_impl_t(f_, d_)) { } task_t(task_impl_i* task_imp_): task_impl(task_imp_) { } task_t(const task_t& src_): task_impl(src_.task_impl->fork()) { } task_t() { task_impl = new task_impl_t(&task_t::dumy, NULL); } ~task_t() { delete task_impl; } task_t& operator=(const task_t& src_) { delete task_impl; task_impl = src_.task_impl->fork(); return *this; } void run() { task_impl->run(); } task_impl_i* task_impl; };
Task最重要的介面是run,簡單的執行儲存的操作,具體的操作儲存在task_impl_i的基類中,由於物件本身就是資料加操作的集合,所以構造task_impl_i的子類物件時,為其賦予不同的資料和操作即可。這裡使用了組合的方式實現了介面和實現的分離。這麼做的優點是應用層只需知道task的概念即可,對應task_impl_i不需要了解。由於不同的操作和資料可能需要構造不同task_impl_i子類,我們需要提供一些泛型函式,能夠將使用者的所有操作和資料都能輕易的轉換成task物件。task_binder_t 提供一系列的gen函式,能夠轉換使用者的普通函式和資料為task_t物件。
struct task_binder_t { //! C function static task_t gen(void (*func_)(void*), void* p_) { return task_t(func_, p_); } template<typename RET> static task_t gen(RET (*func_)(void)) { struct lambda_t { static void task_func(void* p_) { (*(RET(*)(void))p_)(); }; }; return task_t(lambda_t::task_func, (void*)func_); } template<typename FUNCT, typename ARG1> static task_t gen(FUNCT func_, ARG1 arg1_) { struct lambda_t: public task_impl_i { FUNCT dest_func; ARG1 arg1; lambda_t(FUNCT func_, const ARG1& arg1_): dest_func(func_), arg1(arg1_) {} virtual void run() { (*dest_func)(arg1); } virtual task_impl_i* fork() { return new lambda_t(dest_func, arg1); } }; return task_t(new lambda_t(func_, arg1_));
生產任務
函式封裝了使用者的操作邏輯,需要在某執行緒執行特定操作時,需要將操作對應的函式轉換成task_t,投遞到目的執行緒對應的任務佇列。任務佇列使用起來雖然像是在互相投遞訊息,但是根本上仍然是共享資料式的資料交換方式。主要步驟如下:
l 使用者函式轉換成task_t物件
l 鎖定目的執行緒的任務佇列,將task_t 放到任務佇列尾,當佇列為空時,目的執行緒會wait在條件變數上,此時需要signal喚醒目的執行緒
實現的關鍵程式碼如下:
void produce(const task_t& task_) { lock_guard_t lock(m_mutex); bool need_sig = m_tasklist.empty(); m_tasklist.push_back(task_); if (need_sig) { m_cond.signal(); } }
消費任務
消費任務的執行緒會變成完全的任務驅動,該執行緒只有一個職責,執行任務佇列的所有任務,若當前任務佇列為空時,執行緒會阻塞在條件變數上,重新有新任務到來時,執行緒會被再次喚醒。實現程式碼如下:
int consume(task_t& task_) { lock_guard_t lock(m_mutex); while (m_tasklist.empty()) { if (false == m_flag) { return -1; } m_cond.wait(); } task_ = m_tasklist.front(); m_tasklist.pop_front(); return 0; } int run() { task_t t; while (0 == consume(t)) { t.run(); } return 0; }
任務佇列的模式
單執行緒單任務佇列方式
任務佇列已經提供了run介面,繫結任務佇列的執行緒只需執行此函式即可,此函式除非使用者顯示的呼叫任務佇列的close介面,否則run函式永不返回。任務佇列的close介面是專門用來停止任務佇列的工作的,程式碼如下:
void close() { lock_guard_t lock(m_mutex); m_flag = false; m_cond.broadcast(); }
首先設定了關閉標記,然後在條件變數上執行broadcast, 任務佇列的run函式也會由此退出。在回頭看一下run介面的程式碼你會發現,檢查任務佇列是否關閉(m_flag 變數)的程式碼是在任務佇列為空的時候才檢測的,這樣能夠保證任務佇列被全部執行後,run函式才返回。
下面是一個使用任務佇列的helloworld的示例:
class foo_t { public: void print(int data) { cout << "helloworld, data:" <<data << " thread id:"<< ::pthread_self() << endl; } void print_callback(int data, void (*callback_)(int)) { callback_(data); } static void check(int data) { cout << "helloworld, data:" <<data << " thread id:"<< ::pthread_self() << endl; } }; // 單執行緒單任務佇列 void test_1() { thread_t thread; task_queue_t tq; thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 1); foo_t foo; for (int i = 0; i < 100; ++i) { cout << "helloworld, thread id:"<< ::pthread_self() << endl; tq.produce(task_binder_t::gen(&foo_t::print, &foo, i)); sleep(1); } thread.join(); } int main(int argc, char* argv[]) { test_1(); return 0; }
本例使用單執行緒單任務佇列的方式,由於只有一個執行緒繫結在任務佇列上,所以任務的執行會嚴格按照先入先出的方式執行。優點是能夠保證邏輯操作的有序性,所以最為常用。
多執行緒多工佇列方式
如果想利用更多執行緒,那麼建立更多執行緒的同時,仍然保證每個任務佇列繫結在單執行緒上。讓不同的任務佇列並行執行就可以了。
下面幾種情況適用此模式:
l 比如網遊中資料庫一般會建立連線池,使用者的操作資料庫都是有資料庫執行緒池完成,在將結果投遞給邏輯層。對每個使用者的資料增刪改查操作都必須是有序的,所以每個使用者繫結一個固定的任務佇列。而不同的使用者的資料修改互不干擾,不同的使用者分配不同的任務佇列即可。
l 比如網路層中的多個socket的讀寫是互不干擾的,可以建立兩個或更多執行緒,每個對應一個任務佇列,不同的socket的操作可以隨機的分配一個任務佇列(注意分配是隨機的,一旦分配了,單個socket的所有操作都會由這個任務佇列完成,保證邏輯有序性)。
示例程式碼:
//! 多執行緒多工佇列 void test_2() { thread_t thread; task_queue_t tq[3]; for (unsigned int i = 0; i < sizeof(tq)/sizeof(task_queue_t); ++i) { thread.create_thread(task_binder_t::gen(&task_queue_t::run, &(tq[i])), 1); } foo_t foo; cout << "helloworld, thread id:"<< ::pthread_self() << endl; for (unsigned int j = 0; j < 100; ++j) { tq[j % (sizeof(tq)/sizeof(task_queue_t))].produce(task_binder_t::gen(&foo_t::print, &foo, j)); sleep(1); } thread.join(); }
多執行緒單任務佇列方式
有時候可能並不需要邏輯操作的完全有序,而是要求操作儘可能快的執行,只要有空閒執行緒,任務就投遞到空閒執行緒立刻執行。如果時序不影響結果,這種模式會更有效率,下面幾種情況可能用到這種模式:
l 比如social game中的好友是從platform的api獲取的,需要http協議通訊,若採用curl等http庫同步通訊時,會阻塞執行緒,這是可以使用多執行緒單佇列方式,請求投遞到任務佇列後,只要有空閒執行緒立馬執行,使用者A雖然比使用者B先到達任務佇列,但是並不能保證A比B一定先獲取到好友列表,如果A有2k好友,而B只有兩個呢,當然有可能B請求更快。
//! 多執行緒單任務佇列 void test_3() { thread_t thread; task_queue_t tq; thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 3); foo_t foo; cout << "helloworld, thread id:"<< ::pthread_self() << endl; for (unsigned int j = 0; j < 100; ++j) { tq.produce(task_binder_t::gen(&foo_t::print, &foo, j)); sleep(1); } thread.join(); }
任務佇列的高階用法
非同步回撥
任務佇列的模式中列舉的例子都是執行緒間單項通訊,執行緒A將請求投遞給了B,但B執行完畢後A並沒有檢測結果。實際中往往都是需要將執行結果進行額外處理或者投遞到另外任務佇列。非同步回撥可以很好的解決這個問題,原理就是投遞任務時,同時包含檢查任務執行結果的函式。示例程式碼:
//! 非同步回撥 void test_4() { thread_t thread; task_queue_t tq; thread.create_thread(task_binder_t::gen(&task_queue_t::run, &tq), 1); foo_t foo; cout << "helloworld, thread id:"<< ::pthread_self() << endl; for (unsigned int j = 0; j < 100; ++j) { tq.produce(task_binder_t::gen(&foo_t::print_callback, &foo, j, &foo_t::check)); sleep(1); } thread.join(); }
非同步是效能優化非常重要的手段,下面如下場合可以使用非同步:
l 伺服器程式要求很高的實時性,幾乎邏輯層不執行io操作,io操作通過任務佇列被io執行緒執行成功後再通過回撥的方式傳回邏輯層。
l 網遊中使用者登入,需呀從資料庫載入使用者資料,資料庫層不需要知曉邏輯層如何處理使用者資料,當介面被呼叫時必須傳入回撥函式,資料庫層載入資料後直接呼叫回撥函式,而資料作為引數。
隱式任務佇列
使用任務佇列可以解耦多執行緒的設計。更加優秀的使用是將其封裝在介面之後。前邊的例子中都是顯示的操作了任務佇列物件。但這就限制了使用者必須知道某個介面需要繫結哪個任務佇列上,尤其是多執行緒多工佇列的例子,如果當用戶操作socket介面時還要知道socket對應哪個任務佇列就顯得不夠優雅了。Socket自己本身可以儲存對應任務佇列的引用,這樣使用者只需呼叫socket的介面,而介面內部再將請求投遞到爭取的任務佇列。示例程式碼:
void socket_impl_t::async_send(const string& msg_) { tq.produce(task_binder_t::gen(&socket_impl_t::send, &this, msg_)); } void socket_impl_t::send(const string& msg_) { //do send code }
總結:
l 設計多執行緒程式時,往往設計使用任務佇列是關鍵,好用、高效、靈活的任務佇列元件十分必需,本節介紹的實現支援多種多執行緒模式,易用易理解。
l 非同步回撥在多執行緒程式中非常常見,非同步往往是為了提高效能和系統吞吐量的,但是非同步其不可避免的會帶來複雜性,所以儘量保證非同步相關的步驟簡單。
l 任務佇列封裝物件介面的內部更佳,使用者直接呼叫介面,彷彿沒有任務佇列這回事,讓他在看不見的地方默默執行。
l 本節設計的任務佇列是執行緒安全的,並且關閉時已經投遞的任務能夠保證被 。