1. 程式人生 > >生產者/消費者模式之深入理解

生產者/消費者模式之深入理解

模板 必須 winapi 協議 針對 sso 額外 fill 功能實現

#include <windows.h>
#include <iostream>

const unsigned short SIZE_OF_BUFFER = 2; //緩沖區長度
unsigned short ProductID = 0;    //產品號
unsigned short ConsumeID = 0;    //將被消耗的產品號
unsigned short in = 0;      //產品進緩沖區時的緩沖區下標
unsigned short out = 0;      //產品出緩沖區時的緩沖區下標

int buffer[SIZE_OF_BUFFER];    //緩沖區是個循環隊列
bool p_ccontinue = true;      //控制程序結束
HANDLE Mutex;       //用於線程間的互斥
HANDLE FullSemaphore;     //當緩沖區滿時迫使生產者等待
HANDLE EmptySemaphore;     //當緩沖區空時迫使消費者等待

DWORD WINAPI Producer(LPVOID);    //生產者線程
DWORD WINAPI Consumer(LPVOID);    //消費者線程

int main()
{
    //創建各個互斥信號
	//註意,互斥信號量和同步信號量的定義方法不同,互斥信號量調用的是CreateMutex函數,同步信號量
	//調用的是CreateSemaphore函數,函數的返回值都是句柄。
    Mutex = CreateMutex(NULL,FALSE,NULL);
	EmptySemaphore = CreateSemaphore(NULL,SIZE_OF_BUFFER,SIZE_OF_BUFFER,NULL);
    //將上句做如下修改,看看結果會怎樣
	//EmptySemaphore = CreateSemaphore(NULL,0,SIZE_OF_BUFFER-1,NULL);
    FullSemaphore = CreateSemaphore(NULL,0,SIZE_OF_BUFFER,NULL);

    //調整下面的數值,可以發現,當生產者個數多於消費者個數時,
    //生產速度快,生產者經常等待消費者;反之,消費者經常等待 
    const unsigned short PRODUCERS_COUNT = 1;  //生產者的個數
    const unsigned short CONSUMERS_COUNT = 3;  //消費者的個數

    //總的線程數
    const unsigned short THREADS_COUNT = PRODUCERS_COUNT+CONSUMERS_COUNT;

    HANDLE hThreads[THREADS_COUNT]; //各線程的handle
    DWORD producerID[PRODUCERS_COUNT]; //生產者線程的標識符
    DWORD consumerID[CONSUMERS_COUNT]; //消費者線程的標識符

	//創建生產者線程
    for (int i=0;i<PRODUCERS_COUNT;++i){
        hThreads[i]=CreateThread(NULL,0,Producer,NULL,0,&producerID[i]);
        if (hThreads[i]==NULL) return -1;
    }
    //創建消費者線程
    for (int i=0;i<CONSUMERS_COUNT;++i){
        hThreads[PRODUCERS_COUNT+i]=CreateThread(NULL,0,Consumer,NULL,0,&consumerID[i]);
        if (hThreads[i]==NULL) return -1;
    }

    while(p_ccontinue){
        if(getchar()){ //按回車後終止程序運行
            p_ccontinue = false;
        }
    }

    return 0;
}

//生產一個產品。簡單模擬了一下,僅輸出新產品的ID號
void Produce()
{
    std::cout << std::endl<< "Producing " << ++ProductID << " ... ";
    std::cout << "Succeed" << std::endl;
}

//把新生產的產品放入緩沖區
void Append()
{
    std::cerr << "Appending a product ... ";
    buffer[in] = ProductID;
    in = (in+1)%SIZE_OF_BUFFER;
    std::cerr << "Succeed" << std::endl;

    //輸出緩沖區當前的狀態
    for (int i=0;i<SIZE_OF_BUFFER;++i){
        std::cout << i <<": " << buffer[i];
        if (i==in) std::cout << " <-- 生產";
        if (i==out) std::cout << " <-- 消費";
        std::cout << std::endl;
    }
}

//從緩沖區中取出一個產品
void Take()
{
    std::cerr << "Taking a product ... ";
    ConsumeID = buffer[out];
	buffer[out] = 0;
    out = (out+1)%SIZE_OF_BUFFER;
    std::cerr << "Succeed" << std::endl;

    //輸出緩沖區當前的狀態
    for (int i=0;i<SIZE_OF_BUFFER;++i){
        std::cout << i <<": " << buffer[i];
        if (i==in) std::cout << " <-- 生產";
        if (i==out) std::cout << " <-- 消費";
        std::cout << std::endl;
    }
}

//消耗一個產品
void Consume()
{
    std::cout << "Consuming " << ConsumeID << " ... ";
    std::cout << "Succeed" << std::endl;
}

//生產者
DWORD  WINAPI Producer(LPVOID lpPara)
{
    while(p_ccontinue){
        WaitForSingleObject(EmptySemaphore,INFINITE);	//p(empty);
        WaitForSingleObject(Mutex,INFINITE);	//p(mutex);
        Produce();
        Append();
        Sleep(1500);
        ReleaseMutex(Mutex);	//V(mutex);
        ReleaseSemaphore(FullSemaphore,1,NULL);	//V(full);
    }
    return 0;
}

//消費者
DWORD  WINAPI Consumer(LPVOID lpPara)
{
    while(p_ccontinue){
        WaitForSingleObject(FullSemaphore,INFINITE);	//P(full);
        WaitForSingleObject(Mutex,INFINITE);		//P(mutex);
        Take();
        Consume();
        Sleep(1500);
        ReleaseMutex(Mutex);		//V(mutex);
        ReleaseSemaphore(EmptySemaphore,1,NULL);		//V(empty);
    }
    return 0;
}

步驟三:在main函數中,分別創建生產者線程和消費者線程。其中CreateThread函數的參數說明如下。第三個參數即為執行過程,第六個參數是線程Id的地址。

lpsa

新線程的安全特性。

dwStackSize

新線程的堆棧大小。

pfnThreadProc

新線程的線程過程。

pvParam

將傳遞的參數傳遞給線程過程。

dwCreationFlags

創建標誌(0個或CREATE_SUSPENDED)。

pdwThreadId

[out] 中,若成功,接收新創建的線程的線程ID DWORD變量的地址

步驟四:1:首先我們將消費者設為1,生產者也設為1;

技術分享圖片

可以明顯發現,基本每次生產一下,就被消費了。也就是說,基本沒有線程出現大量時間的滯留

情況二:生產者為3,消費者為1

技術分享圖片

分析:生產者的能力這時就稍微強於情況一了,所以可以發現基本雙零的情況很少見。

情況三:生產者未一,消費者為三。

技術分享圖片

分析:通過線程的個數不同,我們可以給生產消費不同的能力大小。當生產者的線程數目多時,很容易發現,緩沖區的狀態基本很少為零。當消費者的數目較多時,可以發現緩沖區的狀態基本為零。

步驟五:

改變緩沖區的大小,當緩沖區空時迫使消費者等待,如果此時將這裏的緩沖區減小一個,應該可以推測出緩沖區的狀態,始終不可能為零。

★簡介

生產者消費者模式並不是GOF提出的23種設計模式之一,23種設計模式都是建立在面向對象的基礎之上的,但其實面向過程的編程中也有很多高效的編程模式,生產者消費者模式便是其中之一,它是我們編程過程中最常用的一種設計模式。

在實際的軟件開發過程中,經常會碰到如下場景:某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。

單單抽象出生產者和消費者,還夠不上是生產者/消費者模式。該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據。大概的結構如下圖。

技術分享圖片

為了不至於太抽象,我們舉一個寄信的例子(雖說這年頭寄信已經不時興,但這個例子還是比較貼切的)。假設你要寄一封平信,大致過程如下:

1、你把信寫好——相當於生產者制造數據

2、你把信放入郵筒——相當於生產者把數據放入緩沖區

3、郵遞員把信從郵筒取出——相當於消費者把數據取出緩沖區

4、郵遞員把信拿去郵局做相應的處理——相當於消費者處理數據

★優點

可能有同學會問了:這個緩沖區有什麽用捏?為什麽不讓生產者直接調用消費者的某個函數,直接把數據傳遞過去?搞出這麽一個緩沖區作甚?

其實這裏面是大有講究的,大概有如下一些好處。

◇解耦

假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那麽生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。

接著上述的例子,如果不使用郵筒(也就是緩沖區),你必須得把信直接交給郵遞員。有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這就產生和你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要重新認識一下(相當於消費者變化導致修改生產者代碼)。而郵筒相對來說比較固定,你依賴它的成本就比較低(相當於和緩沖區之間的弱耦合)。

◇支持並發(concurrency)

生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。

使用了生產者/消費者模式之後,生產者和消費者可以是兩個獨立的並發主體(常見並發類型有進程和線程兩種,後面的帖子會講兩種並發類型下的應用)。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。

其實當初這個模式,主要就是用來處理並發問題的。

從寄信的例子來看。如果沒有郵筒,你得拿著信傻站在路口等郵遞員過來收(相當於生產者阻塞);又或者郵遞員得挨家挨戶問,誰要寄信(相當於消費者輪詢)。不管是哪種方法,都挺土的。

◇支持忙閑不均

緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。

為了充分復用,我們再拿寄信的例子來說事。假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節(也可能是聖誕節)送賀卡,需要寄出去的信超過1000封,這時候郵筒這個緩沖區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來時再拿走。

費了這麽多口水,希望原先不太了解生產者/消費者模式的同學能夠明白它是怎麽一回事。接下來說說數據單元。

★啥是數據單元

何謂數據單元捏?簡單地說,每次生產者放到緩沖區的,就是一個數據單元;每次消費者從緩沖區取出的,也是一個數據單元。對於前一個帖子中寄信的例子,我們可以把每一封單獨的信件看成是一個數據單元。

不過光這麽介紹,太過於簡單,無助於大夥兒分析出這玩意兒。所以,後面咱們來看一下數據單元需要具備哪些特性。搞明白這些特性之後,就容易從復雜的業務邏輯中分析出適合做數據單元的東西了。

★數據單元的特性

分析數據單元,需要考慮如下幾個方面的特性:

◇關聯到業務對象

首先,數據單元必須關聯到某種業務對象。在考慮該問題的時候,你必須深刻理解當前這個生產者/消費者模式所對應的業務邏輯,才能夠作出合適的判斷。

由於“寄信”這個業務邏輯比較簡單,所以大夥兒很容易就可以判斷出數據單元是啥。但現實生活中,往往沒這麽樂觀。大多數業務邏輯都比較復雜,當中包含的業務對象是層次繁多、類型各異。在這種情況下,就不易作出決策了。

這一步很重要,如果選錯了業務對象,會導致後續程序設計和編碼實現的復雜度大為上升,增加了開發和維護成本。

◇完整性

所謂完整性,就是在傳輸過程中,要保證該數據單元的完整。要麽整個數據單元被傳遞到消費者,要麽完全沒有傳遞到消費者。不允許出現部分傳遞的情形。

對於寄信來說,你不能把半封信放入郵筒;同樣的,郵遞員從郵筒中拿信,也不能只拿出信的一部分。

◇獨立性

所謂獨立性,就是各個數據單元之間沒有互相依賴,某個數據單元傳輸失敗不應該影響已經完成傳輸的單元;也不應該影響尚未傳輸的單元。

為啥會出現傳輸失敗捏?假如生產者的生產速度在一段時間內一直超過消費者的處理速度,那就會導致緩沖區不斷增長並達到上限,之後的數據單元就會被丟棄。如果數據單元相互獨立,等到生產者的速度降下來之後,後續的數據單元繼續處理,不會受到牽連;反之,如果數據單元之間有某種耦合,導致被丟棄的數據單元會影響到後續其它單元的處理,那就會使程序邏輯變得非常復雜。

對於寄信來說,某封信弄丟了,不會影響後續信件的送達;當然更不會影響已經送達的信件。

◇顆粒度

前面提到,數據單元需要關聯到某種業務對象。那麽數據單元和業務對象是否要一一對應捏?很多場合確實是一一對應的。

不過,有時出於性能等因素的考慮,也可能會把N個業務對象打包成一個數據單元。那麽,這個N該如何取值就是顆粒度的考慮了。顆粒度的大小是有講究的。太大的顆粒度可能會造成某種浪費;太小的顆粒度可能會造成性能問題。顆粒度的權衡要基於多方面的因素,以及一些經驗值的考量。

還是拿寄信的例子。如果顆粒度過小(比如設定為1),那郵遞員每次只取出1封信。如果信件多了,那就得來回跑好多趟,浪費了時間。

如果顆粒度太大(比如設定為100),那寄信的人得等到湊滿100封信才拿去放入郵筒。假如平時很少寫信,就得等上很久,也不太爽。

可能有同學會問:生產者和消費者的顆粒度能否設置成不同大小(比如對於寄信人設置成1,對於郵遞員設置成100)。當然,理論上可以這麽幹,但是在某些情況下會增加程序邏輯和代碼實現的復雜度。後面討論具體技術細節時,或許會聊到這個問題。

好,數據單元的話題就說到這。希望通過本帖子,大夥兒能夠搞明白數據單元到底是怎麽一回事。下一個帖子,咱們來聊一下“基於隊列的緩沖區”,技術上如何實現。

[2]:隊列緩沖區

經過前面兩個帖子的鋪墊,今天終於開始聊一些具體的編程技術了。由於不同的緩沖區類型、不同的並發場景對於具體的技術實現有較大的影響。為了深入淺出、便於大夥兒理解,咱們先來介紹最傳統、最常見的方式。也就是單個生產者對應單個消費者,當中用隊列(FIFO)作緩沖。

關於並發的場景,在之前的帖子“進程還線程?是一個問題!”中,已經專門論述了進程和線程各自的優缺點,兩者皆不可偏廢。所以,後面對各種緩沖區類型的介紹都會同時提及進程方式和線程方式。

★線程方式

先來說一下並發線程中使用隊列的例子,以及相關的優缺點。

◇內存分配的性能

在線程方式下,生產者和消費者各自是一個線程。生產者把數據寫入隊列頭(以下簡稱push),消費者從隊列尾部讀出數據(以下簡稱pop)。當隊列為空,消費者就稍息(稍事休息);當隊列滿(達到最大長度),生產者就稍息。整個流程並不復雜。

那麽,上述過程會有什麽問題捏?一個主要的問題是關於內存分配的性能開銷。對於常見的隊列實現:在每次push時,可能涉及到堆內存的分配;在每次pop時,可能涉及堆內存的釋放。假如生產者和消費者都很勤快,頻繁地push、pop,那內存分配的開銷就很可觀了。對於內存分配的開銷,用Java的同學可以參見前幾天的帖子“Java性能優化[1]”;對於用C/C++的同學,想必對OS底層機制會更清楚,應該知道分配堆內存(new或malloc)會有加鎖的開銷和用戶態/核心態切換的開銷。

那該怎麽辦捏?請聽下文分解,關於“生產者/消費者模式[3]:環形緩沖區”。

◇同步和互斥的性能

另外,由於兩個線程共用一個隊列,自然就會涉及到線程間諸如同步啊、互斥啊、死鎖啊等等勞心費神的事情。好在"操作系統"這門課程對此有詳細介紹,學過的同學應該還有點印象吧?對於沒學過這門課的同學,也不必難過,網上相關的介紹挺多的(比如"這裏"),大夥自己去瞅一瞅。關於這方面的細節,咱今天就不多啰嗦了。

這會兒要細談的是,同步和互斥的性能開銷。在很多場合中,諸如信號量、互斥量等玩意兒的使用也是有不小的開銷的(某些情況下,也可能導致用戶態/核心態切換)。如果像剛才所說,生產者和消費者都很勤快,那這些開銷也不容小覷啊。

這又該咋辦捏?請聽下文的下文分解,關於“生產者/消費者模式[4]:雙緩沖區”。

◇適用於隊列的場合

剛才盡批判了隊列的缺點,難道隊列方式就一無是處?非也。由於隊列是很常見的數據結構,大部分編程語言都內置了隊列的支持(具體介紹見"這裏"),有些語言甚至提供了線程安全的隊列(比如JDK 1.5引入的ArrayBlockingQueue)。因此,開發人員可以撿現成,避免了重新發明輪子。

所以,假如你的數據流量不是很大,采用隊列緩沖區的好處還是很明顯的:邏輯清晰、代碼簡單、維護方便。比較符合KISS原則。

★進程方式

說完了線程的方式,再來介紹基於進程的並發。

跨進程的生產者/消費者模式,非常依賴於具體的進程間通訊(IPC)方式。而IPC的種類名目繁多,不便於挨個列舉(畢竟口水有限)。因此咱們挑選幾種跨平臺、且編程語言支持較多的IPC方式來說事兒。

◇匿名管道

感覺管道是最像隊列的IPC類型。生產者進程在管道的寫端放入數據;消費者進程在管道的讀端取出數據。整個的效果和線程中使用隊列非常類似,區別在於使用管道就無需操心線程安全、內存分配等瑣事(操作系統暗中都幫你搞定了)。

管道又分命名管道和匿名管道兩種,今天主要聊匿名管道。因為命名管道在不同的操作系統下差異較大(比如Win32和POSIX,在命名管道的API接口和功能實現上都有較大差異;有些平臺不支持命名管道,比如Windows CE)。除了操作系統的問題,對於有些編程語言(比如Java)來說,命名管道是無法使用的。所以我一般不推薦使用這玩意兒。

其實匿名管道在不同平臺上的API接口,也是有差異的(比如Win32的CreatePipe和POSIX的pipe,用法就很不一樣)。但是我們可以僅使用標準輸入和標準輸出(以下簡稱stdio)來進行數據的流入流出。然後利用shell的管道符把生產者進程和消費者進程關聯起來(沒聽說過這種手法的同學,可以看"這裏")。實際上,很多操作系統(尤其是POSIX風格的)自帶的命令都充分利用了這個特性來實現數據的傳輸(比如more、grep等)。

這麽幹有幾個好處:

1、基本上所有操作系統都支持在shell方式下使用管道符。因此很容易實現跨平臺。

2、大部分編程語言都能夠操作stdio,因此跨編程語言也就容易實現。

3、剛才已經提到,管道方式省卻了線程安全方面的瑣事。有利於降低開發、調試成本。

當然,這種方式也有自身的缺點:

1、生產者進程和消費者進程必須得在同一臺主機上,無法跨機器通訊。這個缺點比較明顯。

2、在一對一的情況下,這種方式挺合用。但如果要擴展到一對多或者多對一,那就有點棘手了。所以這種方式的擴展性要打個折扣。假如今後要考慮類似的擴展,這個缺點就比較明顯。

3、由於管道是shell創建的,對於兩邊的進程不可見(程序看到的只是stdio)。在某些情況下,導致程序不便於對管道進行操縱(比如調整管道緩沖區尺寸)。這個缺點不太明顯。

4、最後,這種方式只能單向傳數據。好在大多數情況下,消費者進程不需要傳數據給生產者進程。萬一你確實需要信息反饋(從消費者到生產者),那就費勁了。可能得考慮換種IPC方式。

順便補充幾個註意事項,大夥兒留意一下:

1、對stdio進行讀寫操作是以阻塞方式進行。比如管道中沒有數據,消費者進程的讀操作就會一直停在哪兒,直到管道中重新有數據。

2、由於stdio內部帶有自己的緩沖區(這緩沖區和管道緩沖區是兩碼事),有時會導致一些不太爽的現象(比如生產者進程輸出了數據,但消費者進程沒有立即讀到)。具體的細節,大夥兒可以看"這裏"。

◇SOCKET(TCP方式)

基於TCP方式的SOCKET通訊是又一個類似於隊列的IPC方式。它同樣保證了數據的順序到達;同樣有緩沖的機制。而且這玩意兒也是跨平臺和跨語言的,和剛才介紹的shell管道符方式類似。

SOCKET相比shell管道符的方式,有啥優點捏?主要有如下幾個優點:

1、SOCKET方式可以跨機器(便於實現分布式)。這是主要優點。

2、SOCKET方式便於將來擴展成為多對一或者一對多。這也是主要優點。

3、SOCKET可以設置阻塞和非阻塞方法,用起來比較靈活。這是次要優點。

4、SOCKET支持雙向通訊,有利於消費者反饋信息。

當然有利就有弊。相對於上述shell管道的方式,使用SOCKET在編程上會更復雜一些。好在前人已經做了大量的工作,搞出很多SOCKET通訊庫和框架給大夥兒用(比如C++的ACE庫、Python的Twisted)。借助於這些第三方的庫和框架,SOCKET方式用起來還是比較爽的。由於具體的網絡通訊庫該怎麽用不是本系列的重點,此處就不細說了。

雖然TCP在很多方面比UDP可靠,但鑒於跨機器通訊先天的不可預料性(比如網線可能被某傻X給拔錯了,網絡的忙閑波動可能很大),在程序設計上我們還是要多留一手。具體該如何做捏?可以在生產者進程和消費者進程內部各自再引入基於線程的"生產者/消費者模式"。這話聽著像繞口令,為了便於理解,畫張圖給大夥兒瞅一瞅。

技術分享圖片

這麽做的關鍵點在於把代碼分為兩部分:生產線程和消費線程屬於和業務邏輯相關的代碼(和通訊邏輯無關);發送線程和接收線程屬於通訊相關的代碼(和業務邏輯無關)。

這樣的好處是很明顯的,具體如下:

1、能夠應對暫時性的網絡故障。並且在網絡故障解除後,能夠繼續工作。

2、網絡故障的應對處理方式(比如斷開後的嘗試重連),只影響發送和接收線程,不會影響生產線程和消費線程(業務邏輯部分)。

3、具體的SOCKET方式(阻塞和非阻塞)只影響發送和接收線程,不影響生產線程和消費線程(業務邏輯部分)。

4、不依賴TCP自身的發送緩沖區和接收緩沖區。(默認的TCP緩沖區的大小可能無法滿足實際要求)

5、業務邏輯的變化(比如業務需求變更)不影響發送線程和接收線程。

針對上述的最後一條,再多啰嗦幾句。如果整個業務系統中有多個進程是采用上述的模式,那或許可以重構一把:在業務邏輯代碼和通訊邏輯代碼之間切一刀,把業務邏輯無關的部分封裝成一個通訊中間件(說中間件顯得比較牛X :-)。如果大夥兒對這玩意兒有興趣,以後專門開個帖子聊。

[3]:環形緩沖區

前一個帖子提及了隊列緩沖區可能存在的性能問題及解決方法:環形緩沖區。今天就專門來描述一下這個話題。

為了防止有人給咱扣上“過度設計”的大帽子,事先聲明一下:只有當存儲空間的分配/釋放非常頻繁並且確實產生了明顯的影響,你才應該考慮環形緩沖區的使用。否則的話,還是老老實實用最基本、最簡單的隊列緩沖區吧。還有一點需要說明一下:本文所提及的“存儲空間”,不僅包括內存,還可能包括諸如硬盤之類的存儲介質。

★環形緩沖區 vs 隊列緩沖區

◇外部接口相似

在介紹環形緩沖區之前,咱們先來回顧一下普通的隊列。普通的隊列有一個寫入端和一個讀出端。隊列為空的時候,讀出端無法讀取數據;當隊列滿(達到最大尺寸)時,寫入端無法寫入數據。

對於使用者來講,環形緩沖區和隊列緩沖區是一樣的。它也有一個寫入端(用於push)和一個讀出端(用於pop),也有緩沖區“滿”和“空”的狀態。所以,從隊列緩沖區切換到環形緩沖區,對於使用者來說能比較平滑地過渡。

◇內部結構迥異

雖然兩者的對外接口差不多,但是內部結構和運作機制有很大差別。隊列的內部結構此處就不多啰嗦了。重點介紹一下環形緩沖區的內部結構。

大夥兒可以把環形緩沖區的讀出端(以下簡稱R)和寫入端(以下簡稱W)想象成是兩個人在體育場跑道上追逐(R追W)。當R追上W的時候,就是緩沖區為空;當W追上R的時候(W比R多跑一圈),就是緩沖區滿。

為了形象起見,去找來一張圖並略作修改,如下:

技術分享圖片

從上圖可以看出,環形緩沖區所有的push和pop操作都是在一個固定的存儲空間內進行。而隊列緩沖區在push的時候,可能會分配存儲空間用於存儲新元素;在pop時,可能會釋放廢棄元素的存儲空間。所以環形方式相比隊列方式,少掉了對於緩沖區元素所用存儲空間的分配、釋放。這是環形緩沖區的一個主要優勢。

★環形緩沖區的實現

如果你手頭已經有現成的環形緩沖區可供使用,並且你對環形緩沖區的內部實現不感興趣,可以跳過這段。

◇數組方式 vs 鏈表方式

環形緩沖區的內部實現,即可基於數組(此處的數組,泛指連續存儲空間)實現,也可基於鏈表實現。

數組在物理存儲上是一維的連續線性結構,可以在初始化時,把存儲空間一次性分配好,這是數組方式的優點。但是要使用數組來模擬環,你必須在邏輯上把數組的頭和尾相連。在順序遍歷數組時,對尾部元素(最後一個元素)要作一下特殊處理。訪問尾部元素的下一個元素時,要重新回到頭部元素(第0個元素)。如下圖所示:

技術分享圖片

使用鏈表的方式,正好和數組相反:鏈表省去了頭尾相連的特殊處理。但是鏈表在初始化的時候比較繁瑣,而且在有些場合(比如後面提到的跨進程的IPC)不太方便使用。

◇讀寫操作

環形緩沖區要維護兩個索引,分別對應寫入端(W)和讀取端(R)。寫入(push)的時候,先確保環沒滿,然後把數據復制到W所對應的元素,最後W指向下一個元素;讀取(pop)的時候,先確保環沒空,然後返回R對應的元素,最後R指向下一個元素。

◇判斷“空”和“滿”

上述的操作並不復雜,不過有一個小小的麻煩:空環和滿環的時候,R和W都指向同一個位置!這樣就無法判斷到底是“空”還是“滿”。大體上有兩種方法可以解決該問題。

辦法1:始終保持一個元素不用

當空環的時候,R和W重疊。當W比R跑得快,追到距離R還有一個元素間隔的時候,就認為環已經滿。當環內元素占用的存儲空間較大的時候,這種辦法顯得很土(浪費空間)。

辦法2:維護額外變量

如果不喜歡上述辦法,還可以采用額外的變量來解決。比如可以用一個整數記錄當前環中已經保存的元素個數(該整數>=0)。當R和W重疊的時候,通過該變量就可以知道是“空”還是“滿”。

◇元素的存儲

由於環形緩沖區本身就是要降低存儲空間分配的開銷,因此緩沖區中元素的類型要選好。盡量存儲值類型的數據,而不要存儲指針(引用)類型的數據。因為指針類型的數據又會引起存儲空間(比如堆內存)的分配和釋放,使得環形緩沖區的效果打折扣。

★應用場合

剛才介紹了環形緩沖區內部的實現機制。按照前一個帖子的慣例,我們來介紹一下在線程和進程方式下的使用。

如果你所使用的編程語言和開發庫中帶有現成的、成熟的環形緩沖區,強烈建議使用現成的庫,不要重新制造輪子;確實找不到現成的,才考慮自己實現。如果你純粹是業余時間練練手,那另當別論。

◇用於並發線程

和線程中的隊列緩沖區類似,線程中的環形緩沖區也要考慮線程安全的問題。除非你使用的環形緩沖區的庫已經幫你實現了線程安全,否則你還是得自己動手搞定。線程方式下的環形緩沖區用得比較多,相關的網上資料也多,下面就大致介紹幾個。

對於C++的程序員,強烈推薦使用boost提供的circular_buffer模板,該模板最開始是在boost 1.35版本中引入的。鑒於boost在C++社區中的地位,大夥兒應該可以放心使用該模板。

對於C程序員,可以去看看開源項目circbuf,不過該項目是GPL協議的,不太爽;而且活躍度不太高;而且只有一個開發人員。大夥兒慎用!建議只拿它當參考。

對於C#程序員,可以參考CodeProject上的一個示例。

◇用於並發進程

進程間的環形緩沖區,似乎少有現成的庫可用。大夥兒只好自己動手、豐衣足食了。

適用於進程間環形緩沖的IPC類型,常見的有共享內存和文件。在這兩種方式上進行環形緩沖,通常都采用數組的方式實現。程序事先分配好一個固定長度的存儲空間,然後具體的讀寫操作、判斷“空”和“滿”、元素存儲等細節就可參照前面所說的來進行。

共享內存方式的性能很好,適用於數據流量很大的場景。但是有些語言(比如Java)對於共享內存不支持。因此,該方式在多語言協同開發的系統中,會有一定的局限性。

而文件方式在編程語言方面支持很好,幾乎所有編程語言都支持操作文件。但它可能會受限於磁盤讀寫(Disk I/O)的性能。所以文件方式不太適合於快速數據傳輸;但是對於某些“數據單元”很大的場合,文件方式是值得考慮的。

對於進程間的環形緩沖區,同樣要考慮好進程間的同步、互斥等問題,限於篇幅,此處就不細說了。

生產者/消費者模式之深入理解