1. 程式人生 > >Socket程式設計實踐(11) --epoll原理與封裝

Socket程式設計實踐(11) --epoll原理與封裝

常用模型的特點

    Linux 下設計併發網路程式,有典型的Apache模型(Process Per Connection,PPC), TPC(Thread Per Connection)模型,以及 select/polL模型和epoll模型。

1 、PPC/TPC 模型

這兩種模型思想類似,就是讓每一個到來的連線一邊自己做事去,別再來煩我(詳見本系列部落格).只是 PPC 是為它開了一個程序,而 TPC 開了一個執行緒。可是別煩我是有代價的,它要時間和空間啊,連線多了之後,那麼多的程序/執行緒切換,這開銷就上來了;因此這類模型能接受的最大連線數都不會高,一般在幾百個左右

2 、select 模型

1) 最大併發數限制,因為一個程序所開啟的 FD (檔案描述符)是有限制的,由 FD_SETSIZE 設定,預設值是 1024,因此 Select 模型的最大併發數就被相應限制了。自己改改這個 FD_SETSIZE ?想法雖好,可是先看看下面吧 …

2) 效率問題, select 每次呼叫都會線性掃描全部的 FD 集合,這樣效率就會呈現線性下降,把 FD_SETSIZE 改大的後果就是,大家都慢慢來,什麼?都超時了??!!

3) 核心/使用者空間記憶體拷貝問題,如何讓核心把 FD 訊息通知給使用者空間呢?在這個問題上 select 採取了記憶體拷貝方法。

3、 poll 模型

基本上效率和 select 是相同的, select 缺點的 2 和 3 它都沒有改掉。

Epoll 的提升

1. Epoll 沒有最大併發連線的限制,上限是最大可以開啟檔案的數目,這個數字一般遠大於 2048, 一般來說這個數目和系統記憶體關係很大 ,具體數目可以 cat /proc/sys/fs/file-max[599534] 察看。

2. 效率提升, Epoll最大的優點就在於它只管你“活躍”的連線 ,而跟連線總數無關,因此在實際的網路環境中, Epoll的效率就會遠遠高於 select 和 poll 。

3. 記憶體拷貝, Epoll 在這點上使用了“共享記憶體(詳見本系列其他部落格)”,這個記憶體拷貝也省略了。

epoll的使用

epoll的介面非常簡單,一共就3/4個函式:

int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

   1. 對於epoll_create1 的flag引數: 可以設定為0 或EPOLL_CLOEXEC,為0時函式表現與epoll_create一致, EPOLL_CLOEXEC標誌與open 時的O_CLOEXEC 標誌類似,即程序被替換時會關閉開啟的檔案描述符(需要注意的是,epoll_create與epoll_create1當建立好epoll控制代碼後,它就是會佔用一個fd值,在linux下如果檢視/proc/<pid>/fd/,是能夠看到這個fd的,所以在使用完epoll後,必須呼叫close()關閉,否則可能導致fd被耗盡)。

2. 對於epoll_ctl, op引數表示動作,用三個巨集來表示:

EPOLL_CTL_ADD

註冊新的fd到epfd中

EPOLL_CTL_DEL

從epfd中刪除一個fd

EPOLL_CTL_MOD

修改已經註冊的fd的監聽事件

3. 對於epoll_wait:

events:結構體指標, 一般是一個數組

maxevents:事件的最大個數, 或者說是陣列的大小

timeout:超時時間, 含義與poll的timeout引數相同,設為-1表示永不超時;

4. epoll_event結構體

struct epoll_event
{
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};
typedef union epoll_data
{
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

一般data 共同體我們設定其成員fd即可,也就是epoll_ctl 函式的第三個引數。

events集合

EPOLLIN

表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉)

EPOLLOUT

表示對應的檔案描述符可以寫

EPOLLPRI

表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來)

EPOLLERR

表示對應的檔案描述符發生錯誤

EPOLLHUP

表示對應的檔案描述符被結束通話

EPOLLET

將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的

EPOLLONESHOT

只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列裡

/**示例: epoll使用示例
 	注:client端與測試端與前同, 而且使用相同的測試端測試select/poll/epoll, 可以發現epoll的效率是非常高的**/

//新增fd到epoll
void addFd(int epollfd, int fd, const uint32_t &events = EPOLLIN, bool et = false)
{
    struct epoll_event event;
    event.events = events;
    if (et)
        event.events |= EPOLLET;
    event.data.fd = fd;
    if( epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1 )
        err_exit("epoll_ctl_add error");
}
//從epoll刪除fd
void delFd(int epollfd, int fd)
{
    struct epoll_event event;
    event.data.fd = fd;
    if( epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event) == -1 )
        err_exit("epoll_ctl_del error");
}

int main()
{
    signal(SIGPIPE, sigHandlerForSigPipe);
    try
    {
        TCPServer server(8001);
        int listenfd = server.getfd();
        int epollfd = epoll_create1(EPOLL_CLOEXEC);
        if (epollfd == -1)
            err_exit("epoll_create1 error");
        // 將監聽套接字註冊到epoll
        addFd(epollfd, listenfd, EPOLLIN, true);

        // 用於儲存epoll_wait返回事件陣列
        std::vector<struct epoll_event> events(16);
        char buf[BUFSIZ];
        int count = 0;
        while (true)
        {
            // 等待epoll返回
            int nReady = epoll_wait(epollfd, &*events.begin(), (int)events.size(), -1);
            if (nReady == -1)
            {
                if (errno == EINTR)
                    continue;
                err_exit("epoll_wait error");
            }
            if ((size_t)nReady == events.size())
                events.resize(events.size()*2);

            for (int i = 0; i < nReady; ++i)
            {
                // 如果是監聽套接字傳送了可讀事件
                if (events[i].data.fd == listenfd)
                {
                    int connectfd = accept(listenfd, NULL, NULL);
                    if (connectfd == -1)
                        err_exit("accept error");
                    cout << "accept success..." << endl;
                    cout << "count = " << ++count << endl;
                    setUnBlock(connectfd, true);
                    addFd(epollfd, connectfd, EPOLLIN, true);
                }
                // 如果是已連線套接字發生了可讀事件
                else if (events[i].events & EPOLLIN)
                {
                    int connectfd = events[i].data.fd;
                    if (connectfd < 0)
                        continue;

                    memset(buf, 0, sizeof(buf));
                    int ret = readline(connectfd, buf, sizeof(buf)-1);
                    if (ret == -1)
                        err_exit("read-line error");
                    // 如果對端關閉
                    else if (ret == 0)
                    {
                        cerr << "client connect closed..." << endl;
                        // 將該套接字同epoll中移除
                        delFd(epollfd, connectfd);
                        close(connectfd);
                        continue;
                    }
                    cout << buf;
                    writen(connectfd, buf, strlen(buf));
                }
            }
        }
    }
    catch (const SocketException &e)
    {
        cerr << e.what() << endl;
        err_exit("TCPServer error");
    }
}

小結-epoll與select、poll的區別

1.相比於select與poll, epoll最大的好處在於它不會隨著監聽fd數目的增長而降低效率。

因為核心中select/poll的實現是採用輪詢來處理的, 因此他們檢測就緒實踐的演算法時間複雜度是O(N), 因此, 需要輪詢的fd數目越多, 自然耗時越多, 他們的效能呈線性甚至指數的方式下降。

而epoll的實現是基於事件回撥的,如果fd有期望的事件發生就通過回撥函式將其加入epoll就緒佇列中,也就是說它只關心“活躍”的fd,與fd數目無關 其演算法時間複雜度為O(1)。

2. 核心空間與使用者空間記憶體拷貝問題,如何讓核心把 fd訊息通知給使用者空間呢?在這個問題上select/poll採取了記憶體拷貝方法。而epoll採用了核心和使用者空間共享記憶體的方式。

3. epoll不僅會告訴應用程式有I/0 事件到來,還會告訴應用程式相關的資訊,這些資訊是應用程式填充的,因此根據這些資訊應用程式就能直接定位到事件,而不必遍歷整個fd集合。而select/poll模型,當有 I/O 事件到來時, select/poll通知應用程式有事件到達,而應用程式必須輪詢所有的fd集合,測試每個fd是否有事件發生,並處理事件。

4. 當活動連線比較多的時候, epoll_wait的效率就未必比select/poll高了, 因為這時候對於epoll 來說一直在呼叫callback 函式, 回撥函式被觸發得過於頻繁, 所以epoll_wait適用於連線數量多, 但活動連線少的情況;

ET/LT模式

1、EPOLLLT:完全靠Linux-kernel-epoll驅動,應用程式只需要處理從epoll_wait返回的fds, 這些fds我們認為它們處於就緒狀態。此時epoll可以認為是更快速的poll。

2、EPOLLET:此模式下,系統僅僅通知應用程式哪些fds變成了就緒狀態,一旦fd變成就緒狀態,epoll將不再關注這個fd的任何狀態資訊(從epoll佇列移除), 直到應用程式通過讀寫操作(非阻塞)觸發EAGAIN狀態,epoll認為這個fd又變為空閒狀態,那麼epoll又重新關注這個fd的狀態變化(重新加入epoll佇列)。 隨著epoll_wait的返回,佇列中的fds是在減少的,所以在大併發的系統中,EPOLLET更有優勢,但是對程式設計師的要求也更高,因為有可能會出現資料讀取不完整的問題,舉例如下:

假設現在對方傳送了2k的資料,而我們先讀取了1k,然後這時呼叫了epoll_wait,如果是邊沿觸發ET,那麼這個fd變成就緒狀態就會從epoll 佇列移除,則epoll_wait 會一直阻塞,忽略尚未讀取的1k資料; 而如果是水平觸發LT,那麼epoll_wait 還會檢測到可讀事件而返回,我們可以繼續讀取剩下的1k 資料。

因此總結來說: LT模式可能觸發的次數更多, 一旦觸發的次數多, 也就意味著效率會下降; 但這樣也不能就說LT模式就比ET模式效率更低, 因為ET的使用對程式設計人員提出了更高更精細的要求, 一旦程式設計人員水平達不到(比如本人), 那ET模式還不如LT模式;

Epoll-Class封裝

在本部分我們實現一個較為好用實用的Epoll併發類, 由於實現程式碼與使用方式較簡單, 因此就不在此贅述了, 下面我還使用了該類實現了一個基於Epoll的echo-server, 以演示該類的用法;

由於此處僅為Epoll類庫的第一個版本, 因此錯誤之處必然會存在, 如果讀者在閱讀的過程中發現了該類庫的BUG, 還望這篇部落格的讀者朋友不吝賜教; 而作者也會不斷的更新該類庫(主要更新程式碼我會發布到此處), 以處理新的業務需求;

Epoll類設計

class Epoll
{
public:
    Epoll(int flags = EPOLL_CLOEXEC, int noFile = 1024);
    ~Epoll();

    void addfd(int fd, uint32_t events = EPOLLIN, bool ETorNot = false);
    void modfd(int fd, uint32_t events = EPOLLIN, bool ETorNot = false);
    void delfd(int fd);
    int wait(int timeout = -1);
    int getEventOccurfd(int eventIndex) const;
    uint32_t getEvents(int eventIndex) const;

public:
    bool isValid()
    {
        if (m_epollfd == -1)
            return false;
        return true;
    }
    void close()
    {
        if (isValid())
        {
            :: close(m_epollfd);
            m_epollfd = -1;
        }
    }

private:
    std::vector<struct epoll_event> events;
    int m_epollfd;
    int fdNumber;
    int nReady;
private:
    struct epoll_event event;
};

Epoll類實現

/** epoll_create **/
Epoll::Epoll(int flags, int noFile) : fdNumber(0), nReady(0)
{
    struct rlimit rlim;
    rlim.rlim_cur = rlim.rlim_max = noFile;
    if ( ::setrlimit(RLIMIT_NOFILE, &rlim) == -1 )
        throw EpollException("setrlimit error");

    m_epollfd = ::epoll_create1(flags);
    if (m_epollfd == -1)
        throw EpollException("epoll_create1 error");
}
Epoll::~Epoll()
{
    this -> close();
}
/** epoll_ctl **/
void Epoll::addfd(int fd, uint32_t events, bool ETorNot)
{
    bzero(&event, sizeof(event));
    event.events = events;
    if (ETorNot)
        event.events |= EPOLLET;
    event.data.fd = fd;
    if( ::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &event) == -1 )
        throw EpollException("epoll_ctl_add error");
    ++ fdNumber;
}
void Epoll::modfd(int fd, uint32_t events, bool ETorNot)
{
    bzero(&event, sizeof(event));
    event.events = events;
    if (ETorNot)
        event.events |= EPOLLET;
    event.data.fd = fd;
    if( ::epoll_ctl(m_epollfd, EPOLL_CTL_MOD, fd, &event) == -1 )
        throw EpollException("epoll_ctl_mod error");
}
void Epoll::delfd(int fd)
{
    bzero(&event, sizeof(event));
    event.data.fd = fd;
    if( ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &event) == -1 )
        throw EpollException("epoll_ctl_del error");
    -- fdNumber;
}
/** epoll_wait **/
int Epoll::wait(int timeout)
{
    events.resize(fdNumber);
    while (true)
    {
        nReady = epoll_wait(m_epollfd, &*events.begin(), fdNumber, timeout);
        if (nReady == 0)
            throw EpollException("epoll_wait timeout");
        else if (nReady == -1)
        {
            if (errno == EINTR)
                continue;
            else  throw EpollException("epoll_wait error");
        }
        else
            return nReady;
    }
    return -1;
}

int Epoll::getEventOccurfd(int eventIndex) const
{
    if (eventIndex > nReady)
        throw EpollException("parameter(s) error");
    return events[eventIndex].data.fd;
}
uint32_t Epoll::getEvents(int eventIndex) const
{
    if (eventIndex > nReady)
        throw EpollException("parameter(s) error");
    return events[eventIndex].events;
}

使用Epoll的echoserver(測試)程式碼:

int main()
{
signal(SIGPIPE, SIG_IGN);
    /**
    將下面的這兩個變數設定成為放在程式的開頭,
    只是因為這樣可以使得業務處理部分的程式碼顯
    得簡潔一些,在實際應用(C++)中,沒必要也不
    推薦這樣使用
    **/
    char buf[BUFSIZ];
    int clientCount = 0;
    try
    {
        TCPServer server(8001);
        int listenfd = server.getfd();
        Epoll epoll;
        // 將監聽套接字註冊到epoll
        epoll.addfd(server.getfd(), EPOLLIN, true);
        while (true)
        {
            int nReady = epoll.wait();
            for (int i = 0; i < nReady; ++i)
                // 如果是監聽套接字發生了可讀事件
                if (epoll.getEventOccurfd(i) == listenfd)
                {
                    int connectfd = accept(listenfd, NULL, NULL);
                    if (connectfd == -1)
                        err_exit("accept error");
                    cout << "accept success..." << endl;
                    cout << "clientCount = " << ++ clientCount << endl;
                    setUnBlock(connectfd, true);
                    epoll.addfd(connectfd, EPOLLIN, true);
                }
                else if (epoll.getEvents(i) & EPOLLIN)
                {
                    TCPClient *client = new TCPClient(epoll.getEventOccurfd(i));
                    memset(buf, 0, sizeof(buf));
                    if (client->read(buf, sizeof(buf)) == 0)
                    {
                        cerr << "client connect closed..." << endl;
                        // 將該套接字從epoll中移除
                        epoll.delfd(client->getfd());
                        delete client;
                        continue;
                    }
                    cout << buf;
                    client->write(buf);
                }
        }
    }
    catch (const SocketException &e)
    {
        cerr << e.what() << endl;
        err_exit("TCPServer error");
    }
    catch (const EpollException &e)
    {
        cerr << e.what() << endl;
        err_exit("Epoll error");
    }
}

完整原始碼請參照: