1. 程式人生 > >用ACE實現生產者-消費者模式

用ACE實現生產者-消費者模式

      ACE_Task是ACE中的任務或主動物件“處理結構”的基類。ACE使用此類來實現主動物件模式。所有希望成為“主動物件”的物件都必須由此類派生。同時可將它看作是更高階的、更為面向物件的執行緒。

ACE_Task處理的是物件,因此更有利於構造OO程式,產生更好的OO軟體,而且,它還包括了一種用於與其他任務通訊的易於使用的機制。 ACE_Task可用作: <1>更高階的執行緒(常稱其為任務) <2>主動物件模式中的主動物件 ACE任務: 每個任務都含有一或多個執行緒,以及一個底層訊息佇列。各個任務通過訊息佇列進行通訊。至於訊息佇列實現的內在細節程式設計師不必關注。傳送任務用putq() 將訊息插入到另一任務的訊息佇列中,接收任務通過使用getq()將訊息提取出來。這樣的體系結構大大簡化了多執行緒程式的程式設計模型。 要實現一個ACE_Task,首先寫一個類派生自ACE_Task,通常要實現三個方法。 1.實現服務初始化和終止方法。open(),用於包含所有專屬於任務的初始化程式碼,其中可能包含諸如連線控制塊,鎖和記憶體這樣的資源。close()是對應的終止方法。 2,呼叫啟動方法,物件例項化後,必須呼叫activate()啟用它。要在主動物件中建立的執行緒數目,以及一些引數,被傳遞給activate()方法。activate()方法會使svc()方法成為它生成執行緒的啟動點。 3,svc()方法,各個新執行緒在svc()方法中啟動。開發者在子類定義此方法。 舉個例子: consumer.h
#include "ace/Task.h"
#include "ace/Message_Block.h"
#include "ace/Log_Msg.h"

class Consumer : public ACE_Task<ACE_MT_SYNCH>
{
	public:
		int open(void *)
		{
            ACE_DEBUG((LM_DEBUG,"(%t) Producer task opened\n"));
			activate(THR_NEW_LWP,1);
			return 0;
		}
		int svc(void)
		{
            ACE_Message_Block *mb=0;
			do
			{
				mb=0;
				getq(mb);
				ACE_DEBUG((LM_DEBUG,
						"(%t)Got message: %d from remote task\n",*mb->rd_ptr()));

			}while(*mb->rd_ptr()<10);
			return 0;
		}
		int close(u_long)
		{
            ACE_DEBUG((LM_DEBUG,ACE_TEXT("Consumer closes down\n")));
			return 0;
		}
};
       producter.h  
#include "ace/Task.h"  
#include "ace/Message_Block.h"  
#include "ace/Log_Msg.h"
#include "consumer.h"
  
class Producer:  
public ACE_Task<ACE_MT_SYNCH>  
{  
public:  
    Producer(Consumer * consumer):  
    consumer_(consumer), data_(0)  
    {  
    mb_=new ACE_Message_Block((char*)&data_,sizeof(data_));  
    }  
  
    int open(void*)  
    {  
        ACE_DEBUG((LM_DEBUG, "(%t) Producer task opened \n"));  
  
        //Activate the Task  
        activate(THR_NEW_LWP,1);  
        return 0;  
    }  
  
//The Service Processing routine  
    int svc(void)  
    {  
        while(data_<11)  
        {  
        //Send message to consumer  
            ACE_DEBUG((LM_DEBUG,  
            "(%t)Sending message: %d to remote task\n",data_));  
            consumer_->putq(mb_);  
  
            //Go to sleep for a sec.  
            ACE_OS::sleep(1);  
            data_++;  
        }  
        return 0;  
    }  
  
    int close(u_long)  
    {  
        ACE_DEBUG((LM_DEBUG,"Producer closes down \n"));  
        return 0;  
    }  
  
private:  
    char data_;  
    Consumer * consumer_;  
    ACE_Message_Block * mb_;  
};  
      main.cpp
#include "consumer.h"
#include "producter.h"
  
int main(int argc, char * argv[])  
{  
    Consumer *consumer = new Consumer;  
    Producer *producer = new Producer(consumer);  
  
    producer->open(0);  
    consumer->open(0);  
//Wait for all the tasks to exit.  
    ACE_Thread_Manager::instance()->wait();  
}  
       說明一下activate函式,其THR_NEW_LWP是建立一個核心級執行緒。對於沒有繫結的執行緒來說。是新增一個新的核心執行緒到執行緒池。