深入解讀Service Mesh的資料面Envoy
此文已由作者劉超授權網易雲社群釋出。
歡迎訪問 ofollow,noindex" target="_blank"> 網易雲社群 ,瞭解更多網易技術產品運營經驗。
但是對於資料面的關鍵元件Envoy沒有詳細解讀,這篇文章補上。
一、Envoy的工作模式
Envoy的工作模式如圖所示,橫向是管理平面。
Envoy會暴露admin的API,可以通過API檢視Envoy中的路由或者叢集的配置。
例如通過curl http://127.0.0.1:15000/routes可以檢視路由的配置,結果如下圖,請記住路由的配置層級,後面在程式碼中會看到熟悉的資料結構。
routes下面有virtual_hosts,裡面有帶prefix的route,裡面是route entry,裡面是weight cluster,對於不同的cluster不同的路由權重,再裡面是cluster的列表,有cluster的名稱和權重。
再如通過curl http://127.0.0.1:15000/clusters可以得到叢集也即cluster的配置,這裡面是真正cluster的資訊。
在另外一面,Envoy會呼叫envoy API去pilot裡面獲取路由和叢集的配置,envoy API的詳情可以檢視https://www.envoyproxy.io/docs/envoy/v1.8.0/api-v2/http_routes/http_routes中的API文件,可以看到route的配置詳情,也是按照上面的層級組織的。
當Envoy從pilot獲取到路由和叢集資訊之後,會儲存在記憶體中,當有個客戶端要連線後端的時候,客戶端處於Downstream,後端處於Upstream,當資料從Downstream流向Upstream的時候,會通過Filter,根據路由和叢集的配置,選擇後端的應用建立連線,將請求轉發出去。
接下來我們來看Envoy是如何實現這些的。
二、Envoy的關鍵資料結構的建立
Envoy的啟動會在main函式中建立Envoy::MainCommon,在它的建構函式中,會呼叫父類MainCommonBase的建構函式,建立Server::InstanceImpl,接下來呼叫InstanceImpl::initialize(...)
接下來就進入關鍵的初始化階段。
載入初始化配置,裡面配置了Listener Discover Service, Router Discover Service, Cluster Discover Service等。
InstanceUtil::loadBootstrapConfig(bootstrap_, options);
建立AdminImpl,從而可以接受請求介面
admin_.reset(new AdminImpl(initial_config.admin().accessLogPath(), initial_config.admin().profilePath(), *this));
建立ListenerManagerImpl,用於管理監聽,因為Downstream要訪問Upstream的時候,envoy會進行監聽,Downstream會連線監聽的埠。
listener_manager_.reset( new ListenerManagerImpl(*this, listener_component_factory_, worker_factory_, time_system_));
在ListenerManagerImpl的建構函式中,建立了很多的worker,Envoy採用libevent監聽socket的事件,當有一個新的連線來的時候,會將任務分配給某個worker進行處理,從而實現非同步的處理。
ListenerManagerImpl::ListenerManagerImpl(...) {
for (uint32_t i = 0; i < server.options().concurrency(); i++) {
workers_.emplace_back(worker_factory.createWorker());
}
}
createWorker會建立WorkerImpl,初始化WorkerImpl需要兩個重要的引數。
一個是allocateDispatcher創建出來的DispatcherImpl,用來封裝libevent的事件分發的。
一個是ConnectionHandlerImpl,用來管理一個連線的。
我們接著看初始化過程,建立ProdClusterManagerFactory,用於建立Cluster Manager,管理上游的叢集。
cluster_manager_factory_.reset(new Upstream::ProdClusterManagerFactory(...);
在ProdClusterManagerFactory會建立ClusterManagerImpl,在ClusterManagerImpl的建構函式中,如果配置了CDS,就需要訂閱Cluster Discover Service。
if (bootstrap.dynamic_resources().has_cds_config()) {
cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this);
init_helper_.setCds(cds_api_.get());
}
會呼叫ProdClusterManagerFactory::createCds,裡面會建立CdsApiImpl。在CdsApiImpl的建構函式中,會建立CdsSubscription,訂閱CDS,當叢集的配置發生變化的時候,會呼叫CdsApiImpl::onConfigUpdate(...)
接下來在InstanceImpl::initialize的初始化函式中,如果配置了LDS,就需要訂閱Listener Discover Service。
if (bootstrap_.dynamic_resources().has_lds_config()) {
listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
}
ListenerManagerImpl的createLdsApi會呼叫ProdListenerComponentFactory的createLdsApi函式,會建立LdsApiImpl。在LdsApiImpl的建構函式中,會建立LdsSubscription,訂閱LDS,當Listener的配置改變的時候,會呼叫LdsApiImpl::onConfigUpdate(...)
三、Envoy的啟動
MainCommonBase::run() 會呼叫InstanceImpl::run(),會呼叫InstanceImpl::startWorkers(),會呼叫ListenerManagerImpl::startWorkers。
startWorkers會將Listener新增到所有的worker中,然後啟動worker的執行緒。
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
workers_started_ = true;
for (const auto& worker : workers_) {
for (const auto& listener : active_listeners_) {
addListenerToWorker(*worker, *listener);
}
worker->start(guard_dog);
}
}
對於每一個WorkerImpl,會呼叫ConnectionHandlerImpl的addListener函式,會建立ActiveListener物件,ActiveListener很重要,他的函式會在libevent收到事件的時候被呼叫。
在ActiveListener的建構函式中,會呼叫相同Worker的DispatcherImpl的createListener,建立Network::ListenerImpl,注意這個類的namespace,因為對於Envoy來講,LDS裡面有個Listener的概念,但是Socket也有Listener的概念,在Network這個namespace下面的,是對socket和libevent的封裝。
在Network::ListenerImpl的建構函式中,當收到事件的時候,會呼叫註冊的listenCallback函式。
if (bind_to_port) {
listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));
在listenCallback函式中,會呼叫onAccept函式,這個函式是ConnectionHandlerImpl::ActiveListener::onAccept函式。
listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address), listener->hand_off_restored_destination_connections_);
四、Envoy從Pilot中獲取Listener
當Listener的配置有變化的時候,會呼叫LdsApiImpl::onConfigUpdate(...)。
會呼叫ListenerManagerImpl的addOrUpdateListener(...)函式,在這裡面,會建立一個ListenerImpl,
這裡的listener是LDS定義的Listener,而非網路的Listener了。
在ListenerImpl的建構函式中,首先會建立ListenerFilter,可以對監聽的socket進行定製化的配置,例如為了實現use_original_dst,就需要加入一個ListenerFilter。
為了建立ListenerFilter,先要呼叫ProdListenerComponentFactory::createListenerFilterFactoryList_來建立工廠。
建立完了ListenerFilter之後,為了對於進來的網路包進行處理,會建立NetworkFilter,正是這些NetworkFilter實現了對網路包的解析,並根據網路包的內容,實現路由和負載均衡策略。
為了建立NetworkFilter,先要呼叫ProdListenerComponentFactory::createNetworkFilterFactoryList_來建立工廠,在這個函式裡面,會遍歷所有的NamedNetworkFilterConfigFactory,也即起了名字的filter都過一遍,都有哪些呢?class NetworkFilterNameValues裡面有定義,這裡面最重要的是:
// HTTP connection manager filter
const std::string HttpConnectionManager = "envoy.http_connection_manager";
我們這裡重點看HTTP協議的轉發,我們重點看這個名字對應的HttpConnectionManagerFilterConfigFactory,並呼叫他的createFilterFactory函式,返回一個Network::FilterFactoryCb型別的callback函式。
五、Envoy訂閱RDS
HttpConnectionManagerFilterConfigFactory的createFilterFactory函式最終呼叫createFilterFactoryFromProtoTyped函式中,會建立
Router::RouteConfigProviderManagerImpl。
std::shared_ptr<Router::RouteConfigProviderManager>
route_config_provider_manager
= context.singletonManager().getTyped
<Router::RouteConfigProviderManager>(
SINGLETON_MANAGER_REGISTERED_NAME(route_config_provider_manager), [&context] {
return std::make_shared<Router::RouteConfigProviderManagerImpl>(context.admin());
});
建立完畢Router::RouteConfigProviderManagerImpl之後,會建立HttpConnectionManagerConfig,將Router::RouteConfigProviderManagerImpl作為成員變數傳入。
std::shared_ptr<HttpConnectionManagerConfig> filter_config(new HttpConnectionManagerConfig(proto_config, context, *date_provider, *route_config_provider_manager));
在HttpConnectionManagerConfig的建構函式中,呼叫Router::RouteConfigProviderUtil::create。
route_config_provider_ = Router::RouteConfigProviderUtil::create(config, context_, stats_prefix_,route_config_provider_manager_);
Router::RouteConfigProviderUtil::create會呼叫RouteConfigProviderManagerImpl::createRdsRouteConfigProvider,在這個函式裡面,會建立RdsRouteConfigSubscription訂閱RDS,然後建立RdsRouteConfigProviderImpl。
當Router的配置發生變化的時候,會呼叫RdsRouteConfigProviderImpl::onConfigUpdate(),在這個函式裡面,會從pilot獲取Route的配置,生成ConfigImpl物件。
當我們仔細分析ConfigImpl的時候,我們發現,這個資料結構和第一節Envoy中的路由配置是一樣的。
在ConfigImpl裡面有RouteMatcher用於匹配路由,在RouteMatcher裡面有VirtualHostImpl,在VirtualHostImpl裡面有RouteEntryImplBase,其中一種RouteEntry是WeightedClusterEntry,在WeightedClusterEntry裡面有cluster_name_和cluster_weight_。
到此RouteConfigProviderManagerImpl如何訂閱RDS告一段落,我們回到HttpConnectionManagerFilterConfigFactory的createFilterFactoryFromProtoTyped中,在這個函式的最後一部分,將返回Network::FilterFactoryCb,是一個callback函式,作為createFilterFactoryFromProtoTyped的返回值。
在這個callback函式中,會建立Http::ConnectionManagerImpl,將HttpConnectionManagerConfig作為成員變數傳入,Http::ConnectionManagerImpl是一個Network::ReadFilter。在Envoy裡面,有Network::WriteFilter和Network::ReadFilter,其中從Downstream傳送到Upstream的使用Network::ReadFilter對網路包進行處理,反方向的是使用Network::WriteFilter。
callback函式建立ConnectionManagerImpl之後呼叫filter_manager.addReadFilter,將ConnectionManagerImpl放到ReadFilter列表中。當然這個callback函式現在是不呼叫的。
createFilterFactoryFromProtoTyped的返回值callback函式會返回到ProdListenerComponentFactory::createNetworkFilterFactoryList_函式,這個函式返回一個callback函式列表,其中一個就是上面生成的這個函式。
再返回就回到了ListenerImpl的建構函式,他會呼叫addFilterChain,將ProdListenerComponentFactory::createNetworkFilterFactoryList_返回的callback函式列表加入到鏈裡面。
六、當一個新的連線建立的時候
通過前面的分析,我們制定當一個新的連線建立的時候,會觸發libevent的事件,最終呼叫ConnectionHandlerImpl::ActiveListener::onAccept函式。
在這個函式中,會呼叫使用上面createListenerFilterFactoryList_建立的ListenerFilter的工廠建立ListenerFilter,然後使用這些Filter進行處理。
// Create and run the filters
config_.filterChainFactory().createListenerFilterChain(*active_socket);
active_socket->continueFilterChain(true);
ConnectionHandlerImpl::ActiveSocket::continueFilterChain函式呼叫ConnectionHandlerImpl::ActiveListener::newConnection,建立一個新的連線。
在ConnectionHandlerImpl::ActiveListener::newConnection函式中,先是會呼叫DispatcherImpl::createServerConnection,建立Network::ConnectionImpl,在Network::ConnectionImpl的建構函式裡面:
file_event_ = dispatcher_.createFileEvent(
fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);
對於一個新的socket連線,對於作業系統來講是一個檔案,也即有個檔案描述符,當一個socket可讀的時候,會觸發一個事件,當一個socket可寫的時候,可以觸發另一個事件,發生事件後,呼叫onFileEvent。
在ConnectionHandlerImpl::ActiveListener::newConnection函式中,接下來就應該為這個連線建立NetworkFilter了。
config_.filterChainFactory().createNetworkFilterChain(
*new_connection, filter_chain->networkFilterFactories());
上面這段程式碼,會呼叫ListenerImpl::createNetworkFilterChain。
裡面呼叫Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager, const std::vector<Network::FilterFactoryCb>& factories)
{
for (const Network::FilterFactoryCb& factory : factories) {
factory(filter_manager);
}
return filter_manager.initializeReadFilters();
}
可以看出這裡會呼叫上面生成的callback函式,將ConnectionManagerImpl放到ReadFilter列表中。
七、當有新的資料到來的時候
當Downstream傳送資料到Envoy的時候,socket就處於可讀的狀態,因而ConnectionImpl::onFileEvent函式會被呼叫,當事件是Event::FileReadyType::Read的時候,呼叫ConnectionImpl::onReadReady()。
在ConnectionImpl::onReadReady()函式裡面,socket將資料讀入快取。
IoResult result = transport_socket_->doRead(read_buffer_);
然後呼叫ConnectionImpl::onRead,裡面呼叫filter_manager_.onRead()使用NetworkFilter對於資料進行處理。
FilterManagerImpl::onRead() 會呼叫FilterManagerImpl::onContinueReading有下面的邏輯。
for (; entry != upstream_filters_.end(); entry++) {
BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
if (status == FilterStatus::StopIteration) {
return;
}
}
}
對於每一個Filter,都呼叫onData函式,咱們上面解析過,其中HTTP對應的ReadFilter是ConnectionManagerImpl,因而呼叫ConnectionManagerImpl::onData函式。
八、對資料進行解析

ConnectionManagerImpl::onData函式中,首先建立資料解析器。
codec_ = config_.createCodec(read_callbacks_->connection(), data, *this);
呼叫HttpConnectionManagerConfig::createCodec,對於HTTP1,建立Http::Http1::ServerConnectionImpl。
然後對資料進行解析。
codec_->dispatch(data);
呼叫ConnectionImpl::dispatch,裡面呼叫ConnectionImpl::dispatchSlice,在這裡面對HTTP進行解析。
<font size="3">http_parser_execute(&parser_, &settings_, slice, len);</font>
解析的引數是settings_,可以看出這裡面解析url,然後解析header,結束後呼叫onHeadersCompleteBase()函式,然後解析正文。
由於路由是根據HTTP頭裡面的資訊來的,因而我們重點看ConnectionImpl::onHeadersCompleteBase(),裡面會呼叫ServerConnectionImpl::onHeadersComplete函式。
active_request_->request_decoder_->decodeHeaders(std::move(headers), false);
ServerConnectionImpl::onHeadersComplete裡面呼叫ConnectionManagerImpl::ActiveStream::decodeHeaders這到了路由策略的重點。

這裡面呼叫了兩個函式,一個是refreshCachedRoute()重新整理路由配置,並查詢到匹配的路由項Route Entry。
另一個是ConnectionManagerImpl::ActiveStream::decodeHeaders的另一個實現。
decodeHeaders(nullptr, *request_headers_, end_stream);
在這裡會通過Route Entry找到後端的叢集,並建立連線。
九、路由匹配
我們先來解析refreshCachedRoute()
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_);
request_info_.route_entry_ = route ? route->routeEntry() : nullptr;
cached_route_ = std::move(route);
}
snapped_route_config_是什麼呢?snapped_route_config_的初始化如下。
snapped_route_config_(connection_manager.config_.routeConfigProvider().config())
routeConfigProvider()返回的就是RdsRouteConfigProviderImpl,其config函式返回的就是ConfigImpl,也就是上面我們描述過的層級的資料結構。
ConfigImpl的route函式如下
RouteConstSharedPtr route(const Http::HeaderMap& headers, uint64_t random_value) const override {
return route_matcher_->route(headers, random_value);
}
RouteMatcher的route函式,根據headers.Host()查詢virtualhost,然後呼叫 VirtualHostImpl::getRouteFromEntries。
RouteConstSharedPtr RouteMatcher::route(const Http::HeaderMap& headers, uint64_t random_value) const {
const VirtualHostImpl* virtual_host = findVirtualHost(headers);
if (virtual_host) {
return virtual_host->getRouteFromEntries(headers, random_value);
} else {
return nullptr;
}
}
VirtualHostImpl::getRouteFromEntries函式裡面有下面的迴圈
for (const RouteEntryImplBaseConstSharedPtr& route : routes_) {
RouteConstSharedPtr route_entry = route->matches(headers, random_value);
if (nullptr != route_entry) {
return route_entry;
}
}
對於每一個RouteEntry,看是否匹配。例如常用的有PrefixRouteEntryImpl::matches,看字首是否一致。
RouteConstSharedPtr PrefixRouteEntryImpl::matches(const Http::HeaderMap& headers,uint64_t random_value) const {
if (RouteEntryImplBase::matchRoute(headers, random_value) &&
StringUtil::startsWith(headers.Path()->value().c_str(), prefix_, case_sensitive_)) {
return clusterEntry(headers, random_value);
}
return nullptr;
}
得到匹配的Route Entry後,呼叫RouteEntryImplBase::clusterEntry獲取一個Cluster的名字。對於WeightedCluster,會呼叫下面的函式。
WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,true);
在這個函式裡面,會根據權重選擇一個WeightedClusterEntry返回。
這個時候,我們得到了後面Cluster,也即叢集的名稱,接下來需要得到叢集的具體的IP地址並建立連線。
十、查詢叢集並建立連線
ConnectionManagerImpl::ActiveStream::decodeHeaders的另一個實現會呼叫Route.cc裡面的Filter::decodeHeaders。

在Filter::decodeHeaders函式中有以下的實現。
// A route entry matches for the request.
route_entry_ = route_->routeEntry();
Upstream::ThreadLocalCluster* cluster = config_.cm_.get(route_entry_->clusterName());
通過上一節的查詢,WeightedClusterEntry裡面有cluster的名稱,接下來就是從cluster manager裡面根據cluster名稱查詢到cluster的資訊。
cluster manager就是我們最初建立的ClusterManagerImpl
ThreadLocalCluster* ClusterManagerImpl::get(const std::string& cluster) {
ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>();
auto entry = cluster_manager.thread_local_clusters_.find(cluster);
if (entry != cluster_manager.thread_local_clusters_.end()) {
return entry->second.get();
} else {
return nullptr;
}
}
返回ClusterInfoImpl,是cluster的資訊。
cluster_ = cluster->info();
找到Cluster後,開始建立連線。
// Fetch a connection pool for the upstream cluster.
Http::ConnectionPool::Instance* conn_pool = getConnPool();
route.cc的Filter::getConnPool()裡面呼叫以下函式。
config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), route_entry_->priority(), protocol, this);
ClusterManagerImpl::httpConnPoolForCluster呼叫ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool函式。
在這個函式裡面,HostConstSharedPtr host = lb_->chooseHost(context);通過負載均衡,在一個cluster裡面選擇一個後端的機器建立連線。
網易雲 容器服務 為使用者提供了無伺服器容器,讓企業能夠快速部署業務,輕鬆運維服務。容器服務支援彈性伸縮、垂直擴容、灰度升級、服務發現、服務編排、錯誤恢復及效能監測等功能。 點選可免費試用 。
最後。
劉老師 參加GIAC年度新人評選,馬了這麼多字,能幫忙投個票嗎? 請點選原文連線 。
劉超 網易雲技術架構部總監
長期致力於雲端計算開源技術的分享,佈道和落地,將網易內部最佳實踐服務客戶與行業。
技術分享:出版《Lucene應用開發解密》,極客時間專欄《趣談網路協議》,個人公眾號《劉超的通俗雲端計算》文章Kubernetes及微服務系列18篇,Mesos系列30篇,KVM系列25篇,Openvswitch系列31篇,OpenStack系列24篇,Hadoop系列10篇。公眾號文章《終於有人把雲端計算,大資料,人工智慧講明白了》累積10萬+
大會佈道:InfoQ架構師峰會明星講師,作為邀請講師在QCon,LC3,SACC,GIAC,CEUC,SoftCon,NJSD等超過10場大型技術峰會分享網易的最佳實踐
行業落地:將網易的容器和微服務產品在銀行,證券,物流,視訊監控,智慧製造等多個行業落地。
網易雲 免費體驗館 ,0成本體驗20+款雲產品!
更多網易技術、產品、運營經驗分享請 點選 。