1. 程式人生 > >muduo網路庫學習筆記(四) 通過eventfd實現的事件通知機制

muduo網路庫學習筆記(四) 通過eventfd實現的事件通知機制

目錄

muduo網路庫學習筆記(四) 通過eventfd實現的事件通知機制


上篇文章為EventLoop添加了一個定時器Fd,為EventLoop增加了3個介面:runAfter()、runAt()、runEvery()、這三個介面用於處理定時任務和週期任務. 底層通過封裝TimerFd實現。

    TimerId runAt(const TimeStamp& time, const NetCallBacks::TimerCallBack& cb);
    TimerId runAfter(double delay, const NetCallBacks::TimerCallBack& cb);
    TimerId runEvery(double interval, const NetCallBacks::TimerCallBack& cb);

今天為EventLoop新增另一個Fd:EventFd, 用於實現執行緒間的事件通知機制.本文會先介紹eventfd的使用,然後給出muduo中EventLoop對eventfd的封裝.

eventfd的使用

eventfd系統函式

eventfd  - 事件通知檔案描述符

#include <sys/eventfd.h>
int eventfd(unsigned int initval ,int flags );

建立一個能被使用者應用程式用於時間等待喚醒機制的eventfd物件.
initval :
eventfd()建立一個可用作事件的“eventfd物件”使用者空間應用程式和核心等待/通知機制通知使用者空間應用程式的事件。該物件包含一個由核心維護的無符號64位整型(uint64_t)計數器。此計數器的初始值通過initval指定。一般設0.

flags


以下標誌中按位OR運算以更改eventfd()的行為,(檔案中常用的這兩個flags肯定都懂意思吧,就不翻譯了,第三個訊號量的不管它.):

   EFD_CLOEXEC (since Linux 2.6.27)
          Set the close-on-exec (FD_CLOEXEC) flag on the new file
          descriptor.  See the description of the O_CLOEXEC flag in
          open(2) for reasons why this may be useful.

   EFD_NONBLOCK (since Linux 2.6.27)
          Set the O_NONBLOCK file status flag on the new open file
          description.  Using this flag saves extra calls to fcntl(2) to
          achieve the same result.

   EFD_SEMAPHORE (since Linux 2.6.30)
          Provide semaphore-like semantics for reads from the new file
          descriptor.  See below.

read(2)

成功讀取返回一個8byte的整數。read(2)如果提供的緩衝區的大小小於8個位元組返回錯誤EINVAL

write (2)

將緩衝區寫入的8位元組整形值加到核心計數器上。可以寫入的最大值
是計數器中是最大的無符號64位值減1(即0xfffffffffffffffe)。

返回值:

On success, eventfd() returns a new eventfd file descriptor. On error, -1 is returned and errno is set to indicate the error.

使用示例

#include <iostream>
#include <assert.h>
#include <poll.h>
#include <signal.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <string.h>
#include <thread>

static int s_efd = 0;

int createEventfd()
{
  int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

  std::cout << "createEventfd() fd : " << evtfd << std::endl;

  if (evtfd < 0)
  {
    std::cout << "Failed in eventfd\n";
    abort();
  }

  return evtfd;
}

void testThread()
{
  int timeout = 0;
  while(timeout < 3) {
    sleep(1);
    timeout++;
  }

  uint64_t one = 1;
  ssize_t n = write(s_efd, &one, sizeof one);
  if(n != sizeof one)
  {
    std::cout << " writes " << n << " bytes instead of 8\n";
  }
}

int main()
{
  s_efd = createEventfd();

  fd_set rdset;
  FD_ZERO(&rdset);
  FD_SET(s_efd, &rdset);

  struct timeval timeout;
  timeout.tv_sec = 1;
  timeout.tv_usec = 0;

  std::thread t(testThread);

  while(1)
  {
    if(select(s_efd + 1, &rdset, NULL, NULL, &timeout) == 0)
    {
      std::cout << "timeout\n";
      timeout.tv_sec = 1;
      timeout.tv_usec = 0;
      FD_SET(s_efd, &rdset);
        continue;
    }

    uint64_t one = 0;

    ssize_t n = read(s_efd, &one, sizeof one);
    if(n != sizeof one)
    {
      std::cout << " read " << n << " bytes instead of 8\n";
    }

    std::cout << " wakeup !\n";

    break;
  }

  t.join();
  close(s_efd);

  return 0;
}
./test.out
createEventfd() fd : 3
timeout
timeout
timeout
 wakeup !

eventfd 單純的使用檔案描述符實現的執行緒間的通知機制,可以很好的融入select、poll、epoll的I/O複用機制中.

EventLoop對eventfd的封裝

所增加的介面及成員:

    typedef std::function<void()> Functor;
    void runInLoop(const Functor& cb);
    void wakeup(); //是寫m_wakeupFd 通知poll 處理讀事件.
    void queueInLoop(const Functor& cb);
private:
    //used to waked up
    void handleRead();
    void doPendingFunctors();
    
    int m_wakeupFd;
    std::unique_ptr<Channel> p_wakeupChannel;
    mutable MutexLock m_mutex;
    bool m_callingPendingFunctors; /* atomic */
    std::vector<Functor> m_pendingFunctors; // @GuardedBy mutex_

工作時序

(runInLoop() -> quueInLoop())/queueInLoop() -> wakeup() -> poll() -> handleRead() -> doPendingFunctors()

runInLoop()

如果使用者在當前IO執行緒呼叫這個函式, 回撥會同步進行; 如果使用者在其他執行緒呼叫runInLoop(),cb會被加入佇列, IO執行緒會被喚醒來呼叫這個Functor.

void EventLoop::runInLoop(const Functor&  cb)
{
  if(isInloopThread())
    cb();
  else
    queueInLoop(cb);
}

queueInLoop()

會將回調新增到容器,同時通過wakeup()喚醒poll()呼叫容器內的回撥.

void EventLoop::queueInLoop(const Functor& cb)
{
  LOG_TRACE << "EventLoop::queueInLoop()";
  {
    MutexLockGuard lock(m_mutex);
    m_pendingFunctors.push_back(std::move(cb));
  }

  if(!isInloopThread())
  {
    wakeup();
  }
}

內部實現,

wakeup()

寫已註冊到poll的eventfd 通知poll 處理讀事件.

//  m_wakeupFd(createEventfd()),
//  p_wakeupChannel(new Channel(this, m_wakeupFd)),
void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = sockets::write(m_wakeupFd, &one, sizeof one);
  if(n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

handleRead()

poll回撥讀事件,處理eventfd.

void EventLoop::handleRead() //handle wakeup Fd
{
  LOG_TRACE << "EventLoop::handleRead() handle wakeup Fd";
  uint64_t one = 1;
  ssize_t n = sockets::read(m_wakeupFd, &one, sizeof one);
  if(n != sizeof one)
  {
    LOG_ERROR << "EventLoop::handleRead() reads " << n << "bytes instead of 8";
  }
  doPendingFunctors();
}

doPendingFunctors()

處理掛起的事件.

void EventLoop::doPendingFunctors()
{
  LOG_TRACE << "EventLoop::doPendingFunctors()";
  std::vector<Functor> functors;
  m_callingPendingFunctors = true;

  {
    MutexLockGuard lock(m_mutex);
    functors.swap(m_pendingFunctors);
  }

  for(size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }

  m_callingPendingFunctors = false;

}

總結

本文主要介紹了muduo中EventLoop通過 通過封裝一層eventfd實現的runInLoop()函式,使得其他執行緒想往EventLoop所在的I/O執行緒註冊任務成為可能.

下篇文章會寫Connector和Acceptor,連結器和監聽器 實現第一條連結。