【Linux深入】epoll原始碼剖析
引入
之前講了select、poll、epoll的區別,由於許多應用中都用到了epoll,例如Netty、Redis等等,所以就來深入學習一下,現在我們就來剖析一下epoll的原始碼
我先來剖析理解epoll原始碼的基礎:主要的資料結構,然後再來解析epoll主要的三個方法:epoll_create()、epoll_ctl()、epoll_wait()。
主要的資料結構
1.eventpoll
// epoll的核心實現對應於一個epoll描述符
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
wait_queue_head_t wq; // sys_epoll_wait() 等待在這裡
// f_op->poll() 使用的, 被其他事件通知機制利用的wait_address
wait_queue_head_t poll_wait;
//已就緒的需要檢查的epitem 列表
struct list_head rdllist;
//儲存所有加入到當前epoll的檔案對應的epitem
struct rb_root rbr;
// 當正在向用戶空間複製資料時, 產生的可用檔案
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
//優化迴圈檢查,避免迴圈檢查中重複的遍歷
int visited;
struct list_head visited_list_link;
}
2.epitem
// 對應於一個加入到epoll的檔案
struct epitem {
// 掛載到eventpoll 的紅黑樹節點
struct rb_node rbn;
// 掛載到eventpoll.rdllist 的節點
struct list_head rdllink;
// 連線到ovflist 的指標
struct epitem *next;
/* 檔案描述符資訊fd + file, 紅黑樹的key */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
// 當前檔案的等待佇列(eppoll_entry)列表
// 同一個檔案上可能會監視多種事件,
// 這些事件可能屬於不同的wait_queue中
// (取決於對應檔案型別的實現),
// 所以需要使用連結串列
struct list_head pwqlist;
// 當前epitem 的所有者
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* epoll_ctl 傳入的使用者資料 */
struct epoll_event event;
};
3.eppoll_entry
// 與一個檔案上的一個wait_queue_head 相關聯,因為同一檔案可能有多個等待的事件,
//這些事件可能使用不同的等待佇列
struct eppoll_entry {
// List struct epitem.pwqlist
struct list_head llink;
// 所有者
struct epitem *base;
// 新增到wait_queue 中的節點
wait_queue_t wait;
// 檔案wait_queue 頭
wait_queue_head_t *whead;
};
epoll_create()
1.epoll_create()
//先進行判斷size是否>=0,若是則直接呼叫epoll_create1
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
注:SYSCALL_DEFINE1是一個巨集,用於定義有一個引數的系統呼叫函式,上述巨集展開後即成為: int sys_epoll_create(int size),這就是epoll_create系統呼叫的入口。至於為何要用巨集而不是直接宣告,主要是因為系統呼叫的引數個數、傳參方式都有嚴格限制,最多六個引數,
2.epoll_create1()
/* 這才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error;
struct eventpoll *ep = NULL;//主描述符
/* Check the EPOLL_* constant for consistency. */
/* 這句沒啥用處... */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
/* 對於epoll來講, 目前唯一有效的FLAG就是CLOEXEC */
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
/* 分配一個struct eventpoll, 分配和初始化細節我們隨後深聊~ */
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
/* 這裡是建立一個匿名fd, 說起來就話長了...長話短說:
* epollfd本身並不存在一個真正的檔案與之對應, 所以核心需要建立一個
* "虛擬"的檔案, 併為之分配真正的struct file結構, 而且有真正的fd.
* 這裡2個引數比較關鍵:
* eventpoll_fops, fops就是file operations, 就是當你對這個檔案(這裡是虛擬的)進行操作(比如讀)時,
* fops裡面的函式指標指向真正的操作實現, 類似C++裡面虛擬函式和子類的概念.
* epoll只實現了poll和release(就是close)操作, 其它檔案系統操作都有VFS全權處理了.
* ep, ep就是struct epollevent, 它會作為一個私有資料儲存在struct file的private指標裡面.
* 其實說白了, 就是為了能通過fd找到struct file, 通過struct file能找到eventpoll結構.
* 如果懂一點Linux下字元裝置驅動開發, 這裡應該是很好理解的,
* 推薦閱讀 <Linux device driver 3rd>
*/
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (error < 0)
ep_free(ep);
return error;
}
3.eventpoll_init
// epoll 檔案系統的相關實現
// epoll 檔案系統初始化, 在系統啟動時會呼叫
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
// 限制可新增到epoll的最多的描述符數量
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
// 初始化遞迴檢查佇列
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
// epoll 使用的slab分配器分別用來分配epitem和eppoll_entry
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
epoll_ctl()
1.epoll_crl()
//建立好epollfd後, 接下來新增fd
//epoll_ctl的引數:epfd 表示epollfd;op 有ADD,MOD,DEL,
//fd 是需要監聽的描述符,event 我們感興趣的events
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
//錯誤處理以及從使用者空間將epoll_event結構copy到核心空間.
if (ep_op_has_event(op) &&
// 複製使用者空間資料到核心
copy_from_user(&epds, event, sizeof(struct epoll_event))) {
goto error_return;
}
// 取得 epfd 對應的檔案
error = -EBADF;
file = fget(epfd);
if (!file) {
goto error_return;
}
// 取得目標檔案
tfile = fget(fd);
if (!tfile) {
goto error_fput;
}
// 目標檔案必須提供 poll 操作
error = -EPERM;
if (!tfile->f_op || !tfile->f_op->poll) {
goto error_tgt_fput;
}
// 新增自身或epfd 不是epoll 控制代碼
error = -EINVAL;
if (file == tfile || !is_file_epoll(file)) {
goto error_tgt_fput;
}
// 取得內部結構eventpoll
ep = file->private_data;
// EPOLL_CTL_MOD 不需要加全域性鎖 epmutex
if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
}
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
// 目標檔案也是epoll 檢測是否有迴圈包含的問題
if (ep_loop_check(ep, tfile) != 0) {
goto error_tgt_fput;
}
} else
{
// 將目標檔案新增到 epoll 全域性的tfile_check_list 中
list_add(&tfile->f_tfile_llink, &tfile_check_list);
}
}
mutex_lock_nested(&ep->mtx, 0);
// 以tfile 和fd 為key 在rbtree 中查詢檔案對應的epitem
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
// 沒找到, 新增額外新增ERR HUP 事件
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else {
error = -EEXIST;
}
// 清空檔案檢查列表
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi) {
error = ep_remove(ep, epi);
} else {
error = -ENOENT;
}
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else {
error = -ENOENT;
}
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (did_lock_epmutex) {
mutex_unlock(&epmutex);
}
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
2.ep_insert()
//ep_insert()在epoll_ctl()中被呼叫, 完成往epollfd裡面新增一個監聽fd的工作
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
/*
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
*/
// 增加監視檔案數
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches)) {
return -ENOSPC;
}
// 分配初始化 epi
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {
return -ENOMEM;
}
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
// 初始化紅黑樹中的key
ep_set_ffd(&epi->ffd, tfile, fd);
// 直接複製使用者結構
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
// 初始化臨時的 epq
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 設定事件掩碼
epq.pt._key = event->events;
// 內部會呼叫ep_ptable_queue_proc, 在檔案對應的wait queue head 上
// 註冊回撥函式, 並返回當前檔案的狀態
revents = tfile->f_op->poll(tfile, &epq.pt);
// 檢查錯誤
error = -ENOMEM;
if (epi->nwait < 0) { // f_op->poll 過程出錯
goto error_unregister;
}
// 添加當前的epitem 到檔案的f_ep_links 連結串列
spin_lock(&tfile->f_lock);
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
// 插入epi 到rbtree
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (reverse_path_check()) {
goto error_remove_epi;
}
spin_lock_irqsave(&ep->lock, flags);
/* 檔案已經就緒插入到就緒連結串列rdllist */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->wq))
// 通知sys_epoll_wait , 呼叫回撥函式喚醒sys_epoll_wait 程序
{
wake_up_locked(&ep->wq);
}
// 先不通知呼叫eventpoll_poll 的程序
if (waitqueue_active(&ep->poll_wait)) {
pwake++;
}
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
if (pwake)
// 安全通知呼叫eventpoll_poll 的程序
{
ep_poll_safewake(&ep->poll_wait);
}
return 0;
error_remove_epi:
spin_lock(&tfile->f_lock);
// 刪除檔案上的 epi
if (ep_is_linked(&epi->fllink)) {
list_del_init(&epi->fllink);
}
spin_unlock(&tfile->f_lock);
// 從紅黑樹中刪除
rb_erase(&epi->rbn, &ep->rbr);
error_unregister:
// 從檔案的wait_queue 中刪除, 釋放epitem 關聯的所有eppoll_entry
ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink)) {
list_del_init(&epi->rdllink);
}
spin_unlock_irqrestore(&ep->lock, flags);
// 釋放epi
kmem_cache_free(epi_cache, epi);
return error;
}
3.ep_eventpoll_poll()
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
{
int pollflags;
struct eventpoll *ep = file->private_data;
// 插入到wait_queue
poll_wait(file, &ep->poll_wait, wait);
// 掃描就緒的檔案列表, 呼叫每個檔案上的poll 檢測是否真的就緒,
// 然後複製到使用者空間
// 檔案列表中有可能有epoll檔案, 呼叫poll的時候有可能會產生遞迴,
// 呼叫所以用ep_call_nested 包裝一下, 防止死迴圈和過深的呼叫
pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
ep_poll_readyevents_proc, ep, ep, current);
// static struct nested_calls poll_readywalk_ncalls;
return pollflags != -1 ? pollflags : 0;
}
4.poll_wait()
// 通用的poll_wait 函式, 檔案的f_ops->poll 通常會呼叫此函式
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address) {
// 呼叫_qproc 在wait_address 上新增節點和回撥函式
// 呼叫 poll_table_struct 上的函式指標向wait_address新增節點, 並設定節點的func
// (如果是select或poll 則是 __pollwait, 如果是 epoll 則是 ep_ptable_queue_proc),
p->_qproc(filp, wait_address, p);
}
}
5.ep_ptable_queue_proc()
/*
* 該函式在呼叫f_op->poll()時會被呼叫.
* 也就是epoll主動poll某個fd時, 用來將epitem與指定的fd關聯起來的.
* 關聯的辦法就是使用等待佇列(waitqueue)
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
/* 初始化等待佇列, 指定ep_poll_callback為喚醒時的回撥函式,
* 當我們監聽的fd發生狀態改變時, 也就是佇列頭被喚醒時,
* 指定的回撥函式將會被呼叫. */
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
/* 將剛分配的等待佇列成員加入到頭中, 頭是由fd持有的 */
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
/* nwait記錄了當前epitem加入到了多少個等待佇列中,
* 我認為這個值最大也只會是1... */
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
6.ep_poll_callback()
//回撥函式, 當我們監聽的fd發生狀態改變時, 它會被呼叫.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
//從等待佇列獲取epitem.需要知道哪個程序掛載到這個裝置
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;//獲取
spin_lock_irqsave(&ep->lock, flags);
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/* 沒有我們關心的event... */
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;
/*
* 這裡看起來可能有點費解, 其實幹的事情比較簡單:
* 如果該callback被呼叫的同時, epoll_wait()已經返回了,
* 也就是說, 此刻應用程式有可能已經在迴圈獲取events,
* 這種情況下, 核心將此刻發生event的epitem用一個單獨的連結串列
* 鏈起來, 不發給應用程式, 也不丟棄, 而是在下一次epoll_wait
* 時返回給使用者.
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
}
goto out_unlock;
}
/* 將當前的epitem放入ready list */
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
/* 喚醒epoll_wait... */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
/* 如果epollfd也在被poll, 那就喚醒佇列裡面的所有成員. */
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 1;
}
7.ep_remove()
static int ep_remove(struct eventpoll *ep, struct epitem *epi)
{
unsigned long flags;
struct file *file = epi->ffd.file;
/*
* Removes poll wait queue hooks. We _have_ to do this without holding
* the "ep->lock" otherwise a deadlock might occur. This because of the
* sequence of the lock acquisition. Here we do "ep->lock" then the wait
* queue head lock when unregistering the wait queue. The wakeup callback
* will run by holding the wait queue head lock and will call our callback
* that will try to get "ep->lock".
*/
ep_unregister_pollwait(ep, epi);
/* Remove the current item from the list of epoll hooks */
spin_lock(&file->f_lock);
if (ep_is_linked(&epi->fllink))
list_del_init(&epi->fllink);
spin_unlock(&file->f_lock);
rb_erase(&epi->rbn, &ep->rbr);
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink);
spin_unlock_irqrestore(&ep->lock, flags);
/* At this point it is safe to free the eventpoll item */
kmem_cache_free(epi_cache, epi);
atomic_long_dec(&ep->user->epoll_watches);
return 0;
}
8.ep_modify()
static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event)
{
int pwake = 0;
unsigned int revents;
poll_table pt;
init_poll_funcptr(&pt, NULL);
/*
* Set the new event interest mask before calling f_op->poll();
* otherwise we might miss an event that happens between the
* f_op->poll() call and the new event set registering.
*/
epi->event.events = event->events;
pt._key = event->events;
epi->event.data = event->data; /* protected by mtx */
/*
* Get current event bits. We can safely use the file* here because
* its usage count has been increased by the caller of this function.
*/
revents = epi->ffd.file->f_op->poll(epi->ffd.file, &pt);
/*
* If the item is "hot" and it is not registered inside the ready
* list, push it inside.
*/
if (revents & event->events) {
spin_lock_irq(&ep->lock);
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irq(&ep->lock);
}
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
}
epoll_wait()
1.epoll_wait()
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/* 這個地方有必要說明一下:
* 核心對應用程式採取的策略是"絕對不信任",
* 所以核心跟應用程式之間的資料互動大都是copy, 不允許(也時候也是不能...)指標引用.
* epoll_wait()需要核心返回資料給使用者空間, 記憶體由使用者程式提供,
* 所以核心會用一些手段來驗證這一段記憶體空間是不是有效的.
*/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* Get the "struct file *" for the eventpoll file */
error = -EBADF;
/* 獲取epollfd的struct file, epollfd也是檔案嘛 */
file = fget(epfd);
if (!file)
goto error_return;
error = -EINVAL;
/* 檢查一下它是不是一個真正的epollfd... */
if (!is_file_epoll(file))
goto error_fput;
/* 獲取eventpoll結構 */
ep = file->private_data;
/* 等待事件到來~~ */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
2.ep_poll()
/* 這個函式真正將執行epoll_wait的程序帶入睡眠狀態... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;//等待佇列
/* 計算睡覺時間, 毫秒要轉換為HZ */
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep->lock, flags);
res = 0;
/* 如果ready list不為空, 就不睡了, 直接幹活... */
if (list_empty(&ep->rdllist)) {
/* OK, 初始化一個等待佇列, 準備直接把自己掛起,
* 注意current是一個巨集, 代表當前程序 */
init_waitqueue_entry(&wait, current);//初始化等待佇列,wait表示當前程序
__add_wait_queue_exclusive(&ep->wq, &wait);//掛載到ep結構的等待佇列
for (;;) {
/* 將當前程序設定位睡眠, 但是可以被訊號喚醒的狀態,
* 注意這個設定是"將來時", 我們此刻還沒睡! */
set_current_state(TASK_INTERRUPTIBLE);
/* 如果這個時候, ready list裡面有成員了,
* 或者睡眠時間已經過了, 就直接不睡了... */
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
/* 如果有訊號產生, 也起床... */
if (signal_pending(current)) {
res = -EINTR;
break;
}
/* 啥事都沒有,解鎖, 睡覺... */
spin_unlock_irqrestore(&ep->lock, flags);
/* jtimeout這個時間後, 會被喚醒,
* ep_poll_callback()如果此時被呼叫,
* 那麼我們就會直接被喚醒, 不用等時間了...
* 再次強調一下ep_poll_callback()的呼叫時機是由被監聽的fd
* 的具體實現, 比如socket或者某個裝置驅動來決定的,
* 因為等待佇列頭是他們持有的, epoll和當前程序
* 只是單純的等待...
**/
jtimeout = schedule_timeout(jtimeout);//睡覺
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
/* OK 我們醒來了... */
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep->lock, flags);
/* 如果一切正常, 有event發生, 就開始準備資料copy給使用者空間了... */
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
3.ep_send_events()
//呼叫p_scan_ready_list()
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
4.ep_scan_ready_list()
//由ep_send_events()呼叫本函式
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
mutex_lock(&ep->mtx);
spin_lock_irqsave(&ep->lock, flags);
/* 這一步要注意, 首先, 所有監聽到events的epitem都鏈到rdllist上了,
* 但是這一步之後, 所有的epitem都轉移到了txlist上, 而rdllist被清空了,
* 要注意哦, rdllist已經被清空了! */
list_splice_init(&ep->rdllist, &txlist);
/* ovflist, 在ep_poll_callback()裡面我解釋過, 此時此刻我們不希望
* 有新的event加入到ready list中了, 儲存後下次再處理... */
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/* 在這個回撥函式裡面處理每個epitem
* sproc 就是 ep_send_events_proc, 下面會註釋到. */
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
/* 現在我們來處理ovflist, 這些epitem都是我們在傳遞資料給使用者空間時
* 監聽到了事件. */
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
/* 將這些直接放入readylist */
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
}
ep->ovflist = EP_UNACTIVE_PTR;
/* 上一次沒有處理完的epitem, 重新插入到ready list */
list_splice(&txlist, &ep->rdllist);
/* ready list不為空, 直接喚醒... */
if (!list_empty(&ep->rdllist)) {
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}
其他函式
1.ep_send_events_proc()
/* 該函式作為callbakc在ep_scan_ready_list()中被呼叫
* head是一個連結串列, 包含了已經ready的epitem,
* 這個不是eventpoll裡面的ready list, 而是上面函式中的txlist.
*/
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
/* 掃描整個連結串列... */
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
/* 取出第一個成員 */
epi = list_first_entry(head, struct epitem, rdllink);
/* 然後從連結串列裡面移除 */
list_del_init(&epi->rdllink);
/* 讀取events,
* 注意events我們ep_poll_callback()裡面已經取過一次了, 為啥還要再取?
* 1. 我們當然希望能拿到此刻的最新資料, events是會變的~
* 2. 不是所有的poll實現, 都通過等待佇列傳遞了events, 有可能某些驅動壓根沒傳
* 必須主動去讀取. */
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events;
if (revents) {
/* 將當前的事件和使用者傳入的資料都copy給使用者空間,
* 就是epoll_wait()後應用程式能讀到的那一堆資料. */
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
/* 嘿嘿, EPOLLET和非ET的區別就在這一步之差呀~
* 如果是ET, epitem是不會再進入到readly list,
* 除非fd再次發生了狀態改變, ep_poll_callback被呼叫.
* 如果是非ET, 不管你還有沒有有效的事件或者資料,
* 都會被重新插入到ready list, 再下一次epoll_wait
* 時, 會立即返回, 並通知給使用者空間. 當然如果這個
* 被監聽的fds確實沒事件也沒資料了, epoll_wait會返回一個0,
* 空轉一次.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
}
}
}
return eventcnt;
}
2.ep_free()
/* ep_free在epollfd被close時呼叫,
* 釋放一些資源而已, 比較簡單 */
static void ep_free(struct eventpoll *ep)
{
struct rb_node *rbp;
struct epitem *epi;
/* We need to release all tasks waiting for these file */
if (waitqueue_active(&ep->poll_wait))
ep_poll_safewake(&ep->poll_wait);
mutex_lock(&epmutex);
for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
epi = rb_entry(rbp, struct epitem, rbn);
ep_unregister_pollwait(ep, epi);
}
/* 之所以在關閉epollfd之前不需要呼叫epoll_ctl移除已經新增的fd,
* 是因為這裡已經做了... */
while ((rbp = rb_first(&ep->rbr)) != NULL) {
epi = rb_entry(rbp, struct epitem, rbn);
ep_remove(ep, epi);
}
mutex_unlock(&epmutex);
mutex_destroy(&ep->mtx);
free_uid(ep->user);
kfree(ep);
}
函式的關係調用圖
函式主要功能
1.epoll_create
從slab快取中建立一個eventpoll物件,並且建立一個匿名的fd跟fd對應的file物件,而eventpoll物件儲存在struct file結構的private指標中,並且返回,
該fd對應的file operations只是實現了poll跟release操作,建立eventpoll物件的初始化操作
獲取當前使用者資訊,是不是root,最大監聽fd數目等並且儲存到eventpoll物件中初始化等待佇列,初始化就緒連結串列,初始化紅黑樹的頭結點
2.epoll_ctl
將epoll_event結構拷貝到核心空間中,並且判斷加入的fd是否支援poll結(epoll,poll,selectI/O多路複用必須支援poll操作).
從epfd->file->privatedata獲取event_poll物件,根據op區分是新增刪除還是修改,
首先在eventpoll結構中的紅黑樹查詢是否已經存在了相對應的fd,沒找到就支援插入操作,否則報重複的錯誤,還有修改,刪除操作。
插入操作時,會建立一個與fd對應的epitem結構,並且初始化相關成員,並指定呼叫poll_wait時的回撥函式用於資料就緒時喚醒程序,(其內部,初始化裝置的等待佇列,將該程序註冊到等待佇列)完成這一步,
epitem就跟這個socket關聯起來了, 當它有狀態變化時,會通過ep_poll_callback()來通知.
最後呼叫加入的fd的fileoperation->poll函式(最後會呼叫poll_wait操作)用於完註冊操作,將epitem結構新增到紅黑樹中。
3.epoll_wait
計算睡眠時間(如果有),判斷eventpoll物件的連結串列是否為空,不為空那就幹活不睡明.並且初始化一個等待佇列,把自己掛上去,設定自己的程序狀態
若是可睡眠狀態.判斷是否有訊號到來(有的話直接被中斷醒來,),如果沒有那就呼叫schedule_timeout進行睡眠,
如果超時或者被喚醒,首先從自己初始化的等待佇列刪除,然後開始拷貝資源給使用者空間了
拷貝資源則是先把就緒事件連結串列轉移到中間連結串列,然後挨個遍歷拷貝到使用者空間,並且挨個判斷其是否為水平觸發,是的話再次插入到就緒連結串列
疑問
1.epoll_create中的size引數有什麼作用?
答:size這個引數其實沒有任何用處,它只是為了保持相容,因為之前的fd使用hash表儲存,size表示hash表的大小,而現在使用紅黑樹儲存,所以size就沒用了。
2.LT和ET的區別(原始碼級別)?
答:在原始碼中,兩種模式的區別是一個if判斷語句,通過ep_send_events_proc()函式實現,如果沒有標上EPOLLET(即預設的LT)且“事件被關注”的fd就會被重新放回了rdllist。那麼下次epoll_wait當然會又把rdllist裡的fd拿來拷給使用者了。
本人才疏學淺,若有錯,請指出,謝謝!
如果你有更好的建議,可以留言我們一起討論,共同進步!
衷心的感謝您能耐心的讀完本篇博文!