1. 程式人生 > >Muduo 網路程式設計示例之八:用 Timing wheel 踢掉空閒連線

Muduo 網路程式設計示例之八:用 Timing wheel 踢掉空閒連線

Muduo 網路程式設計示例之八:Timing wheel 踢掉空閒連線

陳碩 (giantchen_AT_gmail)

這是《Muduo 網路程式設計示例》系列的第八篇文章,原計劃講檔案傳輸,這裡插入一點計劃之外的內容。

本文介紹如何使用 timing wheel 來踢掉空閒的連線,一個連線如果若干秒沒有收到資料,就認為是空閒連線。

在嚴肅的網路程式中,應用層的心跳協議是必不可少的。應該用心跳訊息來判斷對方程序是否能正常工作,“踢掉空閒連線”只是一時權宜之計。我這裡想順便講講 shared_ptr 和 weak_ptr 的用法。

如果一個連線連續幾秒鐘(後文以 8s 為例)內沒有收到資料,就把它斷開,為此有兩種簡單粗暴的做法:

  • 每個連線儲存“最後收到資料的時間 lastReceiveTime”,然後用一個定時器,每秒鐘遍歷一遍所有連線,斷開那些 (now - connection.lastReceiveTime) > 8s 的 connection。這種做法全域性只有一個 repeated timer,不過每次 timeout 都要檢查全部連線,如果連線數目比較大(幾千上萬),這一步可能會比較費時。
  • 每個連線設定一個 one-shot timer,超時定為 8s,在超時的時候就斷開本連線。當然,每次收到資料要去更新 timer。這種做法需要很多個 one-shot timer,會頻繁地更新 timers。如果連線數目比較大,可能對 reactor 的 timer queue 造成壓力。

使用 timing wheel 能避免上述兩種做法的缺點。timing wheel 可以翻譯為“時間輪盤”或“刻度盤”,本文保留英文。

連線超時不需要精確定時,只要大致 8 秒鐘超時斷開就行,多一秒少一秒關係不大。處理連線超時可以用一個簡單的資料結構:8 個桶組成的迴圈佇列。第一個桶放下一秒將要超時的連線,第二個放下 2 秒將要超時的連線。每個連線一收到資料就把自己放到第 8 個桶,然後在每秒鐘的 callback 裡把第一個桶裡的連線斷開,把這個空桶挪到隊尾。這樣大致可以做到 8 秒鐘沒有資料就超時斷開連線。更重要的是,每次不用檢查全部的 connection,只要檢查第一個桶裡的 connections,相當於把任務分散了。

Timing wheel 原理

《Hashed and hierarchical timing wheels: efficient data structures for implementing a timer facility》這篇論文詳細比較了實現定時器的各種資料結構,並提出了層次化的 timing wheel 與 hash timing wheel 等新結構。針對本文要解決的問題的特點,我們不需要實現一個通用的定時器,只用實現 simple timing wheel 即可。

Simple timing wheel 的基本結構是一個迴圈佇列,還有一個指向隊尾的指標 (tail),這個指標每秒鐘移動一格,就像鐘錶上的時針,timing wheel 由此得名。

以下是某一時刻 timing wheel 的狀態,格子裡的數字是倒計時(與通常的 timing wheel 相反),表示這個格子(桶子)中的連線的剩餘壽命。

wheel1

一秒鐘以後,tail 指標移動一格,原來四點鐘方向的格子被清空,其中的連線已被斷開。

wheel2

連線超時被踢掉的過程

假設在某個時刻,conn 1 到達,把它放到當前格子中,它的剩餘壽命是 7 秒。此後 conn 1 上沒有收到資料。

wheel3

1 秒鐘之後,tail 指向下一個格子,conn 1 的剩餘壽命是 6 秒。

wheel4

又過了幾秒鐘,tail 指向 conn 1 之前的那個格子,conn 1 即將被斷開。

wheel5

下一秒,tail 重新指向 conn 1 原來所在的格子,清空其中的資料,斷開 conn 1 連線。

wheel6

連線重新整理

如果在斷開 conn 1 之前收到資料,就把它移到當前的格子裡。

wheel4

收到資料,conn 1 的壽命延長為 7 秒。

wheel7

時間繼續前進,conn 1 壽命遞減,不過它已經比第一種情況長壽了。

wheel8

多個連線

timing wheel 中的每個格子是個 hash set,可以容納不止一個連線。

比如一開始,conn 1 到達。

wheel3

隨後,conn 2 到達,這時候 tail 還沒有移動,兩個連線位於同一個格子中,具有相同的剩餘壽命。(下圖中畫成連結串列,程式碼中是雜湊表。)

wheel9

幾秒鐘之後,conn 1 收到資料,而 conn 2 一直沒有收到資料,那麼 conn 1 被移到當前的格子中。這時 conn 1 的壽命比 conn 2 長。

wheel10

程式碼實現與改進

在具體實現中,格子裡放的不是連線,而是一個特製的 Entry struct,每個 Entry 包含 TcpConnection 的 weak_ptr。Entry 的解構函式會判斷連線是否還存在(用 weak_ptr),如果還存在則斷開連線。

資料結構:

  typedef boost::weak_ptr WeakTcpConnectionPtr;

  struct Entry : public muduo::copyable
  {
    Entry(const WeakTcpConnectionPtr& weakConn)
      : weakConn_(weakConn)
    {
    }

    ~Entry()
    {
      muduo::net::TcpConnectionPtr conn = weakConn_.lock();
      if (conn)
      {
        conn->shutdown();
      }
    }

    WeakTcpConnectionPtr weakConn_;
  };
  typedef boost::shared_ptr EntryPtr;
  typedef boost::weak_ptr WeakEntryPtr;
  typedef boost::unordered_set Bucket;
  typedef boost::circular_buffer WeakConnectionList;

在實現中,為了簡單起見,我們不會真的把一個連線從一個格子移到另一個格子,而是採用引用計數的辦法,用 shared_ptr 來管理 Entry。如果從連線收到資料,就把對應的 EntryPtr 放到這個格子裡,這樣它的引用計數就遞增了。當 Entry 的引用計數遞減到零,說明它沒有在任何一個格子裡出現,那麼連線超時,Entry 的解構函式會斷開連線。

Timing wheel 用 boost::circular_buffer 實現,其中每個 Bucket 元素是個 hash set of EntryPtr。

在建構函式中,註冊每秒鐘的回撥(EventLoop::runEvery() 註冊 EchoServer::onTimer() ),然後把 timing wheel 設為適當的大小。

EchoServer::EchoServer(EventLoop* loop,
                       const InetAddress& listenAddr,
                       int idleSeconds)
  : loop_(loop),
    server_(loop, listenAddr, "EchoServer"),
    connectionBuckets_(idleSeconds)
{
  server_.setConnectionCallback(
      boost::bind(&EchoServer::onConnection, this, _1));
  server_.setMessageCallback(
      boost::bind(&EchoServer::onMessage, this, _1, _2, _3));
  loop->runEvery(1.0, boost::bind(&EchoServer::onTimer, this));
  connectionBuckets_.resize(idleSeconds);
}

其中 EchoServer::onTimer() 的實現只有一行:往隊尾新增一個空的 Bucket,這樣 circular_buffer 會自動彈出隊首的 Bucket,並析構之。在析構 Bucket 的時候,會依次析構其中的 EntryPtr 物件,這樣 Entry 的引用計數就不用我們去操心,C++ 的值語意會幫我們搞定一切。

void EchoServer::onTimer()
{
  connectionBuckets_.push_back(Bucket());
}

在連線建立時,建立一個 Entry 物件,把它放到 timing wheel 的隊尾。另外,我們還需要把 Entry 的弱引用儲存到 TcpConnection 的 context 裡,因為在收到資料的時候還要用到 Entry。(思考題:如果 TcpConnection::setContext 儲存的是強引用 EntryPtr,會出現什麼情況?)

void EchoServer::onConnection(const TcpConnectionPtr& conn)
{
  LOG_INFO << "EchoServer - " << conn->peerAddress().toHostPort() << " -> "
    << conn->localAddress().toHostPort() << " is "
    << (conn->connected() ? "UP" : "DOWN");

  if (conn->connected())
  {
    EntryPtr entry(new Entry(conn));
    connectionBuckets_.back().insert(entry);
    WeakEntryPtr weakEntry(entry);
    conn->setContext(weakEntry);
  }
  else
  {
    assert(!conn->getContext().empty());
    WeakEntryPtr weakEntry(boost::any_cast(conn->getContext()));
    LOG_DEBUG << "Entry use_count = " << weakEntry.use_count();
  }
}

在收到訊息時,從 TcpConnection 的 context 中取出 Entry 的弱引用,把它提升為強引用 EntryPtr,然後放到當前的 timing wheel 隊尾。(思考題,為什麼要把 Entry 作為 TcpConnection 的 context 儲存,如果這裡再建立一個新的 Entry 會有什麼後果?)

void EchoServer::onMessage(const TcpConnectionPtr& conn,
                           Buffer* buf,
                           Timestamp time)
{
  string msg(buf->retrieveAsString());
  LOG_INFO << conn->name() << " echo " << msg.size() << " bytes at " << time.toString();
  conn->send(msg);

  assert(!conn->getContext().empty());
  WeakEntryPtr weakEntry(boost::any_cast(conn->getContext()));
  EntryPtr entry(weakEntry.lock());
  if (entry)
  {
    connectionBuckets_.back().insert(entry);
  }
}

然後呢?沒有然後了,程式已經完成了我們想要的功能。(完整的程式碼會列印 circular_buffer 變化的情況,執行一下即可理解。)

希望本文有助於您理解 shared_ptr 和 weak_ptr。

改進

在現在的實現中,每次收到訊息都會往隊尾新增 EntryPtr (當然,hash set 會幫我們去重。)一個簡單的改進措施是,在 TcpConnection 裡儲存“最後一次往隊尾新增引用時的 tail 位置”,然後先檢查 tail 是否變化,若無變化則不重複新增 EntryPtr。這樣或許能提高效率。

以上改進留作練習。