Thrift實踐(C++)
文章簡介
運用Thrift的TNonblockingServer編寫C++實踐案例。
Thrift服務端程式設計
從Thrift-0.12.0版本的C++原始碼來看服務端程式設計主要有:多執行緒模型,一個新的客戶端連線建立一個執行緒處理;多執行緒執行緒池模型,將新的客戶端連線放入任務佇列中由執行緒池讀取處理;事件驅動非同步模型,註冊監聽事件和可讀事件,將客戶端的資料放入任務佇列中由執行緒池進行處理。事件驅動非同步模型更適合運用在網際網路大量使用者的場景中,也就是TNonblockingServer。
Thrift中TNonblockingServer的設計和實現
圖1-1 TNonblockingServer的實現
如圖1-1所示,當一個新的客戶端連線accept時,會new一個TConnection物件,通過輪詢選擇演算法傳送給IOThread執行緒池,IOThread接收到客戶端發來的請求資料時,將它打包成Task放入任務佇列中,再由任務處理執行緒池Thread從任務佇列pop出來呼叫服務端實現的業務邏輯函式處理。下面以使用者註冊的業務場景為例,實現一個具體的TNonblockingServer案例。
Thrift介面檔案編寫
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
// 檔名稱 user.thrift namespace cpp thrift . user enum EUserType { QQ = 1 , WECHAT = 2 } struct UserRegisterReq { 1 : required string sUserName ; 2 : required string sMd5Pwd ; 3 : string sPhoneNumber ; 4 : optional i32 iUserAge ; } struct UserRegisterResp { 1 : required i32 iUserId ; 2 : required bool bSucc ; 3 : optional i32 iRetCode ; 4 : optional string sErrMsg ; } service UserService { UserRegisterResp UserRegister ( 1 : EUserType eType , 2 : UserRegisterReq oReq ) } |
執行thrift -r --gen cpp user.thrift,生成服務端和客戶端RPC介面檔案。其中required是必傳欄位,optional是可選欄位。
服務端程式碼實現
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
// 檔名稱 nonsocketserver.cpp #include <stdlib.h> #include <iostream> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/transport/TNonblockingServerSocket.h> #include <thrift/server/TNonblockingServer.h> #include <thrift/concurrency/PlatformThreadFactory.h> #include <thrift/concurrency/ThreadManager.h> #include "gen-cpp/UserService.h" namespace thrift { namespace user { class UserServiceHandler : virtual public UserServiceIf { public : UserServiceHandler () {} virtual void UserRegister ( UserRegisterResp & _return , const EUserType :: type eType , const UserRegisterReq & oReq ) { std :: cout << "etype=" << eType << " oReq=" << oReq << " oResp=" << _return << std :: endl ; } }; } //user } //thrift int main ( int argc , char * argv []) { using namespace :: apache :: thrift ; using namespace :: apache :: thrift :: concurrency ; using namespace :: apache :: thrift :: protocol ; using namespace :: apache :: thrift :: transport ; using namespace :: apache :: thrift :: server ; using namespace :: thrift :: user ; int iIOThreadNum = 0 , iTaskThreadNum = 0 ; if ( 3 == argc ) { iIOThreadNum = :: strtol ( argv [ 1 ], NULL , 10 ); iTaskThreadNum = :: strtol ( argv [ 2 ], NULL , 10 ); } int port = 9090 ; // 業務邏輯 stdcxx :: shared_ptr < UserServiceHandler > testHandler ( new UserServiceHandler ()); // 業務介面邏輯 stdcxx :: shared_ptr < TProcessor > testProcessor ( new UserServiceProcessor ( testHandler )); // 報文協議序列化 protocol stdcxx :: shared_ptr < TProtocolFactory > protocolFactory ( new TBinaryProtocolFactoryT < TBufferBase >()); // 監聽埠 transport stdcxx :: shared_ptr < TNonblockingServerTransport > nbSocket ( new transport :: TNonblockingServerSocket ( port )); // 建立非阻塞 I/O 服務 server stdcxx :: shared_ptr < TNonblockingServer > nonblockingServer ( new TNonblockingServer ( testProcessor , protocolFactory , nbSocket )); nonblockingServer -> setNumIOThreads ( iIOThreadNum ); // 多執行緒 —— 連線處理 // 創 建多執行緒任務處理 if ( iTaskThreadNum > 0 ) { stdcxx :: shared_ptr < ThreadManager > pThreadManager = ThreadManager :: newSimpleThreadManager ( iTaskThreadNum ); // 多執行緒 —— 任務處理 pThreadManager -> threadFactory ( stdcxx :: shared_ptr < PlatformThreadFactory >( new PlatformThreadFactory )); pThreadManager -> start (); nonblockingServer -> setThreadManager ( pThreadManager ); } // 服務啟動 nonblockingServer -> serve (); // 服務停止 nonblockingServer -> stop (); return 0 ; } |
客戶端程式碼實現
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
// 檔名稱 client.cpp #include <iostream> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/transport/TSocket.h> #include <thrift/transport/TBufferTransports.h> #include "gen-cpp/UserService.h" int main ( int argc , char ** argv ) { using namespace :: apache :: thrift ; using namespace :: thrift :: user ; //I/O transport stdcxx :: shared_ptr < transport :: TSocket > socket ( new transport :: TSocket ( "127.0.0.1" , 9090 )); // 資料讀取方式 transport stdcxx :: shared_ptr < transport :: TTransport > transport ( new transport :: TFramedTransport ( socket )); // 報文協議序列化 protocol stdcxx :: shared_ptr < protocol :: TProtocol > protocol ( new protocol :: TBinaryProtocol ( transport )); UserServiceClient client ( protocol ); transport -> open (); EUserType :: type eType = EUserType :: QQ ; UserRegisterReq oReq ; UserRegisterResp oResp ; client . UserRegister ( oResp , eType , oReq ); transport -> close (); std :: cout << "eType=" << eType << " oReq=" << oReq << " oReps=" << oResp << std :: endl ; return 0 ; } |
編譯和執行
//編譯服務端程式和客戶端程式
g++ -c -o nonsocketserver.o nonsocketserver.cpp
g++ -c -o UserService.o gen-cpp/UserService.cpp
g++ -c -o user_types.o gen-cpp/user_types.cpp
g++ -g -Wall -o server nonsocketserver.o UserService.o user_types.o -lthrift -lthriftnb
g++ -c -o client.o client.cpp
g++ -g -Wall -o client client.o UserService.o user_types.o -lthrift -lthriftnb
//執行服務端
./server
Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: Serving with 1 io threads.
Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: using libevent 2.1.8-stable method epoll
Thrift: Mon Jan 28 00:01:38 2019 TNonblocking: IO thread #0 registered for listen.
Thrift: Mon Jan 28 00:01:38 2019 TNonblocking: IO thread #0 registered for notify.
Thrift: Mon Jan 28 00:01:38 2019 TNonblockingServer: IO thread #0 entering loop...
etype=QQ oReq=UserRegisterReq(sUserName=, sMd5Pwd=, sPhoneNumber=, iUserAge=<null>) oResp=UserRegisterResp(iUserId=0, bSucc=0, iRetCode=<null>, sErrMsg=<null>)
//執行客戶端
./client
eType=QQ oReq=UserRegisterReq(sUserName=, sMd5Pwd=, sPhoneNumber=, iUserAge=<null>) oReps=UserRegisterResp(iUserId=0, bSucc=0, iRetCode=<null>, sErrMsg=<null>)
參考文獻
[1] Mark Slee, Aditya Agarwal and Marc Kwiatkowski. Thrift: Scalable Cross-Language Services Implementation. 2007
[2] Apache Thrift官方網站, http:// thrift.apache.org
[3] Apache Thrift原始碼, https:// github.com/apache/thrift
[4] Lex & Yacc 官方網站, http:// dinosaur.compilertools.net