Thrift 基礎(C++ rpc )
一、thrift簡介
thrift是Facebook開源的一套rpc框架,目前被許多公司使用
我理解的特點
- 使用IDL語言生成多語言的實現程式碼,程式員只需要實現自己的業務邏輯
- 支援序列化和反序列化操作,底層封裝協議,傳輸模組
- 以同步rpc呼叫為主,使用libevent evhttp支援http形式的非同步呼叫
- rpc服務端執行緒安全,客戶端大多數非執行緒安全
- 相比protocol buffer效率差些,protocol buffer不支援rpc,需要自己實現rpc擴充套件,目前有grpc可以使用
由於thrift支援序列化和反序列化,並且支援rpc呼叫,其程式碼風格較好並且使用方便,對效率要求不算太高的業務,以及需要rpc的場景,可以選擇thrift作為基礎庫
層次圖:

二、編譯(thrift for c++ && centos7)
1、官網獲取原始碼包 thrift-0.11.0.tar.gz 解壓
tar zxvf thrift-0.11.0.tar.gz
2、安裝依賴
yum -y install automake libtool flex bison pkgconfig gcc-c++ boost-devel libevent-devel zlib-devel python-devel ruby-devel openssl-devel
3、編譯boost
使用boost_1_63_0.tar.gz
./bootstrap.sh ./b2
4、編譯thrift
原始碼根目錄執行
./configure && make sudo make install
5、驗證安裝
thrift -version 顯示 Thrift version 0.11.0
三、編寫使用IDL編寫.thrift檔案
這裡給出一個thrift的IDL基本語法列表,詳細用法可以去官網查詢
namespace cpp thrift.Test //typedef 用法 typedef i32 MyInt32; typedef string MyString; typedef i32 UserId; //struct 結構定義 struct TypedefTestStruct { 1: MyInt32 field_MyInt32; 2: MyString field_MyString; 3: i32 field_Int32; 4: string filed_string; } //enum 列舉定義 enum Numberz { ONE = 1, TWO, THREE, FIVE = 5, SIX, EIGHT = 8 } //const 用法 const Numberz myNumberz = Numberz.ONE; struct Bonk { 1: string message, 2: i32 type } //型別巢狀 struct Xtruct { 1: string string_thing, 2: i8 byte_thing, 3: i32 i32_thing, 4: i64 i64_thing } struct Xtruct2 { 1: i8 byte_thing, 2: Xtruct struct_thing, 3: i32 i32_thing } //支援map list set型別分別對應C++中的 map = stl::map list = stl::vector set = stl::set typedef map<string, Bonk> MapType struct Insanity { 1: map<Numberz, UserId> userMap; 2: list<Xtruct> xtructs; } struct CrazyNesting { 1: string string_field, 2: optional set<Insanity> set_field; 3: required list<map<set<i32>, map<i32,set<list<map<Insanity,string>>>>>> list_field, 4: binary binary_field } //union用法 union SomeUnion { 1: map<NumberZ, UserId> map_thing, 2: string string_thing, 3: i32 i32_thing, 4: Xtruct3 xtruct_thing, 5: Insanity insanity_thing } //exception 異常 exception Xception { 1: i32 errorCode, 2: string message } exception Xception2 { 1: i32 errorCode, 2: Xtruct struct_thing } // empty struct struct EmptyStruct{} struct OneField { 1: EmptyStruct field; } //service 定義的一組rpc服務,一般是抽象出來的介面呼叫 service ThriftTest { void testVoid(), string testString(1: string thing), bool testBool(1: bool thing), i8 testByte(1: i8 thing), i32 testI32(1: i32 thing), i64 testI64(1: i64 thing), Xtruct testStruct(1: Xtruct thing), Xtruct2 testNest(1: Xtruct2 thing), map<string, string> testStringMap(1: map<string, string> thing), set<i32> testSet(1: set<i32> thing), list<i32> testList(1: list<i32> thing), Numberz testEnum(1: Numberz thing), map<i32, map<i32,i32>> testMapMap(1: i32 hello), map<UserId, map<Numberz,Insanity>> testInsanity(1: Insanity argument), Xtruct testMulti(1: i8 arg0, 2: i32 arg1, 3: i64 arg2, 4: map<i16, string> arg3, 5: Numberz arg4, 6: UserId arg5), void testException(1: string arg) throws(1: Xception err1), Xtruct testMultiException(1: string arg0, 2: string arg1) throws(1: Xception err1, 2: Xception2 err2), oneway void testOneway(1:i32 secondsToSleep) }
四、使用thrift檔案生成C++程式碼
1、生成同步呼叫的C++程式碼
thrift -r --gen cpp xxx.thrift
2、生成非同步呼叫的C++程式碼(同時同步呼叫的程式碼也被生成)
thrift --gen cpp:cob_style xxx.thrift
五、thrfit同步呼叫
1、StressTest.thrift檔案
namespace cpp test.stress service Service { void echoVoid(), i8 echoByte(1: i8 arg), i32 echoI32(1: i32 arg), i64 echoI64(1: i64 arg), string echoString(1: string arg), list<i8>echoList(1: list<i8> arg), set<i8>echoSet(1: set<i8> arg), map<i8, i8>echoMap(1: map<i8, i8> arg), }
2、使用thrift -r --gen cpp StressTest.thrift 生成程式碼
gen-cpp目錄有
StressTest_types.h StressTest_types.cpp StressTest_constants.h StressTest_constants.cpp Service.h Service.cpp Service_server.skeleton.cpp
生成
StressTest_types.h StressTest_constants.h 為相關型別定義檔案
Service_server.skeleton為服務端需要的實現檔案
3、程式碼實現
服務端:
#include <thrift/concurrency/ThreadManager.h> #include <thrift/concurrency/PlatformThreadFactory.h> #include <thrift/concurrency/Thread.h> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/server/TNonblockingServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TNonblockingServerSocket.h> #include <thrift/transport/TNonblockingServerTransport.h> #include <thrift/transport/TBufferTransports.h> #include "Service.h" using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace::test::stress; class ServiceHandler : virtual public ServiceIf { public: ServiceHandler() { } void echoVoid() { // Your implementation goes here printf("echoVoid\n"); } int8_t echoByte(const int8_t arg) { printf("echoByte %c\n", arg); return arg; } int32_t echoI32(const int32_t arg) { printf("echoI32\n"); return arg; } int64_t echoI64(const int64_t arg) { printf("echoI64\n"); return arg; } void echoString(std::string& _return, const std::string& arg) { printf("echoString\n"); } void echoList(std::vector<int8_t> & _return, const std::vector<int8_t> & arg) { printf("echoList\n"); } void echoSet(std::set<int8_t> & _return, const std::set<int8_t> & arg) { printf("echoSet\n"); } void echoMap(std::map<int8_t, int8_t> & _return, const std::map<int8_t, int8_t> & arg) { printf("echoMap\n"); } }; int main(int argc, char **argv) { int port = 9090; stdcxx::shared_ptr<ServiceHandler> handler(new ServiceHandler()); stdcxx::shared_ptr<TProcessor> processor(new ServiceProcessor(handler)); stdcxx::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); stdcxx::shared_ptr<TNonblockingServerTransport> serverTransport(new TNonblockingServerSocket(port)); stdcxx::shared_ptr<PlatformThreadFactory> threadFactory = std::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()); stdcxx::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(10); threadManager->threadFactory(threadFactory); threadManager->start(); stdcxx::shared_ptr<TNonblockingServer> server(new TNonblockingServer(processor, protocolFactory, serverTransport, threadManager)); server->serve(); return 0; }
我們需要實現ServiceHandler繼承ServiceIf的相關介面,ServiceHandler是負責相關rpc呼叫業務的功能實現,
thrift伺服器模型基本模型有四種、SimpleServer ThreadedServer ThreadPoolServer NoBlockingServer
SimpleServer 簡單的單執行緒模型
ThreadedServer 一個執行緒一個連線
ThreadPoolServer 執行緒池
NoBlockingServer 基於libevent的IO複用模型 libevent在linux平臺是基於epoll的reactor模型
還有一個非同步Server模型TEvhttpServer 基於libevent的evhttp
這裡服務端使用了非阻塞epoll實現的thrift服務端模型
客戶端:
#include <iostream> #include <string> #include <thrift/transport/TTransportUtils.h> #include <thrift/transport/TSocket.h> #include <thrift/protocol/TBinaryProtocol.h> #include "Service.h" using namespace::test::stress; using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; int main() { ::apache::thrift::stdcxx::shared_ptr<TSocket> socket(new TSocket("localhost", 9090)); ::apache::thrift::stdcxx::shared_ptr<TTransport> transport(new TFramedTransport(socket)); ::apache::thrift::stdcxx::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); ServiceClient client(protocol); transport->open(); std::cout << "client echoByte byte=" << client.echoByte('A') << std::endl; std::cout << "send_echoByte('B')" << std::endl; client.send_echoByte('B'); std::cout << "send_echoByte('C')" << std::endl; client.send_echoByte('C'); std::cout << "recv_echoByte()" << client.recv_echoByte() << std::endl; std::cout << "recv_echoByte()" << client.recv_echoByte() << std::endl; transport->close(); return 0; }
客戶端使用則比較簡單,Service.h定義了相關介面,ServiceClient則是rpc客戶類
TTransport new TFramedTransport(socket) 這裡建立基於socket的傳輸層
TProtocol 協議層,序列化後的資料儲存方式,這裡以TBinaryProtocol 二進位制儲存
六、thrift非同步呼叫
1、thrift檔案同同步呼叫一致
2、使用thrift --gen cpp:cob_style StressTest.thrift 生成程式碼
StressTest_types.h StressTest_types.cpp StressTest_constants.h StressTest_constants.cpp Service.h Service.cpp Service_server.skeleton.cpp Service_async_server.skeleton.cpp
Service_server.skeleton.cpp 同步程式碼用不到
Service_async_server.skeleton.cpp則為http的非同步實現
服務端:
#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/async/TAsyncProtocolProcessor.h> #include <thrift/async/TEvhttpServer.h> #include <event.h> #include <evhttp.h> #include <iostream> #include "Service.h" using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::async; using namespace::test::stress; class ServiceHandler : virtual public ServiceIf { public: ServiceHandler() { } void echoVoid() { printf("echoVoid\n"); } int8_t echoByte(const int8_t arg) { printf("echoByte %c\n", arg); return arg; } int32_t echoI32(const int32_t arg) { printf("echoI32\n"); return arg; } int64_t echoI64(const int64_t arg) { printf("echoI64\n"); return arg; } void echoString(std::string& _return, const std::string& arg) { printf("echoString %s\n", arg.c_str()); _return = arg; } void echoList(std::vector<int8_t> & _return, const std::vector<int8_t> & arg) { printf("echoList\n"); } void echoSet(std::set<int8_t> & _return, const std::set<int8_t> & arg) { printf("echoSet\n"); } void echoMap(std::map<int8_t, int8_t> & _return, const std::map<int8_t, int8_t> & arg) { printf("echoMap\n"); } }; class ServiceAsyncHandler : public ServiceCobSvIf { public: ServiceAsyncHandler() { syncHandler_ = std::auto_ptr<ServiceHandler>(new ServiceHandler); // Your initialization goes here } virtual ~ServiceAsyncHandler(){} void echoVoid(::apache::thrift::stdcxx::function<void()> cob) { syncHandler_->echoVoid(); return cob(); } void echoByte(::apache::thrift::stdcxx::function<void(int8_t const& _return)> cob, const int8_t arg) { int8_t _return = 0; _return = syncHandler_->echoByte(arg); return cob(_return); } void echoI32(::apache::thrift::stdcxx::function<void(int32_t const& _return)> cob, const int32_t arg) { int32_t _return = 0; _return = syncHandler_->echoI32(arg); return cob(_return); } void echoI64(::apache::thrift::stdcxx::function<void(int64_t const& _return)> cob, const int64_t arg) { int64_t _return = 0; _return = syncHandler_->echoI64(arg); return cob(_return); } void echoString(::apache::thrift::stdcxx::function<void(std::string const& _return)> cob, const std::string& arg) { std::string _return; syncHandler_->echoString(_return, arg); return cob(_return); } void echoList(::apache::thrift::stdcxx::function<void(std::vector<int8_t>const& _return)> cob, const std::vector<int8_t> & arg) { std::vector<int8_t>_return; syncHandler_->echoList(_return, arg); return cob(_return); } void echoSet(::apache::thrift::stdcxx::function<void(std::set<int8_t>const& _return)> cob, const std::set<int8_t> & arg) { std::set<int8_t>_return; syncHandler_->echoSet(_return, arg); return cob(_return); } void echoMap(::apache::thrift::stdcxx::function<void(std::map<int8_t, int8_t>const& _return)> cob, const std::map<int8_t, int8_t> & arg) { std::map<int8_t, int8_t>_return; syncHandler_->echoMap(_return, arg); return cob(_return); } protected: std::auto_ptr<ServiceHandler> syncHandler_; }; int main() { ::apache::thrift::stdcxx::shared_ptr<ServiceAsyncProcessor> asynProcessor(new ServiceAsyncProcessor( ::apache::thrift::stdcxx::shared_ptr<ServiceCobSvIf>(new ServiceAsyncHandler()))); ::apache::thrift::stdcxx::shared_ptr<TAsyncProtocolProcessor> asynProtocolProcessor(new TAsyncProtocolProcessor(asynProcessor, ::apache::thrift::stdcxx::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()))); TEvhttpServer server(asynProtocolProcessor, 9999); server.serve(); return 0; }
這裡實現ServiceHandler的相關業務介面即可實現rpc服務端的相關功能
客戶端:
#include "Service.h" #include <string> #include <iostream> #include <unistd.h> #include <thrift/concurrency/ThreadManager.h> #include <thrift/concurrency/PlatformThreadFactory.h> #include <thrift/concurrency/Thread.h> #include <thrift/async/TAsyncChannel.h> #include <thrift/async/TEvhttpClientChannel.h> #include <thrift/transport/TSocket.h> #include <thrift/transport/TTransportUtils.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TProtocol.h> #include <event.h> using namespace::apache::thrift; using namespace::apache::thrift::transport; using namespace::apache::thrift::protocol; using namespace::apache::thrift::async; using namespace::apache::thrift::concurrency; using namespace::test::stress; class MyClient : public ServiceCobClient { public: MyClient(stdcxx::shared_ptr<TAsyncChannel> channel, TProtocolFactory* protocolFactory) : ServiceCobClient(channel, protocolFactory) { } virtual ~MyClient(){} virtual void completed__(bool success) { if (success) { std::cout << "completed" << std::endl; } else { std::cout << "completed failed" << std::endl; } } void my_send_byte() { std::cout << "begin my_send_byte" << std::endl; stdcxx::function<void(ServiceCobClient*)> cob = stdcxx::bind(&MyClient::recv_byte_callback, this, stdcxx::placeholders::_1); echoByte(cob, 'A'); std::cout << "end my_send_byte" << std::endl; } void my_send_string() { std::cout << "begin my_send_string" << std::endl; stdcxx::function<void(ServiceCobClient*)> cob = stdcxx::bind(&MyClient::recv_string_callback, this, stdcxx::placeholders::_1); echoString(cob, "test asynclient"); std::cout << "end my_send_string" << std::endl; } void recv_byte_callback(ServiceCobClient* client) { std::cout << "recv_byte_callback" << std::endl; _res_byte = recv_echoByte(); std::cout << "_res_byte =" << _res_byte << std::endl; } void recv_string_callback(ServiceCobClient* client) { std::cout << "recv_string_callback" << std::endl; recv_echoString(_res_string); std::cout << "_res_string=" << _res_string << std::endl; } private: char _res_byte; std::string _res_string; }; class ClientThread : public Runnable { public: ClientThread(event_base* base, std::string & host, int port) : _base(base), _host(host), _port(port) { } virtual ~ClientThread(){} virtual void run() { stdcxx::shared_ptr<TAsyncChannel>channel1(new TEvhttpClientChannel(_host, "/", _host.c_str(), _port, _base)); stdcxx::shared_ptr<TAsyncChannel>channel2(new TEvhttpClientChannel(_host, "/", _host.c_str(), _port, _base)); MyClient client1(channel1, new TBinaryProtocolFactory()); MyClient client2(channel2, new TBinaryProtocolFactory()); client1.my_send_byte(); client1.my_send_string(); client2.my_send_byte(); client2.my_send_string(); while (1) { client1.my_send_byte(); sleep(1); } } protected: private: event_base* _base; std::string _host; int _port; }; int main() { std::string host = "192.168.119.129"; int port = 9999; event_base* base = event_base_new(); stdcxx::shared_ptr<PlatformThreadFactory> threadFactory = std::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()); stdcxx::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(10); threadManager->threadFactory(threadFactory); threadManager->start(); stdcxx::shared_ptr<Thread> thread = threadFactory->newThread(std::shared_ptr<ClientThread>(new ClientThread(base, host, port))); thread->start(); event_base_dispatch(base); event_base_free(base); return 0; }
客戶端則實現了MyClient,MyClient繼承公共的rpc服務介面,提供了非同步回撥的recv_byte_callback,recv_string_callback函式, ClientThread的執行緒函式的實現則對MyClient非同步客戶端進了測試
七、簡單總結
通過這兩天的學習,簡單總結一下這個庫
1、thrift的C++程式碼實現很漂亮,很規範,適合學習閱讀
2、thrift可以滿足很多基本的rpc呼叫場景
3、本文只是簡單寫了thrift的用法,想深入瞭解這個庫的,其內部實現還是需要化時間好好研究
作者 [@karllen][3]
2018 年 09月 15日
QQ群: 347769318