1. 程式人生 > >【Linux深入】epoll原始碼剖析

【Linux深入】epoll原始碼剖析

引入

  • 之前講了select、poll、epoll的區別,由於許多應用中都用到了epoll,例如Netty、Redis等等,所以就來深入學習一下,現在我們就來剖析一下epoll的原始碼

  • 我先來剖析理解epoll原始碼的基礎:主要的資料結構,然後再來解析epoll主要的三個方法:epoll_create()、epoll_ctl()、epoll_wait()。

主要的資料結構

1.eventpoll

// epoll的核心實現對應於一個epoll描述符  
struct eventpoll {  
    spinlock_t lock;  
    struct mutex mtx;  
    wait_queue_head_t wq; // sys_epoll_wait() 等待在這裡  
// f_op->poll() 使用的, 被其他事件通知機制利用的wait_address wait_queue_head_t poll_wait; //已就緒的需要檢查的epitem 列表 struct list_head rdllist; //儲存所有加入到當前epoll的檔案對應的epitem struct rb_root rbr; // 當正在向用戶空間複製資料時, 產生的可用檔案 struct epitem *ovflist; /* The user that created the eventpoll descriptor */
struct user_struct *user; struct file *file; //優化迴圈檢查,避免迴圈檢查中重複的遍歷 int visited; struct list_head visited_list_link; }

2.epitem

// 對應於一個加入到epoll的檔案  
struct epitem {  
    // 掛載到eventpoll 的紅黑樹節點  
    struct rb_node rbn;  
    // 掛載到eventpoll.rdllist 的節點  
    struct list_head rdllink;  
    // 連線到ovflist 的指標  
struct epitem *next; /* 檔案描述符資訊fd + file, 紅黑樹的key */ struct epoll_filefd ffd; /* Number of active wait queue attached to poll operations */ int nwait; // 當前檔案的等待佇列(eppoll_entry)列表 // 同一個檔案上可能會監視多種事件, // 這些事件可能屬於不同的wait_queue中 // (取決於對應檔案型別的實現), // 所以需要使用連結串列 struct list_head pwqlist; // 當前epitem 的所有者 struct eventpoll *ep; /* List header used to link this item to the "struct file" items list */ struct list_head fllink; /* epoll_ctl 傳入的使用者資料 */ struct epoll_event event; };

3.eppoll_entry

// 與一個檔案上的一個wait_queue_head 相關聯,因為同一檔案可能有多個等待的事件,
//這些事件可能使用不同的等待佇列  
struct eppoll_entry {  
    // List struct epitem.pwqlist  
    struct list_head llink;  
    // 所有者  
    struct epitem *base;  
    // 新增到wait_queue 中的節點  
    wait_queue_t wait;  
    // 檔案wait_queue 頭  
    wait_queue_head_t *whead;  
}; 

epoll_create()

1.epoll_create()

//先進行判斷size是否>=0,若是則直接呼叫epoll_create1
SYSCALL_DEFINE1(epoll_create, int, size)
{
        if (size <= 0)
                return -EINVAL;
        return sys_epoll_create1(0);
}

注:SYSCALL_DEFINE1是一個巨集,用於定義有一個引數的系統呼叫函式,上述巨集展開後即成為: int sys_epoll_create(int size),這就是epoll_create系統呼叫的入口。至於為何要用巨集而不是直接宣告,主要是因為系統呼叫的引數個數、傳參方式都有嚴格限制,最多六個引數,

2.epoll_create1()

/* 這才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error;
    struct eventpoll *ep = NULL;//主描述符
    /* Check the EPOLL_* constant for consistency.  */
    /* 這句沒啥用處... */
    BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
    /* 對於epoll來講, 目前唯一有效的FLAG就是CLOEXEC */
    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;
    /*
     * Create the internal data structure ("struct eventpoll").
     */
    /* 分配一個struct eventpoll, 分配和初始化細節我們隨後深聊~ */
    error = ep_alloc(&ep);
    if (error < 0)
        return error;
    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure and a free file descriptor.
     */
    /* 這裡是建立一個匿名fd, 說起來就話長了...長話短說:
     * epollfd本身並不存在一個真正的檔案與之對應, 所以核心需要建立一個
     * "虛擬"的檔案, 併為之分配真正的struct file結構, 而且有真正的fd.
     * 這裡2個引數比較關鍵:
     * eventpoll_fops, fops就是file operations, 就是當你對這個檔案(這裡是虛擬的)進行操作(比如讀)時,
     * fops裡面的函式指標指向真正的操作實現, 類似C++裡面虛擬函式和子類的概念.
     * epoll只實現了poll和release(就是close)操作, 其它檔案系統操作都有VFS全權處理了.
     * ep, ep就是struct epollevent, 它會作為一個私有資料儲存在struct file的private指標裡面.
     * 其實說白了, 就是為了能通過fd找到struct file, 通過struct file能找到eventpoll結構.
     * 如果懂一點Linux下字元裝置驅動開發, 這裡應該是很好理解的,
     * 推薦閱讀 <Linux device driver 3rd>
     */
    error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));
    if (error < 0)
        ep_free(ep);
    return error;
}

3.eventpoll_init

// epoll 檔案系統的相關實現  
// epoll 檔案系統初始化, 在系統啟動時會呼叫  

static int __init eventpoll_init(void)  
{  
    struct sysinfo si;  

    si_meminfo(&si);  
    // 限制可新增到epoll的最多的描述符數量  

    max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /  
                       EP_ITEM_COST;  
    BUG_ON(max_user_watches < 0);  

    // 初始化遞迴檢查佇列  
   ep_nested_calls_init(&poll_loop_ncalls);  
    ep_nested_calls_init(&poll_safewake_ncalls);  
    ep_nested_calls_init(&poll_readywalk_ncalls);  
    // epoll 使用的slab分配器分別用來分配epitem和eppoll_entry  
    epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),  
                                  0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);  
    pwq_cache = kmem_cache_create("eventpoll_pwq",  
                                  sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);  

    return 0;  
}  

epoll_ctl()

1.epoll_crl()

//建立好epollfd後, 接下來新增fd
//epoll_ctl的引數:epfd 表示epollfd;op 有ADD,MOD,DEL,
//fd 是需要監聽的描述符,event 我們感興趣的events
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,  
                struct epoll_event __user *, event)  
{  
    int error;  
    int did_lock_epmutex = 0;  
    struct file *file, *tfile;  
    struct eventpoll *ep;  
    struct epitem *epi;  
    struct epoll_event epds;  

    error = -EFAULT;  
    //錯誤處理以及從使用者空間將epoll_event結構copy到核心空間.
    if (ep_op_has_event(op) &&  
            // 複製使用者空間資料到核心  
            copy_from_user(&epds, event, sizeof(struct epoll_event))) {  
        goto error_return;  
    }  

    // 取得 epfd 對應的檔案  
    error = -EBADF;  
    file = fget(epfd);  
    if (!file) {  
        goto error_return;  
    }  

    // 取得目標檔案  
    tfile = fget(fd);  
    if (!tfile) {  
        goto error_fput;  
    }  

    // 目標檔案必須提供 poll 操作  
    error = -EPERM;  
    if (!tfile->f_op || !tfile->f_op->poll) {  
        goto error_tgt_fput;  
    }  

    // 新增自身或epfd 不是epoll 控制代碼  
    error = -EINVAL;  
    if (file == tfile || !is_file_epoll(file)) {  
        goto error_tgt_fput;  
    }  

    // 取得內部結構eventpoll  
    ep = file->private_data;  

    // EPOLL_CTL_MOD 不需要加全域性鎖 epmutex  
    if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {  
        mutex_lock(&epmutex);  
        did_lock_epmutex = 1;  
    }  
    if (op == EPOLL_CTL_ADD) {  
        if (is_file_epoll(tfile)) {  
            error = -ELOOP;  
            // 目標檔案也是epoll 檢測是否有迴圈包含的問題  
            if (ep_loop_check(ep, tfile) != 0) {  
                goto error_tgt_fput;  
            }  
        } else  
        {  
            // 將目標檔案新增到 epoll 全域性的tfile_check_list 中  
            list_add(&tfile->f_tfile_llink, &tfile_check_list);  
        }  
    }  

    mutex_lock_nested(&ep->mtx, 0);  

    // 以tfile 和fd 為key 在rbtree 中查詢檔案對應的epitem  
    epi = ep_find(ep, tfile, fd);  

    error = -EINVAL;  
    switch (op) {  
    case EPOLL_CTL_ADD:  
        if (!epi) {  
            // 沒找到, 新增額外新增ERR HUP 事件  
            epds.events |= POLLERR | POLLHUP;  
            error = ep_insert(ep, &epds, tfile, fd);  
        } else {  
            error = -EEXIST;  
        }  
        // 清空檔案檢查列表  
        clear_tfile_check_list();  
        break;  
    case EPOLL_CTL_DEL:  
        if (epi) {  
            error = ep_remove(ep, epi);  
        } else {  
            error = -ENOENT;  
        }  
        break;  
    case EPOLL_CTL_MOD:  
        if (epi) {  
            epds.events |= POLLERR | POLLHUP;  
            error = ep_modify(ep, epi, &epds);  
        } else {  
            error = -ENOENT;  
        }  
        break;  
    }  
    mutex_unlock(&ep->mtx);  

error_tgt_fput:  
    if (did_lock_epmutex) {  
        mutex_unlock(&epmutex);  
    }  

    fput(tfile);  
error_fput:  
    fput(file);  
error_return:  

    return error;  
}  

2.ep_insert()

//ep_insert()在epoll_ctl()中被呼叫, 完成往epollfd裡面新增一個監聽fd的工作
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,  
                     struct file *tfile, int fd)  
{  
    int error, revents, pwake = 0;  
    unsigned long flags;  
    long user_watches;  
    struct epitem *epi;  
    struct ep_pqueue epq;  
    /* 
    struct ep_pqueue { 
        poll_table pt; 
        struct epitem *epi; 
    }; 
    */  

    // 增加監視檔案數  
    user_watches = atomic_long_read(&ep->user->epoll_watches);  
    if (unlikely(user_watches >= max_user_watches)) {  
        return -ENOSPC;  
    }  

    // 分配初始化 epi  
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {  
        return -ENOMEM;  
    }  

    INIT_LIST_HEAD(&epi->rdllink);  
    INIT_LIST_HEAD(&epi->fllink);  
    INIT_LIST_HEAD(&epi->pwqlist);  
    epi->ep = ep;  
    // 初始化紅黑樹中的key  
    ep_set_ffd(&epi->ffd, tfile, fd);  
    // 直接複製使用者結構  
    epi->event = *event;  
    epi->nwait = 0;  
    epi->next = EP_UNACTIVE_PTR;  

    // 初始化臨時的 epq  
    epq.epi = epi;  
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);  
    // 設定事件掩碼  
    epq.pt._key = event->events;  
    //  內部會呼叫ep_ptable_queue_proc, 在檔案對應的wait queue head 上  
    // 註冊回撥函式, 並返回當前檔案的狀態  
    revents = tfile->f_op->poll(tfile, &epq.pt);  

    // 檢查錯誤  
    error = -ENOMEM;  
    if (epi->nwait < 0) { // f_op->poll 過程出錯  
        goto error_unregister;  
    }  
    // 添加當前的epitem 到檔案的f_ep_links 連結串列  
    spin_lock(&tfile->f_lock);  
    list_add_tail(&epi->fllink, &tfile->f_ep_links);  
    spin_unlock(&tfile->f_lock);  

    // 插入epi 到rbtree  
    ep_rbtree_insert(ep, epi);  

    /* now check if we've created too many backpaths */  
    error = -EINVAL;  
    if (reverse_path_check()) {  
        goto error_remove_epi;  
    }  

    spin_lock_irqsave(&ep->lock, flags);  

    /* 檔案已經就緒插入到就緒連結串列rdllist */  
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {  
        list_add_tail(&epi->rdllink, &ep->rdllist);  


        if (waitqueue_active(&ep->wq))  
            // 通知sys_epoll_wait , 呼叫回撥函式喚醒sys_epoll_wait 程序  
        {  
            wake_up_locked(&ep->wq);  
        }  
        // 先不通知呼叫eventpoll_poll 的程序  
        if (waitqueue_active(&ep->poll_wait)) {  
            pwake++;  
        }  
    }  

    spin_unlock_irqrestore(&ep->lock, flags);  

    atomic_long_inc(&ep->user->epoll_watches);  

    if (pwake)  
        // 安全通知呼叫eventpoll_poll 的程序  
    {  
        ep_poll_safewake(&ep->poll_wait);  
    }  

    return 0;  

    error_remove_epi:  
    spin_lock(&tfile->f_lock);  
    // 刪除檔案上的 epi  
    if (ep_is_linked(&epi->fllink)) {  
        list_del_init(&epi->fllink);  
    }  
    spin_unlock(&tfile->f_lock);  

    // 從紅黑樹中刪除  
    rb_erase(&epi->rbn, &ep->rbr);  

    error_unregister:  
    // 從檔案的wait_queue 中刪除, 釋放epitem 關聯的所有eppoll_entry  
    ep_unregister_pollwait(ep, epi);  

    spin_lock_irqsave(&ep->lock, flags);  
    if (ep_is_linked(&epi->rdllink)) {  
        list_del_init(&epi->rdllink);  
    }  
    spin_unlock_irqrestore(&ep->lock, flags);  

    // 釋放epi  
    kmem_cache_free(epi_cache, epi);  

    return error;  
}

3.ep_eventpoll_poll()

static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)  
{  
    int pollflags;  
    struct eventpoll *ep = file->private_data;  
    // 插入到wait_queue  
    poll_wait(file, &ep->poll_wait, wait);  
    // 掃描就緒的檔案列表, 呼叫每個檔案上的poll 檢測是否真的就緒,  
    // 然後複製到使用者空間  
    // 檔案列表中有可能有epoll檔案, 呼叫poll的時候有可能會產生遞迴,  
    // 呼叫所以用ep_call_nested 包裝一下, 防止死迴圈和過深的呼叫  
    pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,  
                               ep_poll_readyevents_proc, ep, ep, current);  
    // static struct nested_calls poll_readywalk_ncalls;  
    return pollflags != -1 ? pollflags : 0;  
}  

4.poll_wait()

// 通用的poll_wait 函式, 檔案的f_ops->poll 通常會呼叫此函式  
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)  
{  
    if (p && p->_qproc && wait_address) {  
        // 呼叫_qproc 在wait_address 上新增節點和回撥函式  
        // 呼叫 poll_table_struct 上的函式指標向wait_address新增節點, 並設定節點的func  
        // (如果是select或poll 則是 __pollwait, 如果是 epoll 則是 ep_ptable_queue_proc),  
        p->_qproc(filp, wait_address, p);  
    }  
}

5.ep_ptable_queue_proc()

/* 
 * 該函式在呼叫f_op->poll()時會被呼叫.
 * 也就是epoll主動poll某個fd時, 用來將epitem與指定的fd關聯起來的.
 * 關聯的辦法就是使用等待佇列(waitqueue)
 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;
    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        /* 初始化等待佇列, 指定ep_poll_callback為喚醒時的回撥函式,
         * 當我們監聽的fd發生狀態改變時, 也就是佇列頭被喚醒時,
         * 指定的回撥函式將會被呼叫. */
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        /* 將剛分配的等待佇列成員加入到頭中, 頭是由fd持有的 */
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        /* nwait記錄了當前epitem加入到了多少個等待佇列中,
         * 我認為這個值最大也只會是1... */
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}

6.ep_poll_callback()

//回撥函式, 當我們監聽的fd發生狀態改變時, 它會被呼叫.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    int pwake = 0;
    unsigned long flags;
    //從等待佇列獲取epitem.需要知道哪個程序掛載到這個裝置
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;//獲取
    spin_lock_irqsave(&ep->lock, flags);

    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto out_unlock;

    /* 沒有我們關心的event... */
    if (key && !((unsigned long) key & epi->event.events))
        goto out_unlock;

    /* 
     * 這裡看起來可能有點費解, 其實幹的事情比較簡單:
     * 如果該callback被呼叫的同時, epoll_wait()已經返回了,
     * 也就是說, 此刻應用程式有可能已經在迴圈獲取events,
     * 這種情況下, 核心將此刻發生event的epitem用一個單獨的連結串列
     * 鏈起來, 不發給應用程式, 也不丟棄, 而是在下一次epoll_wait
     * 時返回給使用者.
     */
    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
        if (epi->next == EP_UNACTIVE_PTR) {
            epi->next = ep->ovflist;
            ep->ovflist = epi;
        }
        goto out_unlock;
    }

    /* 將當前的epitem放入ready list */
    if (!ep_is_linked(&epi->rdllink))
        list_add_tail(&epi->rdllink, &ep->rdllist);

    /* 喚醒epoll_wait... */
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    /* 如果epollfd也在被poll, 那就喚醒佇列裡面的所有成員. */
    if (waitqueue_active(&ep->poll_wait))
        pwake++;
        out_unlock:
    spin_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return 1;
}

7.ep_remove()

static int ep_remove(struct eventpoll *ep, struct epitem *epi)  
{  
    unsigned long flags;  
    struct file *file = epi->ffd.file;  

    /* 
     * Removes poll wait queue hooks. We _have_ to do this without holding 
     * the "ep->lock" otherwise a deadlock might occur. This because of the 
     * sequence of the lock acquisition. Here we do "ep->lock" then the wait 
     * queue head lock when unregistering the wait queue. The wakeup callback 
     * will run by holding the wait queue head lock and will call our callback 
     * that will try to get "ep->lock". 
     */  
    ep_unregister_pollwait(ep, epi);  

    /* Remove the current item from the list of epoll hooks */  
    spin_lock(&file->f_lock);  
    if (ep_is_linked(&epi->fllink))  
        list_del_init(&epi->fllink);  
    spin_unlock(&file->f_lock);  

    rb_erase(&epi->rbn, &ep->rbr);  

    spin_lock_irqsave(&ep->lock, flags);  
    if (ep_is_linked(&epi->rdllink))  
        list_del_init(&epi->rdllink);  
    spin_unlock_irqrestore(&ep->lock, flags);  

    /* At this point it is safe to free the eventpoll item */  
    kmem_cache_free(epi_cache, epi);  

    atomic_long_dec(&ep->user->epoll_watches);  

    return 0;  
}  

8.ep_modify()

static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event)  
{  
    int pwake = 0;  
    unsigned int revents;  
    poll_table pt;  

    init_poll_funcptr(&pt, NULL);  

    /* 
     * Set the new event interest mask before calling f_op->poll(); 
     * otherwise we might miss an event that happens between the 
     * f_op->poll() call and the new event set registering. 
     */  
    epi->event.events = event->events;  
    pt._key = event->events;  
    epi->event.data = event->data; /* protected by mtx */  

    /* 
     * Get current event bits. We can safely use the file* here because 
     * its usage count has been increased by the caller of this function. 
     */  
    revents = epi->ffd.file->f_op->poll(epi->ffd.file, &pt);  

    /* 
     * If the item is "hot" and it is not registered inside the ready 
     * list, push it inside. 
     */  
    if (revents & event->events) {  
        spin_lock_irq(&ep->lock);  
        if (!ep_is_linked(&epi->rdllink)) {  
            list_add_tail(&epi->rdllink, &ep->rdllist);  

            /* Notify waiting tasks that events are available */  
            if (waitqueue_active(&ep->wq))  
                wake_up_locked(&ep->wq);  
            if (waitqueue_active(&ep->poll_wait))  
                pwake++;  
        }  
        spin_unlock_irq(&ep->lock);  
    }  

    /* We have to call this outside the lock */  
    if (pwake)  
        ep_poll_safewake(&ep->poll_wait);  

    return 0;  
}  

epoll_wait()

1.epoll_wait()

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    int error;
    struct file *file;
    struct eventpoll *ep;
    /* The maximum number of event must be greater than zero */
    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
        return -EINVAL;
    /* Verify that the area passed by the user is writeable */
    /* 這個地方有必要說明一下:
     * 核心對應用程式採取的策略是"絕對不信任",
     * 所以核心跟應用程式之間的資料互動大都是copy, 不允許(也時候也是不能...)指標引用.
     * epoll_wait()需要核心返回資料給使用者空間, 記憶體由使用者程式提供,
     * 所以核心會用一些手段來驗證這一段記憶體空間是不是有效的.
     */
    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
        error = -EFAULT;
        goto error_return;
    }
    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    /* 獲取epollfd的struct file, epollfd也是檔案嘛 */
    file = fget(epfd);
    if (!file)
        goto error_return;

    error = -EINVAL;
    /* 檢查一下它是不是一個真正的epollfd... */
    if (!is_file_epoll(file))
        goto error_fput;

    /* 獲取eventpoll結構 */
    ep = file->private_data;

    /* 等待事件到來~~ */
    error = ep_poll(ep, events, maxevents, timeout);
    error_fput:
    fput(file);
    error_return:
    return error;
}

2.ep_poll()

/* 這個函式真正將執行epoll_wait的程序帶入睡眠狀態... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;//等待佇列

    /* 計算睡覺時間, 毫秒要轉換為HZ */
    jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
    spin_lock_irqsave(&ep->lock, flags);
    res = 0;
    /* 如果ready list不為空, 就不睡了, 直接幹活... */
    if (list_empty(&ep->rdllist)) {

        /* OK, 初始化一個等待佇列, 準備直接把自己掛起,
         * 注意current是一個巨集, 代表當前程序 */
        init_waitqueue_entry(&wait, current);//初始化等待佇列,wait表示當前程序
        __add_wait_queue_exclusive(&ep->wq, &wait);//掛載到ep結構的等待佇列
        for (;;) {
            /* 將當前程序設定位睡眠, 但是可以被訊號喚醒的狀態,
             * 注意這個設定是"將來時", 我們此刻還沒睡! */
            set_current_state(TASK_INTERRUPTIBLE);
            /* 如果這個時候, ready list裡面有成員了,
             * 或者睡眠時間已經過了, 就直接不睡了... */
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            /* 如果有訊號產生, 也起床... */
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }
            /* 啥事都沒有,解鎖, 睡覺... */
            spin_unlock_irqrestore(&ep->lock, flags);
            /* jtimeout這個時間後, 會被喚醒,
             * ep_poll_callback()如果此時被呼叫,
             * 那麼我們就會直接被喚醒, 不用等時間了... 
             * 再次強調一下ep_poll_callback()的呼叫時機是由被監聽的fd
             * 的具體實現, 比如socket或者某個裝置驅動來決定的,
             * 因為等待佇列頭是他們持有的, epoll和當前程序
             * 只是單純的等待...
             **/
            jtimeout = schedule_timeout(jtimeout);//睡覺
            spin_lock_irqsave(&ep->lock, flags);
        }
        __remove_wait_queue(&ep->wq, &wait);
        /* OK 我們醒來了... */
        set_current_state(TASK_RUNNING);
    }
    /* Is it worth to try to dig for events ? */
    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
    spin_unlock_irqrestore(&ep->lock, flags);

    /* 如果一切正常, 有event發生, 就開始準備資料copy給使用者空間了... */
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
        goto retry;
    return res;
}

3.ep_send_events()

//呼叫p_scan_ready_list()
static int ep_send_events(struct eventpoll *ep,
              struct epoll_event __user *events, int maxevents)
{
    struct ep_send_events_data esed;
    esed.maxevents = maxevents;
    esed.events = events;
    return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}

4.ep_scan_ready_list()

//由ep_send_events()呼叫本函式
static int ep_scan_ready_list(struct eventpoll *ep,
                  int (*sproc)(struct eventpoll *,
                       struct list_head *, void *),
                  void *priv)
{
    int error, pwake = 0;
    unsigned long flags;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist);

    mutex_lock(&ep->mtx);

    spin_lock_irqsave(&ep->lock, flags);
    /* 這一步要注意, 首先, 所有監聽到events的epitem都鏈到rdllist上了,
     * 但是這一步之後, 所有的epitem都轉移到了txlist上, 而rdllist被清空了,
     * 要注意哦, rdllist已經被清空了! */
    list_splice_init(&ep->rdllist, &txlist);
    /* ovflist, 在ep_poll_callback()裡面我解釋過, 此時此刻我們不希望
     * 有新的event加入到ready list中了, 儲存後下次再處理... */
    ep->ovflist = NULL;
    spin_unlock_irqrestore(&ep->lock, flags);

    /* 在這個回撥函式裡面處理每個epitem
     * sproc 就是 ep_send_events_proc, 下面會註釋到. */
    error = (*sproc)(ep, &txlist, priv);
    spin_lock_irqsave(&ep->lock, flags);

    /* 現在我們來處理ovflist, 這些epitem都是我們在傳遞資料給使用者空間時
     * 監聽到了事件. */
    for (nepi = ep->ovflist; (epi = nepi) != NULL;
         nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {

        /* 將這些直接放入readylist */
        if (!ep_is_linked(&epi->rdllink))
            list_add_tail(&epi->rdllink, &ep->rdllist);
    }

    ep->ovflist = EP_UNACTIVE_PTR;

    /* 上一次沒有處理完的epitem, 重新插入到ready list */
    list_splice(&txlist, &ep->rdllist);
    /* ready list不為空, 直接喚醒... */
    if (!list_empty(&ep->rdllist)) {

        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);
    mutex_unlock(&ep->mtx);
    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return error;
}

其他函式

1.ep_send_events_proc()

/* 該函式作為callbakc在ep_scan_ready_list()中被呼叫
 * head是一個連結串列, 包含了已經ready的epitem,
 * 這個不是eventpoll裡面的ready list, 而是上面函式中的txlist.
 */
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                   void *priv)
{
    struct ep_send_events_data *esed = priv;
    int eventcnt;
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;

    /* 掃描整個連結串列... */
    for (eventcnt = 0, uevent = esed->events;
         !list_empty(head) && eventcnt < esed->maxevents;) {
        /* 取出第一個成員 */
        epi = list_first_entry(head, struct epitem, rdllink);
        /* 然後從連結串列裡面移除 */
        list_del_init(&epi->rdllink);
        /* 讀取events, 
         * 注意events我們ep_poll_callback()裡面已經取過一次了, 為啥還要再取?
         * 1. 我們當然希望能拿到此刻的最新資料, events是會變的~
         * 2. 不是所有的poll實現, 都通過等待佇列傳遞了events, 有可能某些驅動壓根沒傳
         * 必須主動去讀取. */
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
            epi->event.events;
        if (revents) {
            /* 將當前的事件和使用者傳入的資料都copy給使用者空間,
             * 就是epoll_wait()後應用程式能讀到的那一堆資料. */
            if (__put_user(revents, &uevent->events) ||
                __put_user(epi->event.data, &uevent->data)) {
                list_add(&epi->rdllink, head);
                return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;
            uevent++;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            else if (!(epi->event.events & EPOLLET)) {
                /* 嘿嘿, EPOLLET和非ET的區別就在這一步之差呀~
                 * 如果是ET, epitem是不會再進入到readly list,
                 * 除非fd再次發生了狀態改變, ep_poll_callback被呼叫.
                 * 如果是非ET, 不管你還有沒有有效的事件或者資料,
                 * 都會被重新插入到ready list, 再下一次epoll_wait
                 * 時, 會立即返回, 並通知給使用者空間. 當然如果這個
                 * 被監聽的fds確實沒事件也沒資料了, epoll_wait會返回一個0,
                 * 空轉一次.
                 */
                list_add_tail(&epi->rdllink, &ep->rdllist);
            }
        }
    }
    return eventcnt;
}

2.ep_free()

/* ep_free在epollfd被close時呼叫,
 * 釋放一些資源而已, 比較簡單 */
static void ep_free(struct eventpoll *ep)
{
    struct rb_node *rbp;
    struct epitem *epi;
    /* We need to release all tasks waiting for these file */
    if (waitqueue_active(&ep->poll_wait))
        ep_poll_safewake(&ep->poll_wait);

    mutex_lock(&epmutex);

    for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
        epi = rb_entry(rbp, struct epitem, rbn);
        ep_unregister_pollwait(ep, epi);
    }

    /* 之所以在關閉epollfd之前不需要呼叫epoll_ctl移除已經新增的fd,
     * 是因為這裡已經做了... */
    while ((rbp = rb_first(&ep->rbr)) != NULL) {
        epi = rb_entry(rbp, struct epitem, rbn);
        ep_remove(ep, epi);
    }
    mutex_unlock(&epmutex);
    mutex_destroy(&ep->mtx);
    free_uid(ep->user);
    kfree(ep);
}

函式的關係調用圖

這裡寫圖片描述

函式主要功能

1.epoll_create

  • 從slab快取中建立一個eventpoll物件,並且建立一個匿名的fd跟fd對應的file物件,而eventpoll物件儲存在struct file結構的private指標中,並且返回,

  • 該fd對應的file operations只是實現了poll跟release操作,建立eventpoll物件的初始化操作
    獲取當前使用者資訊,是不是root,最大監聽fd數目等並且儲存到eventpoll物件中

  • 初始化等待佇列,初始化就緒連結串列,初始化紅黑樹的頭結點

2.epoll_ctl

  • 將epoll_event結構拷貝到核心空間中,並且判斷加入的fd是否支援poll結(epoll,poll,selectI/O多路複用必須支援poll操作).

  • 從epfd->file->privatedata獲取event_poll物件,根據op區分是新增刪除還是修改,

  • 首先在eventpoll結構中的紅黑樹查詢是否已經存在了相對應的fd,沒找到就支援插入操作,否則報重複的錯誤,還有修改,刪除操作。

  • 插入操作時,會建立一個與fd對應的epitem結構,並且初始化相關成員,並指定呼叫poll_wait時的回撥函式用於資料就緒時喚醒程序,(其內部,初始化裝置的等待佇列,將該程序註冊到等待佇列)完成這一步,

  • epitem就跟這個socket關聯起來了, 當它有狀態變化時,會通過ep_poll_callback()來通知.

  • 最後呼叫加入的fd的fileoperation->poll函式(最後會呼叫poll_wait操作)用於完註冊操作,將epitem結構新增到紅黑樹中。

3.epoll_wait

  • 計算睡眠時間(如果有),判斷eventpoll物件的連結串列是否為空,不為空那就幹活不睡明.並且初始化一個等待佇列,把自己掛上去,設定自己的程序狀態

  • 若是可睡眠狀態.判斷是否有訊號到來(有的話直接被中斷醒來,),如果沒有那就呼叫schedule_timeout進行睡眠,

  • 如果超時或者被喚醒,首先從自己初始化的等待佇列刪除,然後開始拷貝資源給使用者空間了

  • 拷貝資源則是先把就緒事件連結串列轉移到中間連結串列,然後挨個遍歷拷貝到使用者空間,並且挨個判斷其是否為水平觸發,是的話再次插入到就緒連結串列

疑問

1.epoll_create中的size引數有什麼作用?

答:size這個引數其實沒有任何用處,它只是為了保持相容,因為之前的fd使用hash表儲存,size表示hash表的大小,而現在使用紅黑樹儲存,所以size就沒用了。

2.LT和ET的區別(原始碼級別)?

答:在原始碼中,兩種模式的區別是一個if判斷語句,通過ep_send_events_proc()函式實現,如果沒有標上EPOLLET(即預設的LT)且“事件被關注”的fd就會被重新放回了rdllist。那麼下次epoll_wait當然會又把rdllist裡的fd拿來拷給使用者了。



本人才疏學淺,若有錯,請指出,謝謝!
如果你有更好的建議,可以留言我們一起討論,共同進步!
衷心的感謝您能耐心的讀完本篇博文!