1. 程式人生 > >Ceph網路模組使用案例:OSD心跳檢測機制

Ceph網路模組使用案例:OSD心跳檢測機制

ceph是一個分散式的檔案系統。對於一個分散式系統,需要一個穩定的底層網路通訊模組(訊息模組),用於元件之間的互聯互通。

ceph的元件主要包括,OSD,monitor,mgr,osdclient,client等,這些模組內部的通訊,以及模組間的通訊都使用了,後面我們直接把這些元件稱為網路模組的使用者。使用者在使用網路模組的時候,主要涉及到2個角色,一個是messenger,一個是dispatcher。messenger相當於一個訊息管理器(網路模組的核心,訊息的傳送和接收都是messenger通過底層的類實現),dispatcher相當於一個訊息的處理器。

-messenger

  • ① 將dispatcher移交給它的資訊傳送給其它節點(節點,這裡節點是一個邏輯單元,比如osd.0就算是一個節點,osd.1算一個新的節點)處理
  • ② 從其它節點的messenger獲取訊息移交給本節點的dispatcher。

-dispatcher

  • ① 對接收到的訊息(有messenger移交給它)進行處理
  • ② 把需要傳送的訊息移交給本節點的messenger

每個ceph元件都會註冊多個messenger和多個dispatcher用於處理不同型別的訊息。以OSD元件為例,OSD的守護程序啟動的時候會註冊7個messenger來管理訊息,每個messenger的用途不一樣。這部分程式碼在ceph osd守護程序啟動中 /src/ceph_osd.cc。 每一個OSD都有一個守護程序(OSD deamon)。這個deamon負責完成OSD的所有邏輯功能,包括與monitor和其他OSD(事實上是其他OSD的deamon)通訊以維護更新系 統狀態,與其他OSD共同完成資料的儲存和維護,與client通訊完成各種資料物件操作等等。

例子:一個OSD模組會註冊7個messenger和2個dispatcher。

編號

Messenger例項名稱

作用

1

*ms_public

用來處理OSD和Client之間的訊息

2

*ms_cluster

用來處理OSD和叢集之間的訊息

3

*ms_hb_front_client

用來向其它OSD傳送心跳的訊息

4

*ms_hb_back_client

用來向其它OSD傳送心跳的訊息

5

*ms_hb_back_server

用來接收其他OSD的心跳訊息

6

*ms_hb_front_server

用來接收其他OSD的心跳訊息

7

*ms_objecter

用來處理OSD和Objecter之間的訊息

編號

Dispatcher例項名稱

作用

1

*OSD

可以處理部分osd節點的訊息

2

*heartbeat_dispatcher

處理心跳連線

1 訊息模組的使用框架

1.1 傳送訊息

本節描述ceph中的應用(osd、mgr、monitor等)是如何使用網路模組傳送訊息的。在實際的應用中,有兩種常見的傳送訊息的方式,當然這兩種方式只是看起來有些不同,的底層實現都是相同,都是呼叫AsyncConnection::send_message(Message *m)把訊息傳送出去。

1)方式一:使用AsyncMessenger::send_message(Message *m, const entity_inst_t& dest))

其中Message *m是要傳送的訊息,dest是目的地址。

send_message會首先去判斷自己和目標地址之前是不是已經存在連結Connection,如果沒有就建立一個,conn->send_message(m)傳送訊息

-----------------------------------------------------------

  conn->send_message(m)

  -----------------------------------------------------------

2) 方式二:

① 首先要通過Messenger類,獲取對應的Connection:

  ------------------------------------------------------------

  conn = messenger->get_connection(dest_server);

  ------------------------------------------------------------

  get_connection過程是這樣的,如果dest.addr是my_inst.addr,就直接返回local_connection。

  如果連結不存在就新建一個。

② 當獲得一個Connection之後,就可以呼叫Connection的傳送函式來發送訊息。

  -----------------------------------------------------------

  conn->send_message(m)

  -----------------------------------------------------------

具體傳送的實現過程依賴於選擇的訊息模式,simple、async等實現方式都不同。在另一個文章裡我會講到具體實現過程,這裡不多做解釋。

1.2 訊息的接收

訊息的接收過程簡言之就是通過監聽socket判斷是否有訊息到來,如果有就接收。這個過程是個很複雜的過程,涉及到了連線建立、錯誤處理等等。具體的實現依賴於選擇的訊息模式,比如,SimpleMessenger是使用一個read執行緒來實現;AsyncMessenger是使用基於事件的機制實現。接收的過程對應用層都是透明的,本章不做解釋。

1.3 訊息的處理

訊息接收完成後,就進入訊息的處理。首先判斷訊息m是否可以fast_dispatch,如果可以,呼叫註冊fast_dispatcher函式處理訊息。如果不能fast_dispatch,呼叫函式in_q->enqueue,將接收到的訊息加入到DispatchQueue的mqueue佇列中,排隊等待處理。

2 ceph OSD心跳檢測與網路模組 

下面我們具體舉一個OSD心跳檢測的例子來講解,通過心跳檢測機制來了解網路模組的使用。在ceph中需要通過心跳檢測來判斷OSD是不是線上,因為這部分的功能比較簡單獨立。

2.1 Messenger & Dispatcher的註冊

在OSD模組註冊的7個Messenger和2個Dispatcher中,4個Messenger都和心跳檢測相關,一個heartbeat_dispatcher用來處理心跳連線。

編號

Messenger例項名稱

作用

1

*ms_public

用來處理OSD和Client之間的訊息

2

*ms_cluster

用來處理OSD和叢集之間的訊息

3

*ms_hb_front_client

用來向其它OSD傳送心跳的訊息

4

*ms_hb_back_client

用來向其它OSD傳送心跳的訊息

5

*ms_hb_back_server

用來接收其他OSD的心跳訊息

6

*ms_hb_front_server

用來接收其他OSD的心跳訊息

7

*ms_objecter

用來處理OSD和Objecter之間的訊息

對應程式碼在ceph_osd.cc中

-------------------------------------------------------------------------------------------------------

-HeartBeatMessenger

在ceph守護程序啟動過程中(ceph-osd.cc),建立了4個messenger用於心跳檢測。

在ceph守護程序建立osd的時候將這些messenger傳給了osd, 注意這些messenger在osd中被重新命名了。

重新命名為了:

hb_front_client_messenger

hb_back_client_messenger

hb_front_server_messenger

hb_back_server_messenger

---------------------------------------------------------------------------------------------------

編號

Dispatcher例項名稱

作用

1

*OSD

可以處理部分osd節點的訊息

2

*heartbeat_dispatcher

處理心跳連線

對應程式碼在osd.cc中

-------------------------------------------------------------------------------------------------------

  hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);

  hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);

  hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);

  hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);

-------------------------------------------------------------------------------------------------------

2.2 心跳機制介紹

OSD 中有一個heartbeat_thread,這個heartbeat_thread的作用就是不斷的傳送ping請求給其他節點。在Ceph中,OSD的地位都是對等的,每一個OSD在向其他OSD傳送ping訊息的同時,也會收到其他OSD發來ping訊息。OSD收到ping訊息後會傳送一個回覆訊息reply message。在部署ceph的時候,通常會使用兩張網絡卡front和back,將流量分開。所以OSD使用兩對messenger和來分別傳送和監聽front額back的ping心跳。圖中展示了一個3個OSD的ceph叢集的心跳檢測過程,只畫出了front網絡卡的心跳檢測,back雷同。

2.3 心跳檢測中網路模組的使用

我們著重看兩個osd節點間如何通訊。上圖展示了OSD A向OSD B傳送Ping訊息(hb_front_client_messenger(A) ->ms_hb_front_server(B)),然後OSD B收到訊息後交給dispatcher進行處理,然後傳送回覆訊息給OSD A (hb_front_client_messenger(A) <-ms_hb_front_server(B))的過程。具體的可以拆分成5個步驟:

  • ① 連線(connection)建立
  • ② Ping訊息傳送
  • ③ Ping訊息接受及處理
  • ④ 回覆訊息
  • ⑤ 處理PING_REPLY

 

下面對這五個步驟進行一一介紹。

① 連線(connection)建立

獲取目標 messenger B的連結connection

conn =hb_front_client_messenger(A)->get_connection(dest_server_B),如果沒有連結就建立一個。

monitor中的osdmap記錄了每一個osd的front地址和back地址,這個是在osd啟動的時候就告訴monitor的。

OSDService::get_con_osd_hb(),首先先獲取osdmap,獲取目標osdB的front地址,然後在A的messenger hb_front_client_messenger中建立一個connection。

pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
{
  OSDMapRef next_map = get_nextmap_reserved();
  // service map is always newer/newest
  assert(from_epoch <= next_map->get_epoch());

  pair<ConnectionRef,ConnectionRef> ret;
  if (next_map->is_down(peer) ||
      next_map->get_info(peer).up_from > from_epoch) {
    release_map(next_map);
    return ret;
  }
  ret.first = osd->hb_back_client_messenger->get_connection(next_map->get_hb_back_inst(peer));
  if (next_map->get_hb_front_addr(peer) != entity_addr_t())
    ret.second = osd->hb_front_client_messenger->get_connection(next_map->get_hb_front_inst(peer));
  release_map(next_map);
  return ret;
}osd->hb_back_client_messenger->get_connection(next_map->get_hb_back_inst(peer));
  if (next_map->get_hb_front_addr(peer) != entity_addr_t())
    ret.second = osd->hb_front_client_messenger->get_connection(next_map->get_hb_front_inst(peer));
  release_map(next_map);
  return ret;
}

② Ping訊息傳送

在OSD::heartbeat()中,對記錄了osd連線資訊map進行遍歷,每一個heartbeatinfo中記錄了一個目標osd的連結資訊(connection),通過這些conns把訊息傳送出去。重點的就是紅色部分。

void OSD::heartbeat()
{
  ......
  // send heartbeats
  for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
       i != heartbeat_peers.end();
       ++i) {
    int peer = i->first;
    i->second.last_tx = now;
    if (i->second.first_tx == utime_t())
      i->second.first_tx = now;
    dout(30) << "heartbeat sending ping to osd." << peer << dendl;
    i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
                      service.get_osdmap()->get_epoch(),
                      MOSDPing::PING, now,
                      cct->_conf->osd_heartbeat_min_size));

    if (i->second.con_front)
      i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
                         service.get_osdmap()->get_epoch(),
                         MOSDPing::PING, now,
                      cct->_conf->osd_heartbeat_min_size));
  ......
  }  
}i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
                      service.get_osdmap()->get_epoch(),
                      MOSDPing::PING, now,
                      cct->_conf->osd_heartbeat_min_size));

    if (i->second.con_front)
      i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
                         service.get_osdmap()->get_epoch(),
                         MOSDPing::PING, now,
                      cct->_conf->osd_heartbeat_min_size));
  ......
  }  
}

③ Ping訊息接受及處理

OSD B收到訊息,Messenger B內部的dispatch執行緒會呼叫事先鍵入的dispatcher對訊息進行處理。HeartbeatDispatcher會將message交給osd->heartbeat_dispatch()處理。

struct HeartbeatDispatcher : public Dispatcher {
    OSD *osd;
    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}

    bool ms_dispatch(Message *m) override {
      return osd->heartbeat_dispatch(m);
    }
    ......
}heartbeat_dispatcher
bool OSD::heartbeat_dispatch(Message *m)
{
  dout(30) << "heartbeat_dispatch " << m << dendl;
  switch (m->get_type()) {

  case CEPH_MSG_PING:
    dout(10) << "ping from " << m->get_source_inst() << dendl;
    m->put();
    break;

  case MSG_OSD_PING:
    handle_osd_ping(static_cast<MOSDPing*>(m));
    break;

  default:
    dout(0) << "dropping unexpected message " << *m << " from " << m->get_source_inst() << dendl;
    m->put();
  }
  return true;
}osd->heartbeat_dispatch(m);
    }
    ......
}heartbeat_dispatcher
bool OSD::heartbeat_dispatch(Message *m)
{
  dout(30) << "heartbeat_dispatch " << m << dendl;
  switch (m->get_type()) {

  case CEPH_MSG_PING:
    dout(10) << "ping from " << m->get_source_inst() << dendl;
    m->put();
    break;

  case MSG_OSD_PING:
    handle_osd_ping(static_cast<MOSDPing*>(m));
    break;

  default:
    dout(0) << "dropping unexpected message " << *m << " from " << m->get_source_inst() << dendl;
    m->put();
  }
  return true;
}

heartbeat_dispatch()根據訊息的type進行處理,因為訊息的type是MSG_OSD_PING,調到OSD::handle_osd_ping(MOSDPing *m)進行處理, 進入case MOSDPing::PING:

void OSD::handle_osd_ping(MOSDPing *m)
{
  switch (m->op) {
  case MOSDPing::PING:
    {
      //做了一系列處理

      ...
      //傳送回覆包
      Message *r = new MOSDPing(monc->get_fsid(),
                curmap->get_epoch(),
                MOSDPing::PING_REPLY, m->stamp,
                cct->_conf->osd_heartbeat_min_size);
      m->get_connection()->send_message(r);
      ...
     }
  case MOSDPing::PING_REPLY:
    {
     // 更新時間戳,避免心跳超時
// osd有專門的tick執行緒進行週期性的檢查,如果發現有心跳超時的,就會上報monitor

      }
   }case MOSDPing::PING:
    {
      //做了一系列處理

      ...
      //傳送回覆包
      Message *r = new MOSDPing(monc->get_fsid(),
                curmap->get_epoch(),
                MOSDPing::PING_REPLY, m->stamp,
                cct->_conf->osd_heartbeat_min_size);
      m->get_connection()->send_message(r);
      ...
     }
  case MOSDPing::PING_REPLY:
    {
     // 更新時間戳,避免心跳超時
// osd有專門的tick執行緒進行週期性的檢查,如果發現有心跳超時的,就會上報monitor

      }
   }

④ 回覆訊息

第③步中,在OSD B的dispatcher中對訊息做一系列處理後,會封裝一個回覆訊息PING_REPLY,然後傳送OSD A。

------------------------------------------------------------------------

      Message *r = new MOSDPing(monc->get_fsid(),

                curmap->get_epoch(),

                MOSDPing::PING_REPLY, m->stamp,

                cct->_conf->osd_heartbeat_min_size);

      m->get_connection()->send_message(r);

------------------------------------------------------------------------

這個過程和前面類似,不同的是這次是OSD B的Messenger去獲取connection連結,(這個連結就是之前OSD A建立的連結,通過看Asyncmessenger.cc 可以看到在執行第②步中AsyncConnection::send_message(Message *m)時,通過 m->set_connection(this);將connection賦給了message,所以說A向B傳送訊息和B向A傳送回覆訊息都是用的同一條連結connection),傳送回覆訊息給OSD A。

⑤ 處理PING_REPLY

OSD A 的Messenger(hb_front_client_messenger)監聽到了回覆訊息,交給自己的Dispatcher處理。

還是先進入osd->heartbeat_dispatch(m)的MSG_OSD_PING,然後使用OSD::handle_osd_ping(MOSDPing *m)根據 訊息的type做相應處理,這次是進入ping reply

void OSD::handle_osd_ping(MOSDPing *m)
{
  switch (m->op) {
  case MOSDPing::PING:
    {
      ...
      //傳送回覆包
      Message *r = new MOSDPing(monc->get_fsid(),
                curmap->get_epoch(),
                MOSDPing::PING_REPLY, m->stamp,
                cct->_conf->osd_heartbeat_min_size);
      m->get_connection()->send_message(r);
      ...
     }
  case MOSDPing::PING_REPLY:
    {
     // 更新時間戳,避免心跳超時
// osd有專門的tick執行緒進行週期性的檢查,如果發現有心跳超時的,就會上報monitor

      }
}case MOSDPing::PING_REPLY:
    {
     // 更新時間戳,避免心跳超時
// osd有專門的tick執行緒進行週期性的檢查,如果發現有心跳超時的,就會上報monitor

      }
}

可以看出,心跳的傳送流程是很簡單的,也是很獨立的。在設計分散式系統的時候,為了保證叢集的內部狀態正確,應儘量不要引入過多複雜的因素影響心跳的流程。 畢竟心跳快速正確的處理是確保叢集運轉正常的最基本條件。

 參考文章【reference】:

整理中。。。