1. 程式人生 > >ACE-Streams架構簡介及應用

ACE-Streams架構簡介及應用

系統 manager ask 循環 正數 rec img tchar 示例

一概述

Streams框架是管道和過濾構架模式的一種實現,主要應用於處理數據流的系統。其實現以Task框架為基礎。Task框架有兩個特性非常適用於Streams框架:一是Task框架可用於創建獨立線程的並發環境,這適合應用於ACE Streams框架中的主動過濾器;二是Task框架有統一的數據傳輸結果——消息隊列,這適用於Streams框架中的管道。

二ACE_Task類

這裏主要介紹與Streams框架相關的部分。

成員變量

 1 Task_T.h
 2 
 3 class ACE_Task : public ACE_Task_Base
 4 
 5 {
 6 
 7 /// Queue of messages on the ACE_Task..
8 9 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *msg_queue_; 10 11 12 13 /// true if should delete Message_Queue, false otherwise. 14 15 bool delete_msg_queue_; 16 17 18 19 /// Back-pointer to the enclosing module. 20 21 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod_; 22 23
24 25 /// Pointer to adjacent ACE_Task. 26 27 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *next_; 28 29 }

put函數

在ACE_Task框架中,put函數沒有實際作用,在默認情況下,該函數沒有執行任何操作,僅僅返回0.但是在Streams框架中,put函數與put_next函數結合起來可以實現數據在過濾器間的傳輸。如果put函數將數據保存在消息隊列中,通過獨立的線程來處理這些消息,那麽它將成為一個主動過濾器;反之,如果put函數直接對數據進行處理,然後交給下一個過濾器,那麽它就是一個被動過濾器。

 1 Task.cpp
 2 
 3 /// Default ACE_Task put routine.
 4 
 5 int
 6 
 7 ACE_Task_Base::put (ACE_Message_Block *, ACE_Time_Value *)
 8 
 9 {
10 
11   ACE_TRACE ("ACE_Task_Base::put");
12 
13   return 0;
14 
15 }

put_next函數

如果在數據處理流水線有下一個過濾器,那麽put_next函數用於將數據交給下一個過濾器處理。如下:

 1 Task_T.inl
 2 
 3 // Transfer msg to the next ACE_Task.
 4 
 5  
 6 
 7 template <ACE_SYNCH_DECL, class TIME_POLICY> ACE_INLINE int
 8 
 9 ACE_Task<ACE_SYNCH_USE, TIME_POLICY>::put_next (ACE_Message_Block *msg, ACE_Time_Value *tv)
10 
11 {
12 
13   ACE_TRACE ("ACE_Task<ACE_SYNCH_USE, TIME_POLICY>::put_next");
14 
15   return this->next_ == 0 ? -1 : this->next_->put (msg, tv);
16 
17 }

next_指向的是有序過濾器的下一個過濾器。通過put_next函數,可以將數據交給下一個過濾器處理。

Streams框架應用示例

在這個示例中,我們將一個數據流的處理分為4步,在Streams框架中,將每個處理步驟稱為一個Module:

  • Logrec_Reader:從文件中讀取記錄,然後交給下一個步驟。
  • Logrec_Timer:在記錄尾部加上“format_data”
  • Logrec_Suffix:在記錄尾部加上一個後綴——suffix
  • Logrec_Write:將記錄顯示在終端上

Logrec_Reader類

其是ACE_Task的子類,是一個主動對象類,有獨立的控制線程,線程處理函數是svc。在Streams框架中,Logrec_Reader類是一個主動過濾器,代碼如下:

 1 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH>
 2 {
 3 private:
 4     ifstream fin;  //標準輸入流
 5 public:
 6     Logrec_Reader(ACE_TString logfile)
 7     {
 8         fin.open(logfile.c_str()); //ACE_TString.c_str() 轉換為char
 9     }
10     virtual int open (void *)
11     {
12         return activate();
13     }
14 
15     virtual int svc()
16     {
17         ACE_Message_Block *mblk;
18         int len = 0;
19         const int LineSize = 256;
20         char file_buf[LineSize];
21 
22         while(!fin.eof())
23         {
24             fin.getline(file_buf, LineSize);
25             len = ACE_OS::strlen(file_buf);
26             ACE_NEW_RETURN(mblk, ACE_Message_Block(len + 200), 0);
27             if (file_buf[len - 1] == \r)
28             {
29                 len = len - 1;
30             }
31             mblk->copy(file_buf, len);
32             // 通過put_next函數,將消息傳遞給下一個過濾器
33             put_next(mblk);
34         }
35         //構造一個MB_STOP消息
36         ACE_NEW_RETURN(mblk, ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0);
37         put_next(mblk);
38         fin.close();
39         ACE_DEBUG((LM_DEBUG, "read svc return. \n"));
40         return 0;
41     }
42 };

Logrec_Timer類

也是ACE_Task的子類,但是它不是主動對象類,沒有創建獨立線程。其實現了put函數,這個函數被它的上一個過濾器(Logrec_Reader)調用,並且數據直接在這個函數中處理。

這裏for循環用於處理消息鏈表,在示例中並沒有使用鏈表因此for循環只會執行一次。

 1 class Logrec_Timer : public ACE_Task<ACE_SYNCH>
 2 {
 3 private:
 4     void format_data(ACE_Message_Block *mblk)
 5     {
 6         char *msg = mblk->data_block()->base();
 7         strcat(msg, "format_data");
 8     }
 9 public:
10     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
11     {
12         for (ACE_Message_Block *temp = mblk;
13             temp != 0; temp = temp->cont())
14         {
15             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
16             {
17                 format_data(temp);
18             }
19         }
20         return put_next(mblk);
21     }
22 };

Logrec_Suffix類

類似Logrec_Timer

 1 class Logrec_Suffix : public ACE_Task<ACE_SYNCH>
 2 {
 3 public:
 4     void suffix(ACE_Message_Block *mblk)
 5     {
 6         char *msg = mblk->data_block()->base();
 7         strcat(msg, "suffix\n");
 8     }
 9     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
10     {
11         for (ACE_Message_Block *temp = mblk;
12             temp != 0; temp = temp->cont())
13         {
14             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
15             {
16                 suffix(temp);
17             }
18         }
19         return put_next(mblk);
20     }
21 };

Logrec_Write類

這裏put函數由上一個過濾器(Logrec_Suffix)調用,其並沒有對數據進行實際處理,只是將數據放入隊列中,由線程獨立處理。

 1 class Logrec_Write : public ACE_Task<ACE_SYNCH>
 2 {
 3 public:
 4     virtual int open(void*)
 5     {
 6         ACE_DEBUG((LM_DEBUG, "Logrec_Writer.\n"));
 7         return activate();
 8     }
 9 
10     virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
11     {
12         return putq(mblk, to);
13     }
14 
15     virtual int svc()
16     {
17         int stop = 0;
18         for (ACE_Message_Block *mb; !stop && getq(mb) != -1;)
19         {
20             if (mb->msg_type() == ACE_Message_Block::MB_STOP)
21             {
22                 stop = 1;
23             }
24             else{
25                 ACE_DEBUG((LM_DEBUG, "%s", mb->base()));
26             }
27             put_next(mb);
28         }
29         return 0;
30     }
31 };

Main

這裏講ACE_Module放入Streams中,ACE_Module才是真正數據處理的Module。ACE_Streams類有兩個數據成員:stream_head_和stream_tail_,它們指向ACE_Module鏈表的首尾。對於每個Streams,其默認帶有首尾兩個Module,而後可以通過push將數據處理的Module放入執行鏈表中。每個Module包含兩個Task,分別為讀Task和寫Task。在示例中僅註冊了寫Task,這些Module和Task通過next指針構成一個有序的串。

這裏註意push有順序要求,最後push即棧頂的為先執行的Module。

 1 int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
 2 {
 3     if (argc != 2)
 4     {
 5         ACE_ERROR_RETURN((LM_ERROR, "usage: %s logfile\n", argv[0]), 1);
 6     }
 7 
 8     ACE_TString logfile (argv[1]);
 9 
10     ACE_Stream<ACE_SYNCH> stream;
11 
12 
13     ACE_Module<ACE_MT_SYNCH> *module[4];
14     module[0] = new ACE_Module<ACE_MT_SYNCH>("Reader", new Logrec_Reader(logfile), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
15     module[1] = new ACE_Module<ACE_MT_SYNCH>("Formatter", new Logrec_Timer(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
16     module[2] = new ACE_Module<ACE_MT_SYNCH>("Separator", new Logrec_Suffix(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
17     module[3] = new ACE_Module<ACE_MT_SYNCH>("Writer", new Logrec_Write(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
18 
19     for ( int i = 3; i >= 0; --i )
20     {
21         if (stream.push(module[i]) == -1)
22         {
23             ACE_ERROR_RETURN((LM_ERROR, "push %s module into stream failed.\n", module[i]->name()), 1);
24         }
25         ACE_DEBUG((LM_DEBUG, "push %s module into stream success. \n", module[i]->name()));
26     }
27     ACE_Thread_Manager::instance()->wait();
28 }

附完整代碼及結果:https://github.com/ShiningZhang/ACE_Learning/tree/master/streams

  1 /*************************************************************************
  2     > File Name: logrec.cpp
  3     > Author: 
  4     > Mail: 
  5     > Created Time: Fri 13 Oct 2017 04:19:39 PM CST
  6  ************************************************************************/
  7  #include <fstream>
  8  #include <ace/Synch.h>
  9  #include <ace/Task.h>
 10  #include <ace/Message_Block.h>
 11  #include <ace/Stream.h>
 12  #include "ace/Thread_Manager.h"
 13  #include <ace/Time_Value.h>
 14  #include <ace/Module.h>
 15 
 16 using namespace std;
 17 
 18 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH>
 19 {
 20 private:
 21     ifstream fin;  //標準輸入流
 22 public:
 23     Logrec_Reader(ACE_TString logfile)
 24     {
 25         fin.open(logfile.c_str()); //ACE_TString.c_str() 轉換為char
 26     }
 27     virtual int open (void *)
 28     {
 29         return activate();
 30     }
 31 
 32     virtual int svc()
 33     {
 34         ACE_Message_Block *mblk;
 35         int len = 0;
 36         const int LineSize = 256;
 37         char file_buf[LineSize];
 38 
 39         while(!fin.eof())
 40         {
 41             fin.getline(file_buf, LineSize);
 42             len = ACE_OS::strlen(file_buf);
 43             ACE_NEW_RETURN(mblk, ACE_Message_Block(len + 200), 0);
 44             if (file_buf[len - 1] == \r)
 45             {
 46                 len = len - 1;
 47             }
 48             mblk->copy(file_buf, len);
 49             // 通過put_next函數,將消息傳遞給下一個過濾器
 50             put_next(mblk);
 51         }
 52         //構造一個MB_STOP消息
 53         ACE_NEW_RETURN(mblk, ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0);
 54         put_next(mblk);
 55         fin.close();
 56         ACE_DEBUG((LM_DEBUG, "read svc return. \n"));
 57         return 0;
 58     }
 59 };
 60 
 61 class Logrec_Timer : public ACE_Task<ACE_SYNCH>
 62 {
 63 private:
 64     void format_data(ACE_Message_Block *mblk)
 65     {
 66         char *msg = mblk->data_block()->base();
 67         strcat(msg, "format_data");
 68     }
 69 public:
 70     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
 71     {
 72         for (ACE_Message_Block *temp = mblk;
 73             temp != 0; temp = temp->cont())
 74         {
 75             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
 76             {
 77                 format_data(temp);
 78             }
 79         }
 80         return put_next(mblk);
 81     }
 82 };
 83 
 84 class Logrec_Suffix : public ACE_Task<ACE_SYNCH>
 85 {
 86 public:
 87     void suffix(ACE_Message_Block *mblk)
 88     {
 89         char *msg = mblk->data_block()->base();
 90         strcat(msg, "suffix\n");
 91     }
 92     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
 93     {
 94         for (ACE_Message_Block *temp = mblk;
 95             temp != 0; temp = temp->cont())
 96         {
 97             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
 98             {
 99                 suffix(temp);
100             }
101         }
102         return put_next(mblk);
103     }
104 };
105 
106 class Logrec_Write : public ACE_Task<ACE_SYNCH>
107 {
108 public:
109     virtual int open(void*)
110     {
111         ACE_DEBUG((LM_DEBUG, "Logrec_Writer.\n"));
112         return activate();
113     }
114 
115     virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
116     {
117         return putq(mblk, to);
118     }
119 
120     virtual int svc()
121     {
122         int stop = 0;
123         for (ACE_Message_Block *mb; !stop && getq(mb) != -1;)
124         {
125             if (mb->msg_type() == ACE_Message_Block::MB_STOP)
126             {
127                 stop = 1;
128             }
129             else{
130                 ACE_DEBUG((LM_DEBUG, "%s", mb->base()));
131             }
132             put_next(mb);
133         }
134         return 0;
135     }
136 };
137 
138 int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
139 {
140     if (argc != 2)
141     {
142         ACE_ERROR_RETURN((LM_ERROR, "usage: %s logfile\n", argv[0]), 1);
143     }
144 
145     ACE_TString logfile (argv[1]);
146 
147     ACE_Stream<ACE_SYNCH> stream;
148 
149 
150     ACE_Module<ACE_MT_SYNCH> *module[4];
151     module[0] = new ACE_Module<ACE_MT_SYNCH>("Reader", new Logrec_Reader(logfile), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
152     module[1] = new ACE_Module<ACE_MT_SYNCH>("Formatter", new Logrec_Timer(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
153     module[2] = new ACE_Module<ACE_MT_SYNCH>("Separator", new Logrec_Suffix(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
154     module[3] = new ACE_Module<ACE_MT_SYNCH>("Writer", new Logrec_Write(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
155 
156     for ( int i = 3; i >= 0; --i )
157     {
158         if (stream.push(module[i]) == -1)
159         {
160             ACE_ERROR_RETURN((LM_ERROR, "push %s module into stream failed.\n", module[i]->name()), 1);
161         }
162         ACE_DEBUG((LM_DEBUG, "push %s module into stream success. \n", module[i]->name()));
163     }
164     ACE_Thread_Manager::instance()->wait();
165 }

技術分享

技術分享

test:

技術分享

ACE-Streams架構簡介及應用