[原始碼解析] 機器學習引數伺服器ps-lite(2) ----- 通訊模組Van

目錄

0x00 摘要

本文是引數伺服器系列第二篇,介紹ps-lite的通訊模組 Van。

本系列其他文章是:

[原始碼解析] 機器學習引數伺服器ps-lite 之(1) ----- PostOffice

0x01 功能概述

郵局裡有了地址簿,就需要有貨車來負責拉送物件,Van 就是整個Parameter Server的通訊模組,其特點如下。

  • PostOffice 類在例項化的時候,會建立一個 Van 類的例項 作為成員變數。該 Van 例項與所屬 PostOffice 例項生命週期相同(每個節點只有一個該物件);
  • Van 負責具體的節點間通訊。具體來說就是負責建立起節點之間的互相連線(例如Worker與Scheduler之間的連線),並且開啟本地的receiving thread用來監聽收到的message。

VAN 目前有兩個實現:

  • ZMQVan是基於zeromq的Van的實現,即用zmq庫實現了連線的底層細節(zmq庫是一個開源庫,對socket進行了優良的封裝,他使得Socket程式設計更加簡單、簡潔和效能更高)。
  • IBVerbsVan 是位元組跳動的實現,具體沒有深入研究。

0x02 定義

2.1 UML圖

首先給出 UML 圖。

2.2 主要說明

下面我們只給出Van物件關鍵變數和成員函式說明。

其主要變數如下:

  • Node scheduler_ :Scheduler 節點引數,每一個node都會記錄Scheduler 節點的資訊

  • Node my_node_ : 本節點引數。如果本節點是Scheduler,則 my_node_ 會指向上面的 scheduler_ ;

  • bool is_scheduler_ : 本節點是否是 scheduler;

  • std::unique_ptr< std::thread> receiver_thread_ :接收訊息執行緒指標;

  • std::unique_ptr< std::thread> heartbeat_thread_ :傳送心跳執行緒指標;

  • std::vector barrier_count_ :barrier 計數,用來記錄登記節點數目,只有所有節點都登記之後,系統才到了 ready 狀態,scheduler 才會給所有節點發送 ready 訊息,系統才正式啟動。

  • Resender *resender_ = nullptr :重新發送訊息指標;

  • std::atomic timestamp_{0} :message 自增 id,原子變數;

  • std::unordered_map<std::string, int> connected_nodes_ : 記錄了目前連線到哪些 nodes;

其主要函式功能如下:

  • start :建立通訊初始化;

  • Receiving :接收訊息執行緒的處理函式;

  • Heartbeat :傳送心跳執行緒的處理函式;

  • ProcessAddNodeCommandAtScheduler :scheduler 的 AddNode 訊息處理函式;

    • ProcessHearbeat:心跳包處理函式;

    • ProcessDataMsg :資料訊息(push & pull)處理函式;

    • ProcessAddNodeCommand :worker 和 server 的 AddNode 訊息處理函式;

    • ProcessBarrierCommand :Barrier 訊息處理函式;

2.3 執行緒管理

PS Lite 定義的三種角色採用多執行緒機制工作,每個執行緒承擔特定的職責,在所屬的 Van 例項啟動時被建立。

具體描述如下:

  • Scheduler,Worker 和 Server 的 Van 例項裡均持有一個接受資料的執行緒。
  • Worker 和 Server 的 Van 例項裡還持有一個間歇地向 Scheduler 傳送心跳的執行緒。
  • 如果定義了值不為 0 環境變數 PS_RESEND,那麼 Scheduler、Worker 和 Server 還會啟動一個監控執行緒。

2.4 類定義

詳細程式碼(摘要)如下:

class Van {
public:
static Van *Create(const std::string &type);
virtual void Start(int customer_id);
int Send(const Message &msg);
virtual void Stop();
inline int GetTimestamp() { return timestamp_++; }
inline bool IsReady() { return ready_; } protected:
 //連結節點
virtual void Connect(const Node &node) = 0;
//繫結到自己節點之上
virtual int Bind(const Node &node, int max_retry) = 0;
//接收訊息,用阻塞方式
virtual int RecvMsg(Message *msg) = 0;
 //傳送訊息
virtual int SendMsg(const Message &msg) = 0; /**
* \brief pack meta into a string
*/
void PackMeta(const Meta &meta, char **meta_buf, int *buf_size);
/**
* \brief pack meta into protobuf
*/
void PackMetaPB(const Meta &meta, PBMeta *pb);
/**
* \brief unpack meta from a string
*/
void UnpackMeta(const char *meta_buf, int buf_size, Meta *meta); Node scheduler_;
Node my_node_;
bool is_scheduler_;
std::mutex start_mu_; private:
/** thread function for receving */
void Receiving(); /** thread function for heartbeat */
void Heartbeat(); // node's address string (i.e. ip:port) -> node id
// this map is updated when ip:port is received for the first time
std::unordered_map<std::string, int> connected_nodes_;
// maps the id of node which is added later to the id of node
// which is with the same ip:port and added first
std::unordered_map<int, int> shared_node_mapping_; /** whether it is ready for sending */
std::atomic<bool> ready_{false};
std::atomic<size_t> send_bytes_{0};
size_t recv_bytes_ = 0;
int num_servers_ = 0;
int num_workers_ = 0;
/** the thread for receiving messages */
std::unique_ptr<std::thread> receiver_thread_;
/** the thread for sending heartbeat */
std::unique_ptr<std::thread> heartbeat_thread_;
std::vector<int> barrier_count_;
/** msg resender */
Resender *resender_ = nullptr;
int drop_rate_ = 0;
std::atomic<int> timestamp_{0};
int init_stage = 0;  //以下是處理各種型別訊息
void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes,
Meta *recovery_nodes);
void ProcessTerminateCommand();
void ProcessAddNodeCommand(Message *msg, Meta *nodes, Meta *recovery_nodes);
void ProcessBarrierCommand(Message *msg);
void ProcessHearbeat(Message *msg);
void ProcessDataMsg(Message *msg); //更新本地NodeID
void UpdateLocalID(Message *msg, std::unordered_set<int> *deadnodes_set,
Meta *nodes, Meta *recovery_nodes); const char *heartbeat_timeout_val =
Environment::Get()->find("PS_HEARTBEAT_TIMEOUT");
int heartbeat_timeout_ =
heartbeat_timeout_val ? atoi(heartbeat_timeout_val) : 0; DISALLOW_COPY_AND_ASSIGN(Van);
};

0x03 初始化

Van物件的初始化函式作用就是依據本地節點型別的不同,做不同設定,從而啟動埠,建立到scheduler的連結,啟動接收訊息執行緒,心跳執行緒等,這樣就可以進行通訊了。具體如下:

  1. 首先從環境變數中得到相關資訊,比如scheduler 的 "ip,port"(這兩個是預先設定的),本節點的角色(Worker/Server/Scheduler)等等,然後 初始化scheduler_這個成員變數;
  2. 如果本節點是 scheduler,則把 scheduler_ 賦值給 my_node_;
  3. 如果本節點不是 scheduler,則:
    1. 從系統中獲取本節點的ip資訊;
    2. 使用 GetAvailablePort 獲取一個port;
  4. 使用 Bind 繫結一個埠;
  5. 呼叫 Connect 建立到 Scheduler 的連線(scheduler也連線到自己的那個預先設定的固定埠);
  6. 啟動本地Node的接收訊息執行緒receiver_thread_,執行Van::Receiving
  7. 如果本節點不是 scheduler,給 Scheduler 傳送一個 ADD_NODE 訊息,這樣可以將本地Node的資訊告知Scheduler,即註冊到 scheduler;
  8. 然後進入等待狀態,等待Scheduler通知 Ready(scheduler 會等待所有節點都完成註冊後,統一發送 ready); 注意,這裡 scheduler 節點也會等,但是不影響 scheduler 節點 的 recevie 執行緒接受處理訊息;
  9. Ready後啟動心跳執行緒,建立到Scheduler的Heartbeat 連線;

關於7,8兩點的進一步說明就是:

  • 當worker和server節點繫結ip和port後,便向scheduler節點發送ADD_NODE message。
  • 當 scheduler收到所有worker和server的ADD_NODE message後,則依次應答ADD_NODE message,
  • 各個節點在此過程中通過原子變數ready_等待上述過程完成。

具體程式碼如下:

void Van::Start(int customer_id) {
// get scheduler info
start_mu_.lock(); if (init_stage == 0) {
// 初始化scheduler_這個成員變數
scheduler_.hostname = std::string(
CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
scheduler_.port =
atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
scheduler_.role = Node::SCHEDULER;
scheduler_.id = kScheduler;
// 確認本節點是scheduler節點
is_scheduler_ = Postoffice::Get()->is_scheduler(); // get my node info
if (is_scheduler_) {
// 初始化本節點,因為是scheduler,所以直接賦值
my_node_ = scheduler_;
} else {
auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
std::string ip;
if (nhost) ip = std::string(nhost);
if (ip.empty()) {
const char* itf = Environment::Get()->find("DMLC_INTERFACE");
std::string interface;
if (itf) interface = std::string(itf);
if (interface.size()) {
GetIP(interface, &ip);
} else {
GetAvailableInterfaceAndIP(&interface, &ip);
}
}
int port = GetAvailablePort();
const char* pstr = Environment::Get()->find("PORT");
if (pstr) port = atoi(pstr);
my_node_.hostname = ip;
my_node_.role = role;
my_node_.port = port;
// cannot determine my id now, the scheduler will assign it later
// set it explicitly to make re-register within a same process possible
my_node_.id = Node::kEmpty;
my_node_.customer_id = customer_id;
} // bind.
//繫結介面,把本節點繫結到ip:port這個socket上,理論來說這個函式就是初始化了receiver_
my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40); // connect to the scheduler
// 連線上scheduler_,由於本節點就是scheduler_,其實就是初始化senders_,由於傳送的節點很多,所以這裡是一個map<int,void*>
// 在這裡就是senders_[1] = socket_1, socket_1中的body設定一點字元“ps1***”, 注意連結不是sendMsg
Connect(scheduler_); // for debug use
if (Environment::Get()->find("PS_DROP_MSG")) {
drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
}
// start receiver
// 開啟一個接收訊息的執行緒,這裡就是處理訊息
receiver_thread_ =
std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
init_stage++;
}
start_mu_.unlock(); if (!is_scheduler_) {
// let the scheduler know myself
// worker和server節點會通過 ADD_NODE 訊息把本地節點的資訊告訴scheduler,比如角色,ip,port...
Message msg;
Node customer_specific_node = my_node_;
customer_specific_node.customer_id = customer_id;
msg.meta.recver = kScheduler;
msg.meta.control.cmd = Control::ADD_NODE;
msg.meta.control.node.push_back(customer_specific_node);
msg.meta.timestamp = timestamp_++;
Send(msg);
} // wait until ready
// 等待 ready_ 從false變成true,當是scheduler的時候,必須要有等worker和server節點過來,不然一直都是阻塞在這,如果是 worker/server,則是等待 scheduler 傳送系統allready訊息。
while (!ready_.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} start_mu_.lock();
if (init_stage == 1) {
// resender
if (Environment::Get()->find("PS_RESEND") &&
atoi(Environment::Get()->find("PS_RESEND")) != 0) {
int timeout = 1000;
if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
}
// 如果設定了超時重傳,就初始化resender_這個變數
resender_ = new Resender(timeout, 10, this);
} if (!is_scheduler_) {
// start heartbeat thread
// 初始化心跳執行緒
heartbeat_thread_ =
std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
}
init_stage++;
}
start_mu_.unlock();
}

0x04 接受訊息

我們首先介紹後臺執行緒是如何執行,然後會具體分析如何處理各種訊息。

4.1 後臺處理訊息執行緒

ps-lite 啟動了一個後臺執行緒 receiver_thread_ 進行接受/處理訊息。

// start receiver
receiver_thread_ =
std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));

4.2 處理函式

receiver_thread_ 使用 Receiving 函式進行訊息處理。

4.2.1 控制資訊

除了傳遞引數的資料訊息外,各個節點之間控制資訊有:

  • ADD_NODE:worker和server向shceduler進行節點註冊;
  • BARRIER:節點間的同步阻塞訊息;
  • HEARTBEAT:節點間的心跳訊號;
  • TERMINATE:節點退出訊號;
  • ACK:確認訊息,ACK 型別只有啟用了 Resender 類才會出現
  • EMPTY:push or pull;

因此在 Receiving 之中會呼叫 不同處理函式處理不同型別的訊息:

  • ProcessTerminateCommand :處理 TERMINATE;
  • ProcessAddNodeCommand :處理 ADD_NODE;
  • ProcessBarrierCommand :處理 BARRIER(在上文已經分析);
  • ProcessHearbeat :處理 HEARTBEAT;

4.2.2 執行緒內全域性變數

執行緒內有兩個變數,因為其是在 while (true) 迴圈之外,所以屬於執行緒內的全域性變數,這點在閱讀程式碼時候需要注意。

  • nodes :只有 scheduler 在處理 ADD_NODE 時候會用到,儲存目前 scheduler 內部擁有的所有 nodes;
  • recovery_nodes :只有 scheduler 在處理 ADD_NODE 時候會用到,儲存目前 scheduler 內部擁有的所有 recovery nodes(康復重啟的節點);

4.2.3 具體實現

Receiving 邏輯如下:

  • 呼叫 RecvMsg(派生類會實現)獲取最新訊息;
  • 如果設定了取樣,則進行 drop;
  • 若設定了重傳機制,則會檢測此訊息是否重複,利用 resender_->AddIncomming(msg) 來處理重複訊息;
  • 處理控制訊息或者資料訊息;

具體程式碼如下

void Van::Receiving() {
Meta nodes;
// 以下兩個可以認為是全域性變數
Meta recovery_nodes; // store recovery nodes 儲存康復重啟的節點
recovery_nodes.control.cmd = Control::ADD_NODE; // 康復重啟節點的control.cmd 都設定為 ADD_NODE while (true) {
Message msg;
int recv_bytes = RecvMsg(&msg); //利用receiver_ 變數拿到訊息
// For debug, drop received message
if (ready_.load() && drop_rate_ > 0) {
unsigned seed = time(NULL) + my_node_.id;
if (rand_r(&seed) % 100 < drop_rate_) {
LOG(WARNING) << "Drop message " << msg.DebugString();
continue;
}
} CHECK_NE(recv_bytes, -1);
recv_bytes_ += recv_bytes; //收到的位元組數累加
if (Postoffice::Get()->verbose() >= 2) {
PS_VLOG(2) << msg.DebugString();
}
// duplicated message
if (resender_ && resender_->AddIncomming(msg)) continue; //重傳確認機制 if (!msg.meta.control.empty()) { //如果是控制型別的訊息
// control msg
auto& ctrl = msg.meta.control;
if (ctrl.cmd == Control::TERMINATE) {
ProcessTerminateCommand();
break;
} else if (ctrl.cmd == Control::ADD_NODE) {
ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes); //當執行到這個位置的時候繼續跳轉
} else if (ctrl.cmd == Control::BARRIER) {
ProcessBarrierCommand(&msg);
} else if (ctrl.cmd == Control::HEARTBEAT) {
ProcessHearbeat(&msg); // 發回Heartbeat的ACK
} else {
LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
}
} else { //非控制型別的訊息處理方式
ProcessDataMsg(&msg);
}
}
}

4.3 處理 ADD_NODE 訊息

ADD_NODE 是 worker / server 用來向 scheduler 註冊自身的控制訊息。

4.3.1 註冊邏輯

先回憶下注冊基本思路。

  • 當worker和server節點繫結ip和port後,便向scheduler節點發送ADD_NODE message。
  • 當 scheduler收到所有worker和server的ADD_NODE message後則依次應答ADD_NODE message,注意,應答的也是 同類型ADD_NODE 訊息。
  • 各個節點(scheduler, worker, server)在此過程中通過原子變數ready_等待上述過程完成。

4.3.2 ProcessAddNodeCommand

ProcessAddNodeCommand 邏輯如下。

  • 查出心跳包超時的id,轉存到dead_set之中。
  • 拿到收到訊息裡面的control資訊。
  • 呼叫 UpdateLocalID,在其中:
    • 如果是新node,Scheduler記錄這個新的node。
    • 如果這個node是重啟產生的,則將舊node的資訊更新。
  • 如果是 scheduler,則:
    • 呼叫 ProcessAddNodeCommandAtScheduler 收到所有worker和server的ADD_NODE 的訊息後進行節點id分配並應答,即 設定最新的所有node的rank併發送給所有Worker和Server。
  • 如果不是 scheduler,說明 work & server 收到了 scheduler 回答的 ADD_NODE 訊息,則:
    • 如果自身是現有節點,則在 connected_nodes_ 之中不會找到這個新節點,則先有節點會呼叫 Connect 與新節點建立連線。
    • 如果自身是新節點,則會連線所有現有節點(非同型別)。
    • 在 connected_nodes_ 之中更新 全域性節點資訊,包括 global rank(本地Node的全域性rank等資訊是由receiver_thread_在這裡獲取);
    • 最後設定 ready_ = true,即本節點也可以運行了,因為主執行緒會阻塞在其上。

具體程式碼如下:

void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes,
Meta* recovery_nodes) {
auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);//查出心跳包超時的id
std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());//轉存到dead_set之中
auto& ctrl = msg->meta.control; //拿到收到訊息裡面的control資訊 UpdateLocalID(msg, &dead_set, nodes, recovery_nodes); if (is_scheduler_) { // Scheduler 節點
ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
} else { // Worker & Server 節點
for (const auto& node : ctrl.node) {
std::string addr_str = node.hostname + ":" + std::to_string(node.port);
if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
// 現有節點會在自己連線之中查詢這個新節點,發現現有連線中沒有這個新節點
// 如果是新節點,則會連線現有節點(非同型別)
Connect(node); // 與新節點進行連線
connected_nodes_[addr_str] = node.id; // 加入已經連線的節點
}
if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
}
ready_ = true;
}
}

4.3.3 UpdateLocalID

此函式作用是更新節點內部的node id 資訊,也是分為兩種情況,函式邏輯如下:

  • 如果msg->meta.sender是Meta::kEmpty,即未設定,則處理此message的一定是Scheduler,會進入 if 分支。

    • 如果目前 nodes 的control.node數目小於 "配置的server數目 + 配置的worker數目",則說明是系統啟動階段,將當前訊息的node資訊加入到 control.node 之中。
    • 否則說明是系統執行階段,應該是有些節點死掉重啟後再次連線。那麼,就從 nodes 的control.node 之中找到一個已經死掉的且節點role 與當前訊息一致(同類型)的 node id,把這個 node id 賦給這個重啟的節點。並且更新 nodes->control.node 和 recovery_nodes。
  • 下面就是普通節點處理的邏輯:
    • 即在 scheduler 傳回來的所有節點資訊中查詢,目的是找到與自己的ip,port一致的節點。
    • 如果找到,就更新本地節點資訊(因為在本節點啟動時候,並沒有設定 node_id 這個資訊,這個需要scheduler統一設定,從註釋看,目的是為了使重新註冊成為可能)。包括全域性 rank 資訊。

具體程式碼如下:

void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
Meta* nodes, Meta* recovery_nodes) {
auto& ctrl = msg->meta.control;
size_t num_nodes =
Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
// assign an id
if (msg->meta.sender == Meta::kEmpty) { //如果sender未設定,則處理此message的一定是Scheduler
CHECK(is_scheduler_);
CHECK_EQ(ctrl.node.size(), 1); //msg中的control命令中的節點集合就是worker自己,所以就是1個節點
if (nodes->control.node.size() < num_nodes) { //沒有到齊
nodes->control.node.push_back(ctrl.node[0]);
} else { //如果所有work和server到齊了,就進入else
// some node dies and restarts
CHECK(ready_.load());
for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
const auto& node = nodes->control.node[i];
if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
node.role == ctrl.node[0].role) {
auto& recovery_node = ctrl.node[0];
// assign previous node id
recovery_node.id = node.id;
recovery_node.is_recovery = true;
nodes->control.node[i] = recovery_node;
recovery_nodes->control.node.push_back(recovery_node);
break;
}
}
}
} // update my id / 對普通的node,更新其rank,scheduler 節點不會起作用(因為找不到)。
// schedule發給此work節點的訊息,如果發現本地的ip和port和訊息中的某個一點重合,那麼就把本地節點的ID(初始化時候沒有ID,只是等於Empty)改為schedule發過來的 node id。
for (size_t i = 0; i < ctrl.node.size(); ++i) {
const auto& node = ctrl.node[i];
if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
my_node_ = node;
std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
#ifdef _MSC_VER
_putenv_s("DMLC_RANK", rank.c_str());
#else
setenv("DMLC_RANK", rank.c_str(), true);
#endif
}
}
}
}

4.3.4 ProcessAddNodeCommandAtScheduler

ProcessAddNodeCommandAtScheduler 是在 Scheduler 之內執行,是對控制型別訊息的處理。

對於Scheduler節點來說,scheduler收到所有worker和server的ADD_NODE的訊息後進行節點id分配並應答,即,需要設定 最新的所有node的 全域性rank 併發送給所有Worker和Server。

  • 當接受到所有 worker & server 的註冊訊息之後(nodes->control.node.size() == num_nodes):

    • 將節點按照 ip + port 組合排序。
    • Scheduler 與所有註冊的節點建立連線、更新心跳時間戳,給 scheduler所有連線的節點分配全域性 rank。
    • 向所有的worker和server傳送ADD_NODE訊息(攜帶scheduler之中的所有node資訊)。
    • 會把 ready_ = true; 即 scheduler 是一個 ready 狀態了,不管 worker 和 server 是否確認收到ADD_NODE訊息。
    • 而在接收端(worker & server)的,每一個本地Node的全域性rank等資訊是由接收端 receiver_thread_(其他函式)獲取,就是得到了 scheduler 返回的這些 nodes 資訊。
  • 如果 !recovery_nodes->control.node.empty(),這就表明是處理某些重啟節點的註冊行為:
    • 查出心跳包超時的id,轉存到dead_set之中。
    • 與重啟節點建立連線(因為接收到了一個ADD_NODE),所以只與這個新重啟節點建立連線即可(在程式碼中有 CHECK_EQ(recovery_nodes->control.node.size(), 1) 來確認重啟節點為 1 個)。
    • 更新重啟節點的心跳。
    • 因為新加入了重啟節點,所以用一個傳送達到兩個目的:
      • 向所有 recovery 的worker和server傳送ADD_NODE訊息(攜帶scheduler之中的目前所有node資訊)。
      • 向 alive 節點發送 recovery 節點資訊。
      • 這樣,收到訊息的節點會則分別與新節點相互建立連線;

具體程式碼如下:

void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
Meta* recovery_nodes) {
recovery_nodes->control.cmd = Control::ADD_NODE;
time_t t = time(NULL);
size_t num_nodes =
Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
// scheduler收到所有worker和server的ADD_NODE的訊息後進行節點id分配並應答
if (nodes->control.node.size() == num_nodes) { // 節點收集完全
// sort the nodes according their ip and port, 根據IP和port給worker,server排個序
std::sort(nodes->control.node.begin(), nodes->control.node.end(),
[](const Node& a, const Node& b) {
return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
});
// assign node rank
for (auto& node : nodes->control.node) {
// 建立連線、更新心跳時間戳,給 scheduler所有連線的節點分配全域性 rank。
std::string node_host_ip =
node.hostname + ":" + std::to_string(node.port);
if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) { //如果ip:port不存在van_中的話
CHECK_EQ(node.id, Node::kEmpty); //判斷是不是初始化節點
int id = node.role == Node::SERVER
? Postoffice::ServerRankToID(num_servers_)
: Postoffice::WorkerRankToID(num_workers_); //如果是sever的話,就id產生一個id號,num_servers_初始化為0
node.id = id; //將這個新節點的id賦值為id
Connect(node); //連線這個新節點, 即建立一個socket, 然後senders_[id] = sender; 就是將目標id的socket存放起來後面使用
Postoffice::Get()->UpdateHeartbeat(node.id, t);//更新心跳包
connected_nodes_[node_host_ip] = id; //既然 worker, server 已經發message來了,scheduler要把這個節點作為已經連結的節點
} else {
int id = node.role == Node::SERVER
? Postoffice::ServerRankToID(num_servers_)
: Postoffice::WorkerRankToID(num_workers_);
shared_node_mapping_[id] = connected_nodes_[node_host_ip];
node.id = connected_nodes_[node_host_ip];
}
if (node.role == Node::SERVER) num_servers_++;//更新rank
if (node.role == Node::WORKER) num_workers_++;
}
nodes->control.node.push_back(my_node_); //把本節點放到裡面
nodes->control.cmd = Control::ADD_NODE;
Message back;
back.meta = *nodes;
// 向所有的worker和server傳送ADD_NODE訊息
for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
int recver_id = r;
if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
back.meta.recver = recver_id;
back.meta.timestamp = timestamp_++;
Send(back);
}
} ready_ = true; //scheduler已經準備好了
} else if (!recovery_nodes->control.node.empty()) { // 節點沒有收集完全
auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);//查出心跳包超時的id
std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());//轉存到dead_set
// send back the recovery node
CHECK_EQ(recovery_nodes->control.node.size(), 1);
Connect(recovery_nodes->control.node[0]);
Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
Message back;
for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
if (r != recovery_nodes->control.node[0].id &&
dead_set.find(r) != dead_set.end()) {
// do not try to send anything to dead node
continue;
}
// only send recovery_node to nodes already exist
// but send all nodes to the recovery_node
back.meta =
(r == recovery_nodes->control.node[0].id) ? *nodes : *recovery_nodes;
back.meta.recver = r;
back.meta.timestamp = timestamp_++;
Send(back);
}
}
}

此部分流程邏輯如下:

                                                              +
Scheduler | Worker
|
+ | +
| | |
| | |
v | |
Postoffice::Start +----> Van::Start | |
+ | |
| | |
| | |
v | |
Connect--do nothing | |
+ | v
| |
| | Postoffice::Start +-----> Van::Start
| | +
v | |
receiver_thread_ +---+ | |
+ | | v
| | | Connect--to scheduler
| | | +
| | | |
| | | |
| | | |
| | | v
| | | receiver_thread_ +----->+
| | | + |
| | | | |
| | | | |
| | | v |
| | <---------------------------------------+ Send |
| | | ADD_NODE + |
| v | | |
| | | |
| ProcessAddNodeCommand | | |
| + | | |
| | | | |
| | All nodes OK | | |
| | | | |
v | | | |
| set rank | | |
wait until ready | | | |
+ | | | |
| +----------------------------------------------------------------> |
| | | ADD_NODE response(nodes info) | |
| | | | ProcessAddNodeCommand
| | | v |
| | | |
| <--------------+ | wait until ready |
| ready_ = true | + |
| | | <---------------+
+-------------------+ v | |
| | +--------------------+ v
| | |
v | |
| v
Postoffice::Barrier |
| Postoffice::Barrier
+

手機如下,左側是 Scheduler,右側是 worker:

4.3.5 一個新加節點的序列

其互聯過程可以分為3步:

第一步:worker/server節點初始化的時候,向schedular節點發送一個連線資訊,假定自身是節點 2;

if (!is_scheduler_) {
// let the scheduler know myself
Message msg;
Node customer_specific_node = my_node_;
customer_specific_node.customer_id = customer_id;
msg.meta.recver = kScheduler;
msg.meta.control.cmd = Control::ADD_NODE;
msg.meta.control.node.push_back(customer_specific_node);
msg.meta.timestamp = timestamp_++;
Send(msg); //傳送給schedular, 建立連結資訊。
}

第二步:Scheduler 節點收到資訊後,在 ProcessAddNodeCommandAtScheduler 之中,首先會和 節點 2 建立一個連線。會向所有已經和schedular建立連線的worker節點/server節點 廣播此 "節點的加入資訊“,並把 節點 2 請求連線的資訊放入meta資訊中。

    // assign node rank
for (auto& node : nodes->control.node) {
std::string node_host_ip =
node.hostname + ":" + std::to_string(node.port);
if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
int id = node.role == Node::SERVER
? Postoffice::ServerRankToID(num_servers_)
: Postoffice::WorkerRankToID(num_workers_);
node.id = id;
Connect(node); // 連線這個新節點, 即建立一個socket, 然後senders_[id] = sender; 就是將目標id的socket存放起來後面使用
Postoffice::Get()->UpdateHeartbeat(node.id, t);
connected_nodes_[node_host_ip] = id;
} else {
int id = node.role == Node::SERVER
? Postoffice::ServerRankToID(num_servers_)
: Postoffice::WorkerRankToID(num_workers_);
shared_node_mapping_[id] = connected_nodes_[node_host_ip];
node.id = connected_nodes_[node_host_ip];
}
if (node.role == Node::SERVER) num_servers_++;
if (node.role == Node::WORKER) num_workers_++;
}
nodes->control.node.push_back(my_node_);
nodes->control.cmd = Control::ADD_NODE; Message back;
back.meta = *nodes;
// 向所有已經和schedular建立連線的worker節點/server節點 廣播此 "節點的加入資訊“,並把 節點 2 請求連線的資訊放入meta資訊中。
for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
int recver_id = r;
if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
back.meta.recver = recver_id;
back.meta.timestamp = timestamp_++;
Send(back);
}
}

第三步:現有worker/server節點收到這個命令後,在 ProcessAddNodeCommand 之中 會和 節點 2 形成連線。

   for (const auto& node : ctrl.node) {
std::string addr_str = node.hostname + ":" + std::to_string(node.port);
if (connected_nodes_.find(addr_str) == connected_nodes_.end()) { // 現有連線中沒有這個新節點
Connect(node); // 與新節點進行連線
connected_nodes_[addr_str] = node.id;
}
if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;

至此,整個過程就描述完了。每個新節點加入後,已經加入的節點都會通過schedular節點和這個新節點建立連線。

4.4 處理 HEARTBEAT 訊息

我們接下來分析心跳機制。

4.4.1 心跳機制

為了記錄網路的可達性,PS Lite 設計了心跳機制。具體而言:

  • 每一個節點的 PostOffice 單例中維護了一個 MAP 結構,儲存了心跳關聯的節點的活躍資訊。鍵為節點編號,值為上次收到其 HEARTBEAT 訊息的時間戳。
  • Worker/Server 只記錄 Scheduler 的心跳,Scheduler 則記錄所有節點的心跳。基於時間戳和心跳超時,可以輸出所有的死亡節點。
  • 每一個 Worker/Server 節點,會新建立一個心跳執行緒,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 傳送一條 HEARTBEAT 訊息;
  • Scheduler 節點收到後,響應一個 HEARTBEAT 訊息。
  • scheduler進行應答,通過當前時間與心跳包接收時間之差判斷是否alive。
  • Scheduler 會依據心跳節點的時間戳來判斷死亡節點。如果新增的節點id在dead_node容器裡,表示這個節點是重新恢復的;而新增節點通過schedular的中轉與現有節點形成互聯。

具體如下:

4.4.2 資料結構

std::unordered_map<int, time_t> heartbeats_ 就是儲存了心跳關聯的節點的活躍資訊。鍵為節點編號,值為上次收到其 HEARTBEAT 訊息的時間戳。

UpdateHeartbeat 會定期更新心跳。

  void UpdateHeartbeat(int node_id, time_t t) {
std::lock_guard<std::mutex> lk(heartbeat_mu_);
heartbeats_[node_id] = t;
} std::unordered_map<int, time_t> heartbeats_;

4.4.3 Worker / Server 傳送心跳

在這兩種節點中,啟動了一個執行緒,每一個 Worker/Server 節點,每隔 PS_HEARTBEAT_INTERVAL 秒向 Scheduler 傳送一條 HEARTBEAT 訊息:

    if (!is_scheduler_) {
// start heartbeat thread
heartbeat_thread_ =
std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
}

具體心跳函式是:

void Van::Heartbeat() {
const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
while (interval > 0 && ready_.load()) {
std::this_thread::sleep_for(std::chrono::seconds(interval));
Message msg;
msg.meta.recver = kScheduler;
msg.meta.control.cmd = Control::HEARTBEAT;
msg.meta.control.node.push_back(my_node_);
msg.meta.timestamp = timestamp_++;
Send(msg);
}
}

4.4.4 Scheduler 節點處理心跳

Scheduler 節點收到後 HEARTBEAT 訊息後,響應一個 HEARTBEAT 訊息。UpdateHeartbeat 會定期更新心跳。

void Van::ProcessHearbeat(Message* msg) {
auto& ctrl = msg->meta.control;
time_t t = time(NULL);
for (auto& node : ctrl.node) {
Postoffice::Get()->UpdateHeartbeat(node.id, t);
if (is_scheduler_) {
Message heartbeat_ack;
heartbeat_ack.meta.recver = node.id;
heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
heartbeat_ack.meta.control.node.push_back(my_node_);
heartbeat_ack.meta.timestamp = timestamp_++;
// send back heartbeat
Send(heartbeat_ack);
}
}
}

4.4.5 死亡節點

Scheduler 在處理 ADD_NODE 訊息時候,會看看是否已經有死亡節點,具體判通過當前時間戳與心跳包接收時間戳之差判斷是否alive。

std::vector<int> Postoffice::GetDeadNodes(int t) {
std::vector<int> dead_nodes;
if (!van_->IsReady() || t == 0) return dead_nodes; time_t curr_time = time(NULL);
const auto& nodes = is_scheduler_
? GetNodeIDs(kWorkerGroup + kServerGroup)
: GetNodeIDs(kScheduler);
{
std::lock_guard<std::mutex> lk(heartbeat_mu_);
for (int r : nodes) {
auto it = heartbeats_.find(r);
if ((it == heartbeats_.end() || it->second + t < curr_time)
&& start_time_ + t < curr_time) {
dead_nodes.push_back(r);
}
}
}
return dead_nodes;
}

邏輯如下:

+----------------------------------------------------+
| Scheduler |
| |
| |
| |
| heartbeats_ |
| |
| receiver_thread_+--------> ProcessHearbeat |
| ^ + ^ + |
| | | | | |
| | | | | |
| | | | | |
+----------------------------------------------------+
| | | |
| | | | RESPONSE
| | | +-------------------------------------+
| | | |
| | +-------------------------------+ |
| | | |
HEARTBEAT | | RESPONSE HEARTBEAT | |
| | | |
+-----------------------------------------+ +-----------------------------------------+
| Worker | | | | Server | | |
| | | | | | | |
| | | | | | | |
| | | | | | | |
| heartbeats_ | | | | heartbeats_ | | |
| + | | | + | |
| heartbeat_thread_+----> Heartbeat | | | heartbeat_thread_+--> Heartbeat | |
| | | | | |
| v | | v |
| receiver_thread_ +---> ProcessHearbeat | | receiver_thread_ +--> ProcessHearbeat |
| | | |
| | | |
| | | |
+-----------------------------------------+ +-----------------------------------------+

4.5 處理 TERMINATE 訊息

ProcessTerminateCommand 會處理結束訊息,具體就是設定 ready_ 為 false。

這樣就預示著 Van 狀態不對,不可以繼續處理。

void Van::ProcessTerminateCommand() {
PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
ready_ = false;
} inline bool IsReady() { return ready_; }

4.6 處理 ACK 訊息

4.6.1 Ack機制

在分散式系統中,通訊也是不可靠的,丟包、延時都是必須考慮的場景。PS Lite 設計了 Resender類來提高通訊的可靠性,它引入了 ACK 機制。即:

  • 每一個節點,對於收到的非 ACK/TERMINATE 訊息,必須響應一個 ACK 訊息。
  • 每一個節點,對於傳送的每一個非 ACK/TERMINATE 訊息,必須在本地快取下來。儲存的資料結構是一個 MAP,根據訊息的內容生產唯一的鍵。
  • 每一個節點,對於收到的 ACK 訊息,必須根據反饋的鍵從本地快取中移除對應的訊息。
  • 每一個節點執行一個監控執行緒,每隔 PS_RESEND_TIMEOUT 毫秒檢查一下本地快取。根據每個訊息的傳送時間戳和當前時間,找出超時的訊息進行重發,並累加其重試次數。

4.6.2 Resender類

定義如下,其中 send_buff_ 就是傳送快取,用來儲存傳送了的訊息列表。acked_ 就是已經確認的訊息。

class Resender {
std::thread* monitor_;
std::unordered_set<uint64_t> acked_;
std::atomic<bool> exit_{false};
std::mutex mu_;
int timeout_;
int max_num_retry_;
Van* van_;
using Time = std::chrono::milliseconds;
// the buffer entry
struct Entry {
Message msg;
Time send;
int num_retry = 0;
};
std::unordered_map<uint64_t, Entry> send_buff_;
};

4.6.3 監控執行緒

監控執行緒以及函式如下如下,就是被喚醒時候,從send_buff_(本地快取)找到每個訊息的傳送時間戳和當前時間,找出超時的訊息進行重發,並累加其重試次數。 :

  monitor_ = new std::thread(&Resender::Monitoring, this);

  void Monitoring() {
while (!exit_) {
std::this_thread::sleep_for(Time(timeout_));
std::vector<Message> resend;
Time now = Now();
mu_.lock();
for (auto& it : send_buff_) {
if (it.second.send + Time(timeout_) * (1+it.second.num_retry) < now) {
resend.push_back(it.second.msg);
++it.second.num_retry;
CHECK_LT(it.second.num_retry, max_num_retry_);
}
}
mu_.unlock(); for (const auto& msg : resend) van_->Send(msg);
}
}

4.6.4 傳送時快取

當 Van 傳送訊息時候,如果配置了重傳,就呼叫AddOutgoing函式把訊息加入到傳送快取。

int Van::Send(const Message& msg) {
int send_bytes = SendMsg(msg);
CHECK_NE(send_bytes, -1);
send_bytes_ += send_bytes;
if (resender_) resender_->AddOutgoing(msg);
if (Postoffice::Get()->verbose() >= 2) {
PS_VLOG(2) << msg.DebugString();
}
return send_bytes;
}

下面函式就是加入到傳送快取。

/**
* \brief add an outgoining message
*
*/
void AddOutgoing(const Message& msg) {
if (msg.meta.control.cmd == Control::ACK) return;
CHECK_NE(msg.meta.timestamp, Meta::kEmpty) << msg.DebugString();
auto key = GetKey(msg);
std::lock_guard<std::mutex> lk(mu_);
// already buffered, which often due to call Send by the monitor thread
if (send_buff_.find(key) != send_buff_.end()) return; auto& ent = send_buff_[key];
ent.msg = msg;
ent.send = Now();
ent.num_retry = 0;
}

4.6.5 清除快取

下面函式有兩個作用:

  • 檢查是否是重複訊息,則已經收到的確認訊息;
  • 如果是確認訊息,則從傳送快取中清除。
/**
* \brief add an incomming message
* \brief return true if msg has been added before or a ACK message
*/
bool AddIncomming(const Message& msg) {
// a message can be received by multiple times
if (msg.meta.control.cmd == Control::TERMINATE) {
return false;
} else if (msg.meta.control.cmd == Control::ACK) {
mu_.lock();
auto key = msg.meta.control.msg_sig;
auto it = send_buff_.find(key);
if (it != send_buff_.end()) send_buff_.erase(it);
mu_.unlock();
return true;
} else {
mu_.lock();
auto key = GetKey(msg);
auto it = acked_.find(key);
bool duplicated = it != acked_.end();
if (!duplicated) acked_.insert(key);
mu_.unlock();
// send back ack message (even if it is duplicated)
Message ack;
ack.meta.recver = msg.meta.sender;
ack.meta.sender = msg.meta.recver;
ack.meta.control.cmd = Control::ACK;
ack.meta.control.msg_sig = key;
van_->Send(ack);
// warning
if (duplicated) LOG(WARNING) << "Duplicated message: " << msg.DebugString();
return duplicated;
}
}

4.7 處理資料訊息

ProcessDataMsg 用來處理 worker 發過來的資料訊息(就是worker向server更新梯度),具體是取得對應的Customer後,呼叫 Customer 的方法進行處理,直接將msg放入處理佇列中。

我們會放在 Customer 之中進行介紹。

void Van::ProcessDataMsg(Message* msg) {
// data msg
int app_id = msg->meta.app_id;
int customer_id =
Postoffice::Get()->is_worker() ? msg->meta.customer_id : app_id;
auto* obj = Postoffice::Get()->GetCustomer(app_id, customer_id, 5);
obj->Accept(*msg); // 這裡給 Customer 新增訊息
}

0x05 ZMQVan

ZMQVan是基於zeromq的Van的實現,即為用zmq庫實現了連線的底層細節(zmq庫是一個開源庫,對socket進行了優良的封裝,他使得Socket程式設計更加簡單、簡潔和效能更高)。

5.1 定義

ZMQVan定義如下:

ZMQVan 繼承於Van ,在這個類的基礎上加了兩個成員變數,分別是:

  • unordered_map<int, void*> senders_ :senders_是一個集合,就是本節點發送 socket 的集合,即node id 與 socket 的對映。比如 8號節點要給9號節點發訊息,那麼只要找到(9,socket_9)這個組合就行了,然後呼叫 socket_9.send(message),
  • void *receiver_ = nullptr :是 Bind 函式得到的 socket 連線,因為是接受端,所以只有一個 socket 就行。

具體如下:

class ZMQVan : public Van {
void *context_ = nullptr;
/**
* \brief node_id to the socket for sending data to this node
*/
std::unordered_map<int, void*> senders_;
std::mutex mu_;
void *receiver_ = nullptr;
};

5.2 Van 函式

Van類 有如下函式會呼叫到 ZMQVan 或者被 ZMQVan 呼叫。

5.2.1 傳送訊息

Send 函式就是呼叫 ZMQVan 的 SendMsg 函式進行傳送訊息,傳送之後如果設定了ACK機制,則會呼叫 resender_->AddOutgoing。

int Van::Send(const Message& msg) {
int send_bytes = SendMsg(msg);
CHECK_NE(send_bytes, -1);
send_bytes_ += send_bytes;
if (resender_) resender_->AddOutgoing(msg);
if (Postoffice::Get()->verbose() >= 2) {
PS_VLOG(2) << msg.DebugString();
}
return send_bytes;
}

5.2.2 Meta 類

Meta封裝了元資料,傳送者,接受者,時間戳,請求還是響應等。

/**
* \brief meta info of a message
*/
struct Meta {
/** \brief the empty value */
static const int kEmpty;
/** \brief an int head */
int head;
/** \brief the unique id of the application of messsage is for*/
int app_id;
/** \brief customer id*/
int customer_id;
/** \brief the timestamp of this message */
int timestamp;
/** \brief the node id of the sender of this message */
int sender;
/** \brief the node id of the receiver of this message */
int recver;
/** \brief whether or not this is a request message*/
bool request;
/** \brief whether or not a push message */
bool push;
/** \brief whether or not a pull message */
bool pull;
/** \brief whether or not it's for SimpleApp */
bool simple_app;
/** \brief an string body */
std::string body;
/** \brief data type of message.data[i] */
std::vector<DataType> data_type;
/** \brief system control message */
Control control;
/** \brief the byte size */
int data_size = 0;
/** \brief message priority */
int priority = 0;
};

為了緩解通訊壓力,ps-lite 使用了Protobuf對 Meta 進行資料壓縮。

5.2.3 壓縮 Meta

就是按照 protobuf 來進行資料壓縮。

void Van::PackMeta(const Meta& meta, char** meta_buf, int* buf_size) {
// convert into protobuf
PBMeta pb;
pb.set_head(meta.head);
if (meta.app_id != Meta::kEmpty) pb.set_app_id(meta.app_id);
if (meta.timestamp != Meta::kEmpty) pb.set_timestamp(meta.timestamp);
if (meta.body.size()) pb.set_body(meta.body);
pb.set_push(meta.push);
pb.set_pull(meta.pull);
pb.set_request(meta.request);
pb.set_simple_app(meta.simple_app);
pb.set_priority(meta.priority);
pb.set_customer_id(meta.customer_id);
for (auto d : meta.data_type) pb.add_data_type(d);
if (!meta.control.empty()) {
auto ctrl = pb.mutable_control();
ctrl->set_cmd(meta.control.cmd);
if (meta.control.cmd == Control::BARRIER) {
ctrl->set_barrier_group(meta.control.barrier_group);
} else if (meta.control.cmd == Control::ACK) {
ctrl->set_msg_sig(meta.control.msg_sig);
}
for (const auto& n : meta.control.node) {
auto p = ctrl->add_node();
p->set_id(n.id);
p->set_role(n.role);
p->set_port(n.port);
p->set_hostname(n.hostname);
p->set_is_recovery(n.is_recovery);
p->set_customer_id(n.customer_id);
}
} // to string
*buf_size = pb.ByteSize();
*meta_buf = new char[*buf_size + 1];
CHECK(pb.SerializeToArray(*meta_buf, *buf_size))
<< "failed to serialize protobuf";
}

5.2.3 解壓 UnpackMeta

按照protobuf 預先生成的 PBMeta 格式進行解壓。

void Van::UnpackMeta(const char* meta_buf, int buf_size, Meta* meta) {
// to protobuf
PBMeta pb;
CHECK(pb.ParseFromArray(meta_buf, buf_size))
<< "failed to parse string into protobuf"; // to meta
meta->head = pb.head();
meta->app_id = pb.has_app_id() ? pb.app_id() : Meta::kEmpty;
meta->timestamp = pb.has_timestamp() ? pb.timestamp() : Meta::kEmpty;
meta->request = pb.request();
meta->push = pb.push();
meta->pull = pb.pull();
meta->simple_app = pb.simple_app();
meta->priority = pb.priority();
meta->body = pb.body();
meta->customer_id = pb.customer_id();
meta->data_type.resize(pb.data_type_size());
for (int i = 0; i < pb.data_type_size(); ++i) {
meta->data_type[i] = static_cast<DataType>(pb.data_type(i));
}
if (pb.has_control()) {
const auto& ctrl = pb.control();
meta->control.cmd = static_cast<Control::Command>(ctrl.cmd());
meta->control.barrier_group = ctrl.barrier_group();
meta->control.msg_sig = ctrl.msg_sig();
for (int i = 0; i < ctrl.node_size(); ++i) {
const auto& p = ctrl.node(i);
Node n;
n.role = static_cast<Node::Role>(p.role());
n.port = p.port();
n.hostname = p.hostname();
n.id = p.has_id() ? p.id() : Node::kEmpty;
n.is_recovery = p.is_recovery();
n.customer_id = p.customer_id();
meta->control.node.push_back(n);
}
} else {
meta->control.cmd = Control::EMPTY;
}
}

5.2.4 PackMetaPB

PackMetaPB 從註釋看,是位元組跳動提交的,主要用於 ibverbs_van.h,所以我們不做深入研究。

void Van::PackMetaPB(const Meta& meta, PBMeta* pb) {
pb->set_head(meta.head);
if (meta.app_id != Meta::kEmpty) pb->set_app_id(meta.app_id);
if (meta.timestamp != Meta::kEmpty) pb->set_timestamp(meta.timestamp);
if (meta.body.size()) pb->set_body(meta.body);
pb->set_push(meta.push);
pb->set_request(meta.request);
pb->set_simple_app(meta.simple_app);
pb->set_priority(meta.priority);
pb->set_customer_id(meta.customer_id);
for (auto d : meta.data_type) pb->add_data_type(d);
if (!meta.control.empty()) {
auto ctrl = pb->mutable_control();
ctrl->set_cmd(meta.control.cmd);
if (meta.control.cmd == Control::BARRIER) {
ctrl->set_barrier_group(meta.control.barrier_group);
} else if (meta.control.cmd == Control::ACK) {
ctrl->set_msg_sig(meta.control.msg_sig);
}
for (const auto& n : meta.control.node) {
auto p = ctrl->add_node();
p->set_id(n.id);
p->set_role(n.role);
p->set_port(n.port);
p->set_hostname(n.hostname);
p->set_is_recovery(n.is_recovery);
p->set_customer_id(n.customer_id);
}
}
pb->set_data_size(meta.data_size);
}

5.3 ZMQVan 派生函式

ZMQVan 有如下重要的派生函式。

5.3.1 Bind

Bind 邏輯如下:

  • 使用 zmq_bind() 來把一個socket繫結在一個本地的網路節點(endpoint)上,然後開始接收發送到本節點上的訊息。
  • 節點地址資訊是一個字串,它包括一個協議 / 然後跟著一個address。
  • Bind 函式會依據配置的變數 "DMLC_LOCAL" 來決定是啟用 ipc 方式還是 tcp 方式,從而配置節點地址資訊。
  • 如果是 schedule節點呼叫,則不需要指定port,但是對於work和server需要自己查詢一個本地可用埠。
  • 在查詢埠時候,會設定最大重試次數。
  int Bind(const Node& node, int max_retry) override {
receiver_ = zmq_socket(context_, ZMQ_ROUTER);
int local = GetEnv("DMLC_LOCAL", 0);
std::string hostname = node.hostname.empty() ? "*" : node.hostname;
int use_kubernetes = GetEnv("DMLC_USE_KUBERNETES", 0);
if (use_kubernetes > 0 && node.role == Node::SCHEDULER) {
hostname = "0.0.0.0";
}
std::string addr = local ? "ipc:///tmp/" : "tcp://" + hostname + ":";
int port = node.port;
unsigned seed = static_cast<unsigned>(time(NULL) + port);
for (int i = 0; i < max_retry + 1; ++i) {
auto address = addr + std::to_string(port);
if (zmq_bind(receiver_, address.c_str()) == 0) break;
if (i == max_retry) {
port = -1;
} else {
port = 10000 + rand_r(&seed) % 40000;
}
}
return port;
}

5.3.2 Connect

主要就是初始化 Sender_,邏輯如下:

  • 如果找到了對應socket就關閉socket。
  • 如果發現是 worker 發給同類,或者 server 發給同類,並且不是自己發給自己(Scheduler 可以自己發給自己),則返回。
  • 建立一個ZMQ套接字(socket),並且以一個不透明指標的形式把這新建立的socket賦值給 sender。
  • 如果本身是scheduler,則配置socket,把自己的 id 繫結到 socket上。
  • 將sender這個socket和目標地址連線。
  • 將目標id的socket存放起來,即把 socket 加入到Sender_。

具體如下:

void Connect(const Node& node) override {
int id = node.id;
auto it = senders_.find(id);
if (it != senders_.end()) {
zmq_close(it->second); // 如果找到了對應socket就關閉socket
}
// worker doesn't need to connect to the other workers. same for server
if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
return;
}
void *sender = zmq_socket(context_, ZMQ_DEALER); //建立一個socket //如果本身是scheduler,則一開始就是知道自己的id = 1,所以這個if條件就是說把自己的id繫結到socket上
if (my_node_.id != Node::kEmpty) {
std::string my_id = "ps" + std::to_string(my_node_.id);
zmq_setsockopt(sender, ZMQ_IDENTITY, my_id.data(), my_id.size());
const char* watermark = Environment::Get()->find("DMLC_PS_WATER_MARK");
if (watermark) {
const int hwm = atoi(watermark);
zmq_setsockopt(sender, ZMQ_SNDHWM, &hwm, sizeof(hwm));
}
}
// connect
std::string addr = "tcp://" + node.hostname + ":" + std::to_string(node.port);
if (GetEnv("DMLC_LOCAL", 0)) {
addr = "ipc:///tmp/" + std::to_string(node.port);
}
if (zmq_connect(sender, addr.c_str()) != 0) { //將sender這個socket和目標地址連線
LOG(FATAL) << "connect to " + addr + " failed: " + zmq_strerror(errno);
}
senders_[id] = sender; //將目標id的socket存放起來後面使用
}

5.3.3 SendMsg

邏輯如下:

  • 從儲存的 sender_ 之中找到之前保留的socket;
  • 壓縮 meta;
  • 傳送 meta;
  • 迴圈分段傳送data;
  int SendMsg(const Message& msg) override {
std::lock_guard<std::mutex> lk(mu_);
// find the socket
int id = msg.meta.recver;
CHECK_NE(id, Meta::kEmpty);
auto it = senders_.find(id);
if (it == senders_.end()) {
LOG(WARNING) << "there is no socket to node " << id;
return -1;
}
void *socket = it->second; // send meta
int meta_size; char* meta_buf;
PackMeta(msg.meta, &meta_buf, &meta_size);
int tag = ZMQ_SNDMORE;
int n = msg.data.size();
if (n == 0) tag = 0;
zmq_msg_t meta_msg;
zmq_msg_init_data(&meta_msg, meta_buf, meta_size, FreeData, NULL);
while (true) {
if (zmq_msg_send(&meta_msg, socket, tag) == meta_size) break;
if (errno == EINTR) continue;
return -1;
}
// zmq_msg_close(&meta_msg);
int send_bytes = meta_size;
// send data
for (int i = 0; i < n; ++i) {
zmq_msg_t data_msg;
SArray<char>* data = new SArray<char>(msg.data[i]);
int data_size = data->size();
zmq_msg_init_data(&data_msg, data->data(), data->size(), FreeData, data);
if (i == n - 1) tag = 0;
while (true) {
if (zmq_msg_send(&data_msg, socket, tag) == data_size) break;
if (errno == EINTR) continue;
return -1;
}
// zmq_msg_close(&data_msg);
send_bytes += data_size;
}
return send_bytes;
}

5.3.4 RecvMsg

RecvMsg 就是在繫結的埠上接受訊息。

接受訊息時候,會判斷是第幾個訊息,然後做不同的處理。

  int RecvMsg(Message* msg) override {
msg->data.clear();
size_t recv_bytes = 0;
for (int i = 0; ; ++i) {
zmq_msg_t* zmsg = new zmq_msg_t;
CHECK(zmq_msg_init(zmsg) == 0) << zmq_strerror(errno);
while (true) {
if (zmq_msg_recv(zmsg, receiver_, 0) != -1) break;
if (errno == EINTR) {
std::cout << "interrupted";
continue;
}
return -1;
}
char* buf = CHECK_NOTNULL((char *)zmq_msg_data(zmsg));
size_t size = zmq_msg_size(zmsg);
recv_bytes += size; if (i == 0) {
// identify
msg->meta.sender = GetNodeID(buf, size);
msg->meta.recver = my_node_.id;
CHECK(zmq_msg_more(zmsg));
zmq_msg_close(zmsg);
delete zmsg;
} else if (i == 1) {
// task
UnpackMeta(buf, size, &(msg->meta));
zmq_msg_close(zmsg);
bool more = zmq_msg_more(zmsg);
delete zmsg;
if (!more) break;
} else {
// zero-copy
SArray<char> data;
data.reset(buf, size, [zmsg, size](char* buf) {
zmq_msg_close(zmsg);
delete zmsg;
});
msg->data.push_back(data);
if (!zmq_msg_more(zmsg)) {
break;
}
}
}
return recv_bytes;
}

GetNodeID 函式是

  /**
* return the node id given the received identity
* \return -1 if not find
*/
int GetNodeID(const char* buf, size_t size) {
if (size > 2 && buf[0] == 'p' && buf[1] == 's') {
int id = 0;
size_t i = 2;
for (; i < size; ++i) {
if (buf[i] >= '0' && buf[i] <= '9') {
id = id * 10 + buf[i] - '0';
} else {
break;
}
}
if (i == size) return id;
}
return Meta::kEmpty;
}

0x06 總結

我們最後進行一下總結:

郵局裡有了地址簿,就需要有貨車來負責拉送物件,Van 就是整個Parameter Server的通訊模組,其特點如下。

  • PostOffice 類在例項化的時候,會建立一個 Van 類的例項 作為成員變數。該 Van 例項與所屬 PostOffice 例項生命週期相同(每個節點只有一個該物件);
  • Van 負責具體的節點間通訊。具體來說就是負責建立起節點之間的互相連線(例如Worker與Scheduler之間的連線),並且開啟本地的receiving thread用來監聽收到的message。
  • Van物件的初始化函式作用就是依據本地節點型別的不同,做不同設定,從而啟動埠,建立本地節點到scheduler的連結,啟動接收訊息執行緒,心跳執行緒等,這樣就可以進行通訊了。
  • Parameter Server在後臺執行緒 receiver_thread_ 進行接受/處理訊息。除了傳遞引數的資料訊息外,各個節點之間控制資訊有:
    • ADD_NODE:worker和server向shceduler進行節點註冊;
    • BARRIER:節點間的同步阻塞訊息;
    • HEARTBEAT:節點間的心跳訊號;
    • TERMINATE:節點退出訊號;
    • ACK:確認訊息,ACK 型別只有啟用了 Resender 類才會出現
    • EMPTY:push or pull;

0xEE 個人資訊

★★★★★★關於生活和技術的思考★★★★★★

微信公眾賬號:羅西的思考

如果您想及時得到個人撰寫文章的訊息推送,或者想看看個人推薦的技術資料,敬請關注。

0xFF 參考

入門分散式機器學習---基於引數伺服器的邏輯迴歸實現原理

【分散式】基於ps-lite的分散式計算例項解析

ps-lite 原始碼分析

官方簡要使用說明

PS-Lite原始碼分析-KangRoger

ps-lite原始碼剖析-zybuluo

ps-lite程式碼筆記-willzhang