西半球最強RPC框架--smf速記
說明
本文是對號稱西半球最強RPC框架smf( https:// github.com/smfrpc/smf )的速寫,花了一點時間簡單翻閱了下其內部實現,簡要記錄與此,以備不時之需。
smf是基於seastar( https:// github.com/scylladb/sea star )實現的高效能RPC框架,seastar由於其複雜性不再這裡多作描述,以後有機會可以專門描述其設計原理。由於使用了seastar,讓我對smf頗感興趣,以後有機會一定在專案中嚐嚐鮮。
資料結構
rpc_letter
struct rpc_letter { ... rpc::header header; std::unordered_mapastar::sstring, seastar::sstring> dynamic_headers; seastar::temporary_buffer<char> body; };
該結構應該是抽象網路傳輸請求,包含header和body。
rpc_envelope
struct rpc_envelope { constexpr static size_t kHeaderSize = sizeof(rpc::header); static seastar::future<> send(seastar::output_stream<char> *out, rpc_envelope req); rpc_letter letter; };
該結構最重要的方法是send() ,用來向對端的stream中傳送請求req。而最重要的成員是letter 。letter中儲存了要傳送的請求的header和body。
work_item
struct work_item { using promise_t = seastar::promise<stdx::optional<rpc_recv_context>>; promise_t pr; uint16_t session{0}; };
在客戶端傳送請求時,每個請求唯一分配一個session id並建立work_item 結構與之對應,然後將<session_id, work_item>插入全域性管理,以便在收到響應時可以快速恢復原始請求的上下文。
work_item結構中最核心的是promise型別的pr,在獲得服務端的響應後,設定promise的值,這樣等待在promise的future的呼叫者便可以繼續往下執行了。
rpc_recv_context
struct rpc_recv_context { static seastar::future<stdx::optional<rpc::header>> parse_header(rpc_connection *conn); static seastar::future<stdx::optional<rpc_recv_context>> parse_payload(rpc_connection *conn, rpc::header hdr); ... seastar::lw_shared_ptr<rpc_connection_limits> rpc_server_limits; const seastar::socket_address remote_address; rpc::header header; seastar::temporary_buffer<char> payload; };
該結構對應了客戶端傳送給服務端請求以及服務端返回給客戶端的響應上下文,其中維護了請求/響應的header、payload、remote server address等資訊。rpc_client返回給應用程式的便是該結構。該結構中兩個比較重要的方法是parse_header() 和parse_payload() ,分別用來解析從server端接收到的響應header與body。
rpc_client
class rpc_client { virtual seastar::future<> connect() final; virtual seastar::future<> reconnect() final; template <typename T> seastar::future<rpc_recv_typed_context<T>> send(rpc_envelope e); private: seastar::lw_shared_ptr<rpc_connection> conn_; std::unordered_map<uint16_t, seastar::lw_shared_ptr<work_item>> rpc_slots_; ... uint16_t session_idx_{0}; };
rpc_client是客戶端向伺服器端通訊的橋樑,它負責建立連線、傳送資料、處理響應等一系列工作,可以說這是一個RPC庫的核心了。其幾個核心成員的意義:
rpc_slots_:儲存發往服務端的請求,以便在收到響應時可以獲取請求的上下文資訊
每次rpc_client傳送客戶端請求(由rpc_envelope 代表)時,都會首先遞增session_idx_ 並根據其建立一個work_item 物件,這唯一對應了發往服務端的請求,並且將該work_item 插入至rpc_client的 rpc_slots 內。
當收到server端的響應時,會根據響應中攜帶的session_id來找到該響應對應的原始請求的work_item 。該處理在rpc_client::process_one_request() 內進行。
在該函式內,首先解析響應header,其次解析響應payload。一旦萬事俱備,接下來便可以通過設定work_item中的promise為ready來通知應用程式響應已經準備好,應用程式便可以繼續處理了。
seastar::future<> rpc_client::process_one_request() { // 解析header return rpc_recv_context::parse_header(conn_.get()).then([this](auto hdr) { if (SMF_UNLIKELY(!hdr)) { ... return seastar::make_ready_future<>(); } // 解析payload return rpc_recv_context::parse_payload(conn_.get(), std::move(hdr.value())) .then([this](stdx::optional<rpc_recv_context> opt) mutable { if (SMF_UNLIKELY(!opt)) { ... return seastar::make_ready_future<>(); } uint16_t sess = opt->session(); auto it = rpc_slots_.find(sess); if (SMF_UNLIKELY(it == rpc_slots_.end())) { ... return seastar::make_ready_future<>(); } --read_counter_; // 設定work_item內的promise的值 // 並將請求對應的work_item從rpc_slots內移除 it->second->pr.set_value(std::move(opt)); rpc_slots_.erase(it); return seastar::make_ready_future<>(); }); }); }
rpc_service
struct rpc_service_method_handle { using fn_t = std::function<seastar::future<rpc_envelope>(rpc_recv_context &&recv)>; // Method name const char *method_name; // Method id,規則是crc32(method_name) const uint32_t method_id; // Method實際執行函式 fn_t apply; }; struct rpc_service { // RPC service name virtual const char *service_name() const = 0; // RPC service id,規則是crc32(service_name) virtual uint32_t service_id() const = 0; // RPC service提供的所有method virtual const std::vector<rpc_service_method_handle> &methods() = 0; // 根據客戶端請求中的request id找到對應的Method virtual rpc_service_method_handle *method_for_request_id(uint32_t idx) = 0; };
rpc_service作為服務端的基類,定義了一個RPC伺服器需要實現的API。每個API含義見註釋。
rpc_server_connection
class rpc_connection final { public: ... seastar::connected_socket socket; const seastar::socket_address remote_address; seastar::input_stream<char> istream; seastar::output_stream<char> ostream; seastar::lw_shared_ptr<rpc_connection_limits> limits; uint32_t istream_active_parser{0}; } // 主要是對rpc_connection的包裝 class rpc_server_connection final { public: ... rpc_connection conn; const uint64_t id; seastar::semaphore serialize_writes{1}; private: rpc_server_connection_options opts_; };
rpc_server_connection 是對客戶端連線的抽象,主要是對rpc_connection 的包裝,server端收到新的連線時還會將該連線新增至自身的開啟連線表中。
服務端最重要的能力是根據客戶端請求找到相應的RPC服務以及服務提供的方法。而每個RPC服務提供的方法由rpc_service_method_handle 類來代表。
查詢的基本原理也很簡單,每個客戶端的請求都會攜帶一個request id,id生成規則是:
// ServiceID: 1969906889 == crc32(ServiceName) // MethodID:3312871568 == crc32(MethodName) // RequestID: 1969906889 ^ 3312871568 req.set_request_id(1969906889, 3312871568);
rpc_server的啟動入口是rpc_server::start()
void rpc_server::start() { ... seastar::listen_options lo; lo.reuse_address = true; listener_ = seastar::listen( seastar::make_ipv4_address( args_.ip.empty() ? seastar::ipv4_addr{args_.rpc_port} : seastar::ipv4_addr{args_.ip, args_.rpc_port}), lo); seastar::keep_doing([this] { return listener_->accept().then( [this, stats = stats_, limits = limits_]( seastar::connected_socket fd, seastar::socket_address addr) mutable { auto conn = seastar::make_lw_shared<rpc_server_connection>( std::move(fd), limits, addr, stats, ++connection_idx_); open_connections_.insert({connection_idx_, conn}); handle_client_connection(conn); }); }) .handle_exception([](std::exception_ptr eptr) { ... } }); }
服務端對每個客戶端發起的connection處理方法是handle_client_connection :
seastar::future<> rpc_server::handle_client_connection( seastar::lw_shared_ptr<rpc_server_connection> conn) { return seastar::do_until( [conn] { return !conn->is_valid(); }, [this, conn]() mutable { return handle_one_client_session(conn); }) .handle_exception([this, conn](auto ptr) { ... }); } seastar::future<> rpc_server::handle_one_client_session( seastar::lw_shared_ptr<rpc_server_connection> conn) { // 解析客戶端請求header return rpc_recv_context::parse_header(&conn->conn) .then([this, conn](stdx::optional<rpc::header> hdr) { ... auto payload_size = hdr->size(); return conn->limits() ->resources_available.wait(payload_size) .then([this, conn, h = hdr.value(), payload_size] { // 解析客戶端請求的payload return rpc_recv_context::parse_payload(&conn->conn, std::move(h)) .then([this, conn, payload_size](auto maybe_payload) { ... // 關鍵是這裡的dispatch_rpc dispatch_rpc(conn, std::move(maybe_payload.value())) ... }); return seastar::make_ready_future<>(); }); }); }); } seastar::future<> rpc_server::dispatch_rpc(seastar::lw_shared_ptr<rpc_server_connection> conn, rpc_recv_context &&ctx) { ... // 根據請求從service內找到method資訊 auto method_dispatch = routes_.get_handle_for_request(ctx.request_id()); return seastar::with_gate( reply_gate_, [this, ctx = std::move(ctx), conn, method_dispatch]() mutable { return stage_apply_incoming_filters(std::move(ctx)) .then([this, conn, method_dispatch](auto ctx) { ... return method_dispatch->apply(std::move(ctx)) .then([this](rpc_envelope e) { return stage_apply_outgoing_filters(std::move(e)); }); }); ); ); } smf::rpc_service_method_handle * rpc_handle_router::get_handle_for_request(const uint32_t &request_id) { for (auto &p : services_) { auto x = p->method_for_request_id(request_id); if (x != nullptr) return x; } return nullptr; }
這裡面主要完成了兩件事:
- 根據請求id找到rpc服務及其內的method,這個呼叫了方法rpc_handle_router::get_handle_for_request
- 呼叫1找到的method的apply方法
在1中,由於不同的service查詢method方法也不一樣,因此,這是與rpc service型別相關的。
如何使用
以自帶的應用程式為例說明smf的實際使用,包括客戶端和服務端。
客戶端
using client_t = smf_gen::demo::SmfStorageClient; using load_gen_t = smf::load_generator<client_t>; // 客戶端真正執行RPC遠端呼叫的邏輯 struct method_callback { seastar::future<> operator()(client_t *c, smf::rpc_envelope &&e) { // Get返回值型別為seastar::future<smf::rpc_recv_ctx_t<Response>> // smf_gen::demo::SmfStorageClient是由smf_gen生成的程式碼 // 內部實現是直接呼叫rpc_client::send() // 該函式負責傳送請求並在內部維護已傳送請求的狀態 // 將請求儲存在map中,並返回一個future給應用程式 // 當收到該請求響應時,設定觸發執行future的then return c->Get(std::move(e)).then([](auto ret) { return seastar::make_ready_future<>(); }); } }; int main(int args, char **argv, char **env) { seastar::distributed<load_gen_t> load; seastar::app_template app; cli_opts(app.add_options()); return app.run(args, argv, [&] { seastar::engine().at_exit([&] { return load.stop(); }); auto &cfg = app.configuration(); ::smf::load_generator_args largs( cfg["ip"].as<std::string>().c_str(), cfg["port"].as<uint16_t>(), cfg["req-num"].as<uint32_t>(), cfg["concurrency"].as<uint32_t>(), ...); return load.start(std::move(largs)) .then([&load] { ... }) .then([&load] { return load.invoke_on_all([](load_gen_t &server) { load_gen_t::generator_cb_t gen = generator{}; load_gen_t::method_cb_t method = method_callback{}; return server.benchmark(gen, method).then([](auto test) { ... return seastar::make_ready_future<>(); }); }); }) .then([&load] { ... }) .then([] { return seastar::make_ready_future<int>(0); }); }); }