1. 程式人生 > >簡單執行緒池的設計

簡單執行緒池的設計

  在網上觀摩了一些大佬關於執行緒池的實現後,我決定也親手寫一下簡單執行緒池,首先先解釋一下什麼是執行緒池,簡單的來說,就是預先建立一些執行緒,使它們處於睡眠狀態,當任務來臨時,喚醒執行緒讓它們去執行。使用執行緒池的好處有很多,比如,1.執行緒的建立和銷燬的開銷,無論從時間還是空間上來說是巨大的,而通過執行緒池的重用大大減少了這些不必要的開銷,當然既然少了這麼多消費記憶體的開銷,其執行緒執行速度也是得到提升,2.還有有效的控制執行緒的併發數,控制執行緒的併發數可以有效的避免大量的執行緒爭奪CPU資源而造成堵塞。

  關於設計這個執行緒池,從概念出發,預先建立一些執行緒(建立執行緒,其後必須也伴隨著銷燬執行緒),使它們處於睡眠狀態(掛起或者阻塞態),當任務來臨時(設計一個佇列專門裝任務),喚醒執行緒並執行(執行緒函式所完成),同時再設計一個具體描述任務的父類並設定成純虛擬函式,使用者在使用此執行緒池的時候,只需要重寫父類就可以了,所以大致需要實現的功能如下圖所示:

 

接下來按每個功能進行講述:

1.建立執行緒

 1     //先檢查引數正確性
 2     if(ThreadNUM_MIN < 0 || ThreadNUM_MAX < ThreadNUM_MIN)
 3         return false;
 4     //建立訊號量(在建立執行緒前建立訊號量,防止執行緒空轉)
 5     m_hSemphore = CreateSemaphore(NULL,0,ThreadNUM_MAX,0);
 6     //建立指定個數執行緒
 7     for(int i = 0;i < ThreadNUM_MIN;i++)
 8     {
 9         HANDLE handle = (HANDLE)_beginthreadex(NULL,0,&ThreadFunction,this,0,0);
10         if(handle)
11         {
12             m_lHandle.push_back(handle);
13         }
14     }
15     return true;

首先值得說的有兩點,第一點,在這裡建立執行緒我使用的是_beginthreadex(安全屬性,執行緒棧大小,執行緒函式地址,執行緒函式引數,執行緒初始態,執行緒識別符號),而不是用CreateThread(),這是因為如果在程式碼中有使用標準C執行庫中的函式時,儘量使用_beginthreadex()來代替CreateThread()(這個函式解決的應該是一個歷史遺留問題,標準C執行庫在1970年被實現了,由於當時沒任何一個作業系統提供對多執行緒的支援。因此編寫標準C執行庫的程式設計師根本沒考慮多執行緒程式使用標準C執行庫的情況)比如標準C執行庫的全域性變數errno。很多執行庫中的函式在出錯時會將錯誤代號賦值給這個全域性變數,這樣可以方便除錯。但如果有這樣的一個程式碼片段:

1 if (system("notepad.exe readme.txt") == -1)
2 {
3     switch(errno)
4     {
5         ...//錯誤處理程式碼
6     }

假設某個執行緒A在執行上面的程式碼,該執行緒在呼叫system()之後且尚未呼叫switch()語句時另外一個執行緒B啟動了,這個執行緒B也呼叫了標準C執行庫的函式,不幸的是這個函式執行出錯了並將錯誤代號寫入全域性變數errno中。這樣執行緒A一旦開始執行switch()語句時,它將訪問一個被B執行緒改動了的errno。這種情況必須要加以避免!因為不單單是這一個變數會出問題,其它像strerror()、strtok()、tmpnam()、gmtime()、asctime()等函式也會遇到這種由多個執行緒訪問修改導致的資料覆蓋問題,通過檢視原始碼可知,_beginthreadex()是先建立了一個記憶體塊,再呼叫CreateThread(),這個記憶體塊中用來存放一些需要執行緒獨享的資料。事實上新執行緒執行時會首先將記憶體塊與自己進一步關聯起來。然後新執行緒呼叫標準C執行庫函式如strtok()時就會先取得記憶體塊的地址再將需要保護的資料存入記憶體塊中。這樣每個執行緒就只會訪問和修改自己的資料而不會去篡改其它執行緒的資料了。

   第二點,在設計執行緒睡眠狀態時,有很多種方法 掛起(恢復指定執行緒),阻塞中有關鍵段/臨界區(無安全屬性,不適用),事件(無法喚醒指定執行緒,不適用),互斥量(同事件),所以我選擇的是訊號量用來阻塞執行緒和恢復執行緒,因為就如同停車場一樣,我只是開放了車位,至於哪輛車(執行緒)停進來由系統隨機分配。

 

 

2.銷燬執行緒

 1     m_bFlagExit = false;
 2     list<HANDLE>::iterator ite = m_lHandle.begin();
 4     while(ite != m_lHandle.end())
 5     {
 6         if(WaitForSingleObject(*ite,100) == WAIT_TIMEOUT)
 7             TerminateThread(*ite,-1);
 8         CloseHandle(*ite);
 9         *ite = 0;
10         ite++;
11     }
12     m_lHandle.clear();14     CloseHandle(m_hSemphore);
15     m_hSemphore = 0;

能自然退出就自然退出,若遇到執行緒無法關閉的情況,即(等待執行緒中核心物件的訊號量100ms內為無訊號時),強制退出

3.執行緒函式

    CMyThreadPool *pThis = (CMyThreadPool *)lpvoid;
    CItask *pItask = NULL;
    while(pThis->m_bFlagExit)
    {       
        if(!pThis->m_qItask.empty())                                       
        {
            pItask = pThis->m_qItask.front();
            pThis->m_qItask.pop();

            pItask->RunTask();
            delete pItask;
            pItask = NULL;
        }

    }

    return 0;

目的很簡單,在無退出標記的情況下,從佇列中取出一個任務,來一個執行緒去執行

4.投遞任務

    if(itask == NULL)
        return false;
    m_qItask.push(itask);
    //釋放一個訊號量
    ReleaseSemaphore(m_hSemphore,1,0);

將一個任務投入佇列中,並且釋放一個訊號量

5.程式碼優化

1》在重寫任務類後,嘗試建立了兩個執行緒去執行 從1加到10000000000的任務,不出意外的崩了,原因是 佇列迭代器失效,在經過一頓查閱後發現,是因為在C++中STL不支援執行緒安全,佇列的push和pop同時進行會崩掉,一般說來,STL對於多執行緒的支援僅限於下列兩點:(Effective STL中有描述)

1.多個讀取者是安全的。即多個執行緒可以同時讀取一個容器中的內容。 即此時多個執行緒呼叫 容器的不涉及到寫的介面都可以 eg find, begin, end 等.

2.對不同容器的多個寫入者是安全的。即多個執行緒對不同容器的同時寫入合法。 但是對於同一容器當有執行緒寫,有執行緒讀時,如何保證正確? 需要程式設計師自己來控制,比如:執行緒A讀容器某一項時,執行緒B正在移除該項。這會導致一下無法預知的錯誤。 通常的解決方式是用開銷較小的臨界區(CRITICAL_SECTION)來做同步。以下列方式同步基本上可以做到執行緒安全的容器(就是在有寫操作的情況下仍能保證安全)。

但是在查閱後,我打算利用一個bool變數去標記佇列中任務是否已經pop,來決定是否去push,還是崩了,原來多根執行緒也不允許同時push也不符合執行緒安全,那麼將push加入互斥量解決了這個問題。

2》但是作為CPU來說,執行緒池是由任務的個數來建立執行緒數,這樣才能最大利用的使用資源,這就像在餐館裡吃飯一樣,CPU是老闆,飯店裡最多有5個服務員(實現建立的執行緒最大數),在店的有2個(建立的執行緒數),此時來了一個客人(任務),此時放走一個服務員去執行即可(釋放訊號量),此時來了4個客人,我把不在店的2個服務員給叫回來,此時來了10個客人,飯店裡5個服務員都已經用完了,那麼剩下5個就只能排隊等待了,人數(任務)少時,服務員(執行緒)少,老闆(CPU)就可以減少開支(資源分配)。那麼這種方法作為程式碼就可以這樣實現。

 1 //1.是否有空閒執行緒
 2     if(m_lRunThreadNum < m_lCreateThreadNum) 
 3     {
 4         //釋放一個訊號量
 5         ReleaseSemaphore(m_hSemphore,1,0);
 6     }
 7     //2.是否達到上限
 8     else if(m_lCreateThreadNum < m_lMaxThreadNum) 
 9     {
10         //建立執行緒 並且釋放一個訊號量
11         HANDLE handle = (HANDLE)_beginthreadex(NULL,0,&ThreadFunction,this,0,0);
12         if(handle)
13         {
14             m_lHandle.push_back(handle);
15         }
16         m_lCreateThreadNum++;
17         ReleaseSemaphore(m_hSemphore,1,0);
18     }
19     //3.已達到上限 任務等待

3》線上程函式裡,多個執行緒去執行任務時,難免會出現執行緒併發的情況,解決執行緒併發最常見的莫過於執行緒同步,也就是上鎖,我列舉一下常見的幾種方式(如果有列舉不當的,歡迎指出):

 

  1.  原子訪問:同一時刻只允許一個執行緒訪問資源,具體運用就是Interlocked...一系列函式,但是鎖定範圍小,一般就是一個變數,我認為它運用的主要核心就是Volatile關鍵字,因為CPU運算速度過快,需要一個Cache快取來進行資料交換,而對於多執行緒來說,資料更改必須從記憶體中取用,而不是Cache,防止讀記憶體不同步,比如變數a已經減1了,但此時兩個執行緒中有一個執行緒讀取的還是a,而不是記憶體中的a-1,這個關鍵字的作用就是防止編譯優化,並且對於特殊地址的穩定訪問。
  2. 關鍵段:同一個時刻只允許一個執行緒訪問資源,舉一個不雅觀的例子,一群人上廁所,一個人進去後,將外面的牌子置為使用中,外面就有一群人在等待,當廁所裡的人出來後,再將牌子置為未使用,在有一個人進入,這樣就控制每一隻有一個人(執行緒),上廁所(訪問資源)了,而根據外面人等待時間的長短分為等不到就直接坐下來(其餘執行緒直接阻塞),站著等一段時間,裡面人出來了就直接進去,時間到了,裡面人還沒出來就直接阻塞(其餘執行緒使用旋轉鎖),還有一種就是衝進來直接推門,能推開就進去,推不開就找另一個廁所(程序)(其餘執行緒非同步處理)
  3. 互斥量,事件和訊號量:這三種都是核心物件,使用時很安全,並且作用範圍廣,可以跨程序進行通訊,並且通過waitforsingleobject()等待訊號的時間長短,都能實現關鍵段中三個方法,唯一的缺點就是相對於關鍵段來說執行效率慢,關鍵段是使用者態下的方法,不需要狀態轉換

根據這幾種方法,針對執行緒函式又加了一些鎖

 1 CMyThreadPool *pThis = (CMyThreadPool *)lpvoid;
 2     CItask *pItask = NULL;
 3     while(pThis->m_bFlagExit)
 4     {
 5         //等待訊號量
 6         if(WaitForSingleObject(pThis->m_hSemphore,100) == WAIT_TIMEOUT)        //為了能讓卡死程序能夠退出
 7             continue;
 8 
 9         InterlockedIncrement(&pThis->m_lRunThreadNum);       
10         while(!pThis->m_qItask.empty())                                        //由if->while 程式碼優化
11         {
12             if(WaitForSingleObject(pThis->m_lMutex,100) == WAIT_TIMEOUT)    //上鎖
13                 continue;
14             if(pThis->m_qItask.empty())
15             {
16                 ReleaseMutex(pThis->m_lMutex);                                    //解鎖
17                 break;
18             }
19             pItask = pThis->m_qItask.front();
20             pThis->m_qItask.pop();
21             //pThis->m_bIsPop = true;
22             ReleaseMutex(pThis->m_lMutex);                                    //解鎖
23 
24             pItask->RunTask();
25             delete pItask;
26             pItask = NULL;
27         }
28         InterlockedDecrement(&pThis->m_lRunThreadNum);
29 
30     }
31 
32     return 0;

4》在銷燬時,能清空的都要清空,防止記憶體洩漏

 1     m_bFlagExit = false;
 2     list<HANDLE>::iterator ite = m_lHandle.begin();
 3     //auto ite = m_lHandle.begin();
 4     while(ite != m_lHandle.end())
 5     {
 6         if(WaitForSingleObject(*ite,100) == WAIT_TIMEOUT)
 7             TerminateThread(*ite,-1);
 8         CloseHandle(*ite);
 9         *ite = 0;
10         ite++;
11     }
12     m_lHandle.clear();
13     m_lCreateThreadNum = 0;
14     CloseHandle(m_hSemphore);
15     m_hSemphore = 0;
16 
17     while(!m_qItask.empty())        //防止記憶體洩漏,將任務清空
18     {
19         CItask *pItask = NULL;
20         pItask = m_qItask.front();
21         m_qItask.pop();
22         delete pItask;
23         pItask = NULL;
24     }
25 
26     if(m_lMutex)                    //關閉互斥量
27     {
28         CloseHandle(m_lMutex);
29         m_lMutex = 0;
30     }

 

接下來在測試時就沒有問題了,但是有點卡,並且CPU的運算率達到了100%,後來查閱部落格https://blog.csdn.net/liuyancainiao/article/details/84400637資料,設定的執行緒數最多為CPU核數的兩倍,假設計算機有一個物理CPU,是雙核的,支援超執行緒。那麼這臺計算機就是雙核四執行緒的。 所以兩路(兩路指的是有兩個物理CPU)四核超執行緒就有2*4*2=16個邏輯CPU。有人也把它稱之為16核,實際上在linux的/proc/cpuinfo中檢視只有8核。既然計算機多核與超執行緒模擬相關,所以實際上計算機的核數翻倍並不意味著效能的翻倍,也不意味著核數越多計算機效能會越來越好,因為超執行緒只是充分利用了CPU的空閒資源,實際上在應用中基於很多原因,CPU的執行單元都沒有被充分使用。  具體的程式碼 ,我放在檔案共享,有需要的朋友可以直接拿走,不客氣^_^

 

 

2019-08-10 11:34:36 程式設計小菜鳥自我反思,路過的朋友可以留下自己建議和意見 謝謝!!!

 <