Socket程式設計實踐(11) --epoll原理與封裝
常用模型的特點
Linux 下設計併發網路程式,有典型的Apache模型(Process Per Connection,PPC), TPC(Thread Per Connection)模型,以及 select/polL模型和epoll模型。
1 、PPC/TPC 模型
這兩種模型思想類似,就是讓每一個到來的連線一邊自己做事去,別再來煩我(詳見本系列部落格).只是 PPC 是為它開了一個程序,而 TPC 開了一個執行緒。可是別煩我是有代價的,它要時間和空間啊,連線多了之後,那麼多的程序/執行緒切換,這開銷就上來了;因此這類模型能接受的最大連線數都不會高,一般在幾百個左右。
2 、select 模型
1) 最大併發數限制,因為一個程序所開啟的 FD (檔案描述符)是有限制的,由 FD_SETSIZE 設定,預設值是 1024,因此 Select 模型的最大併發數就被相應限制了。自己改改這個 FD_SETSIZE ?想法雖好,可是先看看下面吧 …
2) 效率問題, select 每次呼叫都會線性掃描全部的 FD 集合,這樣效率就會呈現線性下降,把 FD_SETSIZE 改大的後果就是,大家都慢慢來,什麼?都超時了??!!
3) 核心/使用者空間記憶體拷貝問題,如何讓核心把 FD 訊息通知給使用者空間呢?在這個問題上 select 採取了記憶體拷貝方法。
3、 poll 模型
基本上效率和 select 是相同的, select 缺點的 2 和 3 它都沒有改掉。
Epoll 的提升
1. Epoll 沒有最大併發連線的限制,上限是最大可以開啟檔案的數目,這個數字一般遠大於 2048, 一般來說這個數目和系統記憶體關係很大 ,具體數目可以 cat /proc/sys/fs/file-max[599534] 察看。
2. 效率提升, Epoll最大的優點就在於它只管你“活躍”的連線 ,而跟連線總數無關,因此在實際的網路環境中, Epoll的效率就會遠遠高於 select 和 poll 。
3. 記憶體拷貝, Epoll 在這點上使用了“共享記憶體(詳見本系列其他部落格)”,這個記憶體拷貝也省略了。
epoll的使用
epoll的介面非常簡單,一共就3/4個函式:
int epoll_create(int size); int epoll_create1(int flags); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
1. 對於epoll_create1 的flag引數: 可以設定為0 或EPOLL_CLOEXEC,為0時函式表現與epoll_create一致, EPOLL_CLOEXEC標誌與open 時的O_CLOEXEC 標誌類似,即程序被替換時會關閉開啟的檔案描述符(需要注意的是,epoll_create與epoll_create1當建立好epoll控制代碼後,它就是會佔用一個fd值,在linux下如果檢視/proc/<pid>/fd/,是能夠看到這個fd的,所以在使用完epoll後,必須呼叫close()關閉,否則可能導致fd被耗盡)。
2. 對於epoll_ctl, op引數表示動作,用三個巨集來表示:
EPOLL_CTL_ADD | 註冊新的fd到epfd中 |
EPOLL_CTL_DEL | 從epfd中刪除一個fd |
EPOLL_CTL_MOD | 修改已經註冊的fd的監聽事件 |
3. 對於epoll_wait:
events:結構體指標, 一般是一個數組
maxevents:事件的最大個數, 或者說是陣列的大小
timeout:超時時間, 含義與poll的timeout引數相同,設為-1表示永不超時;
4. epoll_event結構體
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
一般data 共同體我們設定其成員fd即可,也就是epoll_ctl 函式的第三個引數。
events集合 | |
EPOLLIN | 表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉) |
EPOLLOUT | 表示對應的檔案描述符可以寫 |
EPOLLPRI | 表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來) |
EPOLLERR | 表示對應的檔案描述符發生錯誤 |
EPOLLHUP | 表示對應的檔案描述符被結束通話 |
EPOLLET | 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的 |
EPOLLONESHOT | 只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列裡 |
/**示例: epoll使用示例
注:client端與測試端與前同, 而且使用相同的測試端測試select/poll/epoll, 可以發現epoll的效率是非常高的**/
//新增fd到epoll
void addFd(int epollfd, int fd, const uint32_t &events = EPOLLIN, bool et = false)
{
struct epoll_event event;
event.events = events;
if (et)
event.events |= EPOLLET;
event.data.fd = fd;
if( epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1 )
err_exit("epoll_ctl_add error");
}
//從epoll刪除fd
void delFd(int epollfd, int fd)
{
struct epoll_event event;
event.data.fd = fd;
if( epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event) == -1 )
err_exit("epoll_ctl_del error");
}
int main()
{
signal(SIGPIPE, sigHandlerForSigPipe);
try
{
TCPServer server(8001);
int listenfd = server.getfd();
int epollfd = epoll_create1(EPOLL_CLOEXEC);
if (epollfd == -1)
err_exit("epoll_create1 error");
// 將監聽套接字註冊到epoll
addFd(epollfd, listenfd, EPOLLIN, true);
// 用於儲存epoll_wait返回事件陣列
std::vector<struct epoll_event> events(16);
char buf[BUFSIZ];
int count = 0;
while (true)
{
// 等待epoll返回
int nReady = epoll_wait(epollfd, &*events.begin(), (int)events.size(), -1);
if (nReady == -1)
{
if (errno == EINTR)
continue;
err_exit("epoll_wait error");
}
if ((size_t)nReady == events.size())
events.resize(events.size()*2);
for (int i = 0; i < nReady; ++i)
{
// 如果是監聽套接字傳送了可讀事件
if (events[i].data.fd == listenfd)
{
int connectfd = accept(listenfd, NULL, NULL);
if (connectfd == -1)
err_exit("accept error");
cout << "accept success..." << endl;
cout << "count = " << ++count << endl;
setUnBlock(connectfd, true);
addFd(epollfd, connectfd, EPOLLIN, true);
}
// 如果是已連線套接字發生了可讀事件
else if (events[i].events & EPOLLIN)
{
int connectfd = events[i].data.fd;
if (connectfd < 0)
continue;
memset(buf, 0, sizeof(buf));
int ret = readline(connectfd, buf, sizeof(buf)-1);
if (ret == -1)
err_exit("read-line error");
// 如果對端關閉
else if (ret == 0)
{
cerr << "client connect closed..." << endl;
// 將該套接字同epoll中移除
delFd(epollfd, connectfd);
close(connectfd);
continue;
}
cout << buf;
writen(connectfd, buf, strlen(buf));
}
}
}
}
catch (const SocketException &e)
{
cerr << e.what() << endl;
err_exit("TCPServer error");
}
}
小結-epoll與select、poll的區別
1.相比於select與poll, epoll最大的好處在於它不會隨著監聽fd數目的增長而降低效率。
因為核心中select/poll的實現是採用輪詢來處理的, 因此他們檢測就緒實踐的演算法時間複雜度是O(N), 因此, 需要輪詢的fd數目越多, 自然耗時越多, 他們的效能呈線性甚至指數的方式下降。
而epoll的實現是基於事件回撥的,如果fd有期望的事件發生就通過回撥函式將其加入epoll就緒佇列中,也就是說它只關心“活躍”的fd,與fd數目無關 其演算法時間複雜度為O(1)。
2. 核心空間與使用者空間記憶體拷貝問題,如何讓核心把 fd訊息通知給使用者空間呢?在這個問題上select/poll採取了記憶體拷貝方法。而epoll採用了核心和使用者空間共享記憶體的方式。
3. epoll不僅會告訴應用程式有I/0 事件到來,還會告訴應用程式相關的資訊,這些資訊是應用程式填充的,因此根據這些資訊應用程式就能直接定位到事件,而不必遍歷整個fd集合。而select/poll模型,當有 I/O 事件到來時, select/poll通知應用程式有事件到達,而應用程式必須輪詢所有的fd集合,測試每個fd是否有事件發生,並處理事件。
4. 當活動連線比較多的時候, epoll_wait的效率就未必比select/poll高了, 因為這時候對於epoll 來說一直在呼叫callback 函式, 回撥函式被觸發得過於頻繁, 所以epoll_wait適用於連線數量多, 但活動連線少的情況;
ET/LT模式
1、EPOLLLT:完全靠Linux-kernel-epoll驅動,應用程式只需要處理從epoll_wait返回的fds, 這些fds我們認為它們處於就緒狀態。此時epoll可以認為是更快速的poll。
2、EPOLLET:此模式下,系統僅僅通知應用程式哪些fds變成了就緒狀態,一旦fd變成就緒狀態,epoll將不再關注這個fd的任何狀態資訊(從epoll佇列移除), 直到應用程式通過讀寫操作(非阻塞)觸發EAGAIN狀態,epoll認為這個fd又變為空閒狀態,那麼epoll又重新關注這個fd的狀態變化(重新加入epoll佇列)。 隨著epoll_wait的返回,佇列中的fds是在減少的,所以在大併發的系統中,EPOLLET更有優勢,但是對程式設計師的要求也更高,因為有可能會出現資料讀取不完整的問題,舉例如下:
假設現在對方傳送了2k的資料,而我們先讀取了1k,然後這時呼叫了epoll_wait,如果是邊沿觸發ET,那麼這個fd變成就緒狀態就會從epoll 佇列移除,則epoll_wait 會一直阻塞,忽略尚未讀取的1k資料; 而如果是水平觸發LT,那麼epoll_wait 還會檢測到可讀事件而返回,我們可以繼續讀取剩下的1k 資料。
因此總結來說: LT模式可能觸發的次數更多, 一旦觸發的次數多, 也就意味著效率會下降; 但這樣也不能就說LT模式就比ET模式效率更低, 因為ET的使用對程式設計人員提出了更高更精細的要求, 一旦程式設計人員水平達不到(比如本人), 那ET模式還不如LT模式;
Epoll-Class封裝
在本部分我們實現一個較為好用實用的Epoll併發類, 由於實現程式碼與使用方式較簡單, 因此就不在此贅述了, 下面我還使用了該類實現了一個基於Epoll的echo-server, 以演示該類的用法;
由於此處僅為Epoll類庫的第一個版本, 因此錯誤之處必然會存在, 如果讀者在閱讀的過程中發現了該類庫的BUG, 還望這篇部落格的讀者朋友不吝賜教; 而作者也會不斷的更新該類庫(主要更新程式碼我會發布到此處), 以處理新的業務需求;
Epoll類設計
class Epoll
{
public:
Epoll(int flags = EPOLL_CLOEXEC, int noFile = 1024);
~Epoll();
void addfd(int fd, uint32_t events = EPOLLIN, bool ETorNot = false);
void modfd(int fd, uint32_t events = EPOLLIN, bool ETorNot = false);
void delfd(int fd);
int wait(int timeout = -1);
int getEventOccurfd(int eventIndex) const;
uint32_t getEvents(int eventIndex) const;
public:
bool isValid()
{
if (m_epollfd == -1)
return false;
return true;
}
void close()
{
if (isValid())
{
:: close(m_epollfd);
m_epollfd = -1;
}
}
private:
std::vector<struct epoll_event> events;
int m_epollfd;
int fdNumber;
int nReady;
private:
struct epoll_event event;
};
Epoll類實現
/** epoll_create **/
Epoll::Epoll(int flags, int noFile) : fdNumber(0), nReady(0)
{
struct rlimit rlim;
rlim.rlim_cur = rlim.rlim_max = noFile;
if ( ::setrlimit(RLIMIT_NOFILE, &rlim) == -1 )
throw EpollException("setrlimit error");
m_epollfd = ::epoll_create1(flags);
if (m_epollfd == -1)
throw EpollException("epoll_create1 error");
}
Epoll::~Epoll()
{
this -> close();
}
/** epoll_ctl **/
void Epoll::addfd(int fd, uint32_t events, bool ETorNot)
{
bzero(&event, sizeof(event));
event.events = events;
if (ETorNot)
event.events |= EPOLLET;
event.data.fd = fd;
if( ::epoll_ctl(m_epollfd, EPOLL_CTL_ADD, fd, &event) == -1 )
throw EpollException("epoll_ctl_add error");
++ fdNumber;
}
void Epoll::modfd(int fd, uint32_t events, bool ETorNot)
{
bzero(&event, sizeof(event));
event.events = events;
if (ETorNot)
event.events |= EPOLLET;
event.data.fd = fd;
if( ::epoll_ctl(m_epollfd, EPOLL_CTL_MOD, fd, &event) == -1 )
throw EpollException("epoll_ctl_mod error");
}
void Epoll::delfd(int fd)
{
bzero(&event, sizeof(event));
event.data.fd = fd;
if( ::epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, &event) == -1 )
throw EpollException("epoll_ctl_del error");
-- fdNumber;
}
/** epoll_wait **/
int Epoll::wait(int timeout)
{
events.resize(fdNumber);
while (true)
{
nReady = epoll_wait(m_epollfd, &*events.begin(), fdNumber, timeout);
if (nReady == 0)
throw EpollException("epoll_wait timeout");
else if (nReady == -1)
{
if (errno == EINTR)
continue;
else throw EpollException("epoll_wait error");
}
else
return nReady;
}
return -1;
}
int Epoll::getEventOccurfd(int eventIndex) const
{
if (eventIndex > nReady)
throw EpollException("parameter(s) error");
return events[eventIndex].data.fd;
}
uint32_t Epoll::getEvents(int eventIndex) const
{
if (eventIndex > nReady)
throw EpollException("parameter(s) error");
return events[eventIndex].events;
}
使用Epoll的echoserver(測試)程式碼:
int main()
{
signal(SIGPIPE, SIG_IGN);
/**
將下面的這兩個變數設定成為放在程式的開頭,
只是因為這樣可以使得業務處理部分的程式碼顯
得簡潔一些,在實際應用(C++)中,沒必要也不
推薦這樣使用
**/
char buf[BUFSIZ];
int clientCount = 0;
try
{
TCPServer server(8001);
int listenfd = server.getfd();
Epoll epoll;
// 將監聽套接字註冊到epoll
epoll.addfd(server.getfd(), EPOLLIN, true);
while (true)
{
int nReady = epoll.wait();
for (int i = 0; i < nReady; ++i)
// 如果是監聽套接字發生了可讀事件
if (epoll.getEventOccurfd(i) == listenfd)
{
int connectfd = accept(listenfd, NULL, NULL);
if (connectfd == -1)
err_exit("accept error");
cout << "accept success..." << endl;
cout << "clientCount = " << ++ clientCount << endl;
setUnBlock(connectfd, true);
epoll.addfd(connectfd, EPOLLIN, true);
}
else if (epoll.getEvents(i) & EPOLLIN)
{
TCPClient *client = new TCPClient(epoll.getEventOccurfd(i));
memset(buf, 0, sizeof(buf));
if (client->read(buf, sizeof(buf)) == 0)
{
cerr << "client connect closed..." << endl;
// 將該套接字從epoll中移除
epoll.delfd(client->getfd());
delete client;
continue;
}
cout << buf;
client->write(buf);
}
}
}
catch (const SocketException &e)
{
cerr << e.what() << endl;
err_exit("TCPServer error");
}
catch (const EpollException &e)
{
cerr << e.what() << endl;
err_exit("Epoll error");
}
}
完整原始碼請參照: