1. 程式人生 > >利用thrift實現一個非阻塞帶有回撥機制的客戶端

利用thrift實現一個非阻塞帶有回撥機制的客戶端

客戶端有時需要非阻塞的去傳送請求,給定服務端一個請求,要求其返回一個計算結果。但是客戶端不想等待服務端處理完,而是想傳送完這個指令後自己去做其他事情,當結果返回時自動的去處理。

  比如舉個形象點的例子:飯店的Boss讓小弟A把本週店裡的欠條收集起來放到自己桌子上,然後又告訴自己的小祕書坐在自己辦公室等著小弟A把欠條拿過來,然後統計一下一共有多少,然後Boss自己出去半點事兒。

  Boss相當於client,小弟A相當於server,而小祕書相當於client端的回撥函式(callback)。怎麼講呢?Boss不想等待小弟處理完,因為他老人家公務繁忙,還要去幹別的呢。於是他把接下來處理欠條的任務託管給了小祕書,於是自己一個人出去了。

  OK,那麼我們基本瞭解了整個工作流程,來看看實現的方法。thrift去實現client非同步+回撥的方法關鍵點在於:thrift生成的client中有個send_XXX()和recv_XXX()方法。send_XXX()相當於告知server去處理東西,可以立即返回;而呼叫recv_XXX就是個阻塞的方法了,直到server返回結果。所以,我們可以在主執行緒呼叫完send_XXX()之後,然後另開一個執行緒去呼叫send_XXX(),該執行緒在等到server回覆後自動呼叫callback方法,對結果進行一些處理(當然callback在修改client狀態時需要進行同步操作)。這樣的模式下,我們可以做很多事情,比如分散式環境下的觀察者模式

。當然了需要注意的一點就是,各個執行緒接受到結果的順序跟請求順序不一定一樣,因為server處理不通請求時間不通或者網路環境的影響都可能導致這種情形。所以如果你對接受這些結果時不是冪等操作時需要注意一下。

thrift指令碼:

1 2 3 4 //只有一個方法,client傳送一個訊息,server換回一個訊息 service TestServ{ string ping(1: string message), }

server端採用TNBlockingServer實現

複製程式碼
 1 #include "TestServ.h"
 2 
 3 #include <iostream>
 4
5 #include <thrift/protocol/TBinaryProtocol.h> 6 #include <thrift/server/TNonblockingServer.h> 7 #include <thrift/transport/TServerSocket.h> 8 #include <thrift/transport/TBufferTransports.h> 9 #include <thrift/concurrency/PosixThreadFactory.h> 10 11 using namespace std; 12 13 using namespace ::apache::thrift; 14 using namespace ::apache::thrift::protocol; 15 using namespace ::apache::thrift::transport; 16 using namespace ::apache::thrift::server; 17 using namespace ::apache::thrift::concurrency; 18 19 using boost::shared_ptr; 20 21 class TestServHandler : virtual public TestServIf { 22 public: 23 TestServHandler() { 24 // Your initialization goes here 25 } 26 27 void ping(std::string& _return, const std::string& message) { 28 _return = "hello, i am server! "; 29 sleep(3);// do something time-consuming/ 這裡我們在server端加一些耗時的操作 30 cout<<"Request from client: "<<message<<endl; 31 } 32 33 }; 34 35 int main(int argc, char **argv) { 36 int port = 9090; 37 38 shared_ptr<TestServHandler> handler(new TestServHandler()); 39 shared_ptr<TProcessor> processor(new TestServProcessor(handler)); 40 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); 41 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(15); 42 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory > (new PosixThreadFactory()); 43 threadManager->threadFactory(threadFactory); 44 threadManager->start(); 45 TNonblockingServer server(processor, protocolFactory, port, threadManager); 46 server.serve(); 47 return 0; 48 }
複製程式碼

client端實現:

複製程式碼
 1 #include "TestServ.h"
 2 
 3 #include <iostream>
 4 #include <thrift/protocol/TBinaryProtocol.h>
 5 #include <thrift/transport/TSocket.h>
 6 #include <thrift/transport/TBufferTransports.h>
 7 
 8 #include "test_constants.h"
 9 
10 using namespace std;
11 using namespace ::apache::thrift;
12 using namespace ::apache::thrift::protocol;
13 using namespace ::apache::thrift::transport;
14 using boost::shared_ptr;
15 
16 class AsynTestClient;
17 void * wait_recv(void * parg );
18 struct PARG {
19     AsynTestClient * pthis;
20     string message;
21 };
22 
23 class AsynTestClient {
24 private:
25     unsigned int d_cnt_recv;//< 客戶端接受到server響應次數的計數器.
26 
27     pthread_rwlock_t m_cnt_recv;//< 計數器的讀寫鎖.
28     vector<pthread_t> m_ids;
29 
30 public:
31     TestServClient * d_client;
32     void call_back(string & _return){
33     //輸出伺服器返回資訊並把返回計數加1
34     cout<<"server msg: "<<_return<<endl;
35     pthread_rwlock_wrlock( &m_cnt_recv );
36     d_cnt_recv ++;
37     pthread_rwlock_unlock( &m_cnt_recv );
38     }
39     explicit AsynTestClient(boost::shared_ptr<TProtocol> & protocol){
40     pthread_rwlock_init( &m_cnt_recv, NULL );
41     d_cnt_recv = 0;
42     d_client = new TestServClient( protocol );
43     }
44 
45     ~AsynTestClient(){
46     delete d_client;
47     pthread_rwlock_destroy( &m_cnt_recv );
48     }
49 
50     void asyn_ping( const string & message) {
51     //傳送請求
52     d_client->send_ping(message);
53     //初始化每個等待回撥執行緒的引數
54     PARG * parg = new PARG;
55     parg->pthis = this;
56     parg->message = message;
57     //把新生成的執行緒id放入全域性陣列維護
58     pthread_t m_id;
59     m_ids.push_back(m_id);
60     //啟動執行緒,從此只要接受到伺服器的返回結果就呼叫回撥函式。
61     if( 0 != pthread_create( &m_id, NULL, wait_recv, reinterpret_cast< void * > (parg) ) ) {
62         return;
63     }
64     }
65 };
66 int main(int argc, char **argv) {
67 
68     boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));  
69     boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));  
70     boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));  
71 
72     //TestServClient client(protocol);  
73 
74     transport->open();  
75     AsynTestClient client(protocol);
76     string message = "hello, i am client! ";
77     client.asyn_ping(message);
78 
79     while(true){
80     sleep(1);//這裡相當於client去做別的事情了
81     }
82 
83     transport->close();  
84     return 0;  
85 }
86 void * wait_recv(void * parg ) {
87     PARG * t_parg = reinterpret_cast< PARG * >(parg);//強制轉化執行緒引數
88     string _return;
89     t_parg->pthis->d_client->recv_ping(_return);
90     t_parg->pthis->call_back(_return);
91 }
複製程式碼

  其實大家可以注意到,我並沒有使用asyn_ping(const string & message, void(*)call_back(void));這種方式去定義它,這是因為asyn_ping本身可以獲取callback函式的指標。回撥的本質是任務的託管、時間的複用,也就是說等待結果返回後自動去呼叫一段程式碼而已,所以本質上上面就是回撥機制。如果你想使用傳函式指標的方式,也可以實現出來。

  注意:編譯時需要-L$(LIB_DIR) -lthrift -lthriftnb -levent。