1. 程式人生 > >libevent程式碼閱讀(13)——epoll的事件等待以及分發過程

libevent程式碼閱讀(13)——epoll的事件等待以及分發過程

在前面的章節我們提到event_base_loop中會呼叫具體的io複用機制的事件等待以及分發函式evsel->dispatch,然後呼叫event_process_active處理已經啟用的事件。

對於epoll而言,dispatch就是epoll_dispatch,我們看一下它的執行流程

// 事件分發
static int
epoll_dispatch(struct event_base *base, struct timeval *tv)
{
	struct epollop *epollop = base->evbase;
	struct epoll_event *events = epollop->events;
	int i, res;
	long timeout = -1;

	if (tv != NULL) {
		timeout = evutil_tv_to_msec(tv);
		if (timeout < 0 || timeout > MAX_EPOLL_TIMEOUT_MSEC) {
			/* Linux kernels can wait forever if the timeout is
			 * too big; see comment on MAX_EPOLL_TIMEOUT_MSEC. */
			timeout = MAX_EPOLL_TIMEOUT_MSEC;
		}
	}

	// 暫時不用管下面兩個函式的呼叫
	epoll_apply_changes(base);
	event_changelist_remove_all(&base->changelist, base);

	EVBASE_RELEASE_LOCK(base, th_base_lock);

	// 等待事件發生,res存放事件發生的個數
	res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);

	// 出錯
	if (res == -1) {
		if (errno != EINTR) {
			event_warn("epoll_wait");
			return (-1);
		}

		return (0);
	}

	event_debug(("%s: epoll_wait reports %d", __func__, res));
	EVUTIL_ASSERT(res <= epollop->nevents);

	// 遍歷所有觸發的事件
	for (i = 0; i < res; i++)
	{
		int what = events[i].events;
		short ev = 0;

		if (what & (EPOLLHUP|EPOLLERR))
		{
			// 有錯誤發生
			ev = EV_READ | EV_WRITE;
		}
		else
		{
			// 可讀
			if (what & EPOLLIN)
				ev |= EV_READ;

			// 可寫
			if (what & EPOLLOUT)
				ev |= EV_WRITE;
		}

		if (!ev)
			continue;

		// 啟用io事件對映表中的事件處理器
		evmap_io_active(base, events[i].data.fd, ev | EV_ET);
	}

	// 如果所有的事件都被觸發了,表示事件陣列的大小還是太小了,需要擴充套件事件陣列的大小
	if (res == epollop->nevents && epollop->nevents < MAX_NEVENT) {
		/* We used all of the event space this time.  We should
		   be ready for more events next time. */
		int new_nevents = epollop->nevents * 2;
		struct epoll_event *new_events;

		new_events = mm_realloc(epollop->events,
		    new_nevents * sizeof(struct epoll_event));
		if (new_events) {
			epollop->events = new_events;
			epollop->nevents = new_nevents;
		}
	}

	return (0);
}

我們看到epoll_dispatch中呼叫了一個evmap_io_active函式,這個函式會將啟用的事件插入到已啟用的事件列表中
/*
 * 啟用io事件對映表中的事件,然後插入到已啟用事件佇列中
 */
void
evmap_io_active(struct event_base *base, evutil_socket_t fd, short events)
{
	struct event_io_map *io = &base->io;
	struct evmap_io *ctx;
	struct event *ev;

#ifndef EVMAP_USE_HT
	EVUTIL_ASSERT(fd < io->nentries);
#endif
	GET_IO_SLOT(ctx, io, fd, evmap_io);

	EVUTIL_ASSERT(ctx);
	TAILQ_FOREACH(ev, &ctx->events, ev_io_next) {
		if (ev->ev_events & events)
			event_active_nolock(ev, ev->ev_events & events, 1);
	}
}
event_active_nolock函式的過程如下:
/*
 * 非阻塞的啟用事件,並將它存放於己啟用事件列表中
 */
void
event_active_nolock(struct event *ev, int res, short ncalls)
{
	struct event_base *base;

	event_debug(("event_active: %p (fd "EV_SOCK_FMT"), res %d, callback %p",
			ev, EV_SOCK_ARG(ev->ev_fd), (int)res, ev->ev_callback));


	/* We get different kinds of events, add them together */
	// 如果本事件已經存在於已啟用事件列表中,那麼就返回
	if (ev->ev_flags & EVLIST_ACTIVE) {
		ev->ev_res |= res;
		return;
	}

	base = ev->ev_base;

	EVENT_BASE_ASSERT_LOCKED(base);

	ev->ev_res = res;

	// 如果當前事件的優先順序小於event_base的執行時優先順序
	// 就讓event_base跳過本次迴圈,進行下一次迴圈
	if (ev->ev_pri < base->event_running_priority)
		base->event_continue = 1;

	// 如果是訊號事件
	if (ev->ev_events & EV_SIGNAL) {
#ifndef _EVENT_DISABLE_THREAD_SUPPORT
		if (base->current_event == ev && !EVBASE_IN_THREAD(base)) {
			++base->current_event_waiters;
			EVTHREAD_COND_WAIT(base->current_event_cond, base->th_base_lock);
		}
#endif
		ev->ev_ncalls = ncalls;
		ev->ev_pncalls = NULL;
	}

	// 把被啟用的事件插入到已啟用事件列表
	event_queue_insert(base, ev, EVLIST_ACTIVE);

	if (EVBASE_NEED_NOTIFY(base))
		evthread_notify_base(base);
}

event_queue_insert函式才正真將事件插入到已啟用事件佇列中:
/*
 * 將事件處理器新增到各種事件佇列中
 * 將io事件處理器和訊號事件處理器插入註冊事件佇列
 * 將定時器插入通用定時器佇列或時間堆
 * 將被啟用的事件處理器新增到活動事件佇列中
 */
static void
event_queue_insert(struct event_base *base, struct event *ev, int queue)
{
	EVENT_BASE_ASSERT_LOCKED(base);

	// 避免重複插入
	if (ev->ev_flags & queue) {
		/* Double insertion is possible for active events */
		if (queue & EVLIST_ACTIVE)
			return;

		event_errx(1, "%s: %p(fd "EV_SOCK_FMT") already on queue %x", __func__,
				ev, EV_SOCK_ARG(ev->ev_fd), queue);
		return;
	}

	// 增加event_base擁有的事件處理器的數量
	if (~ev->ev_flags & EVLIST_INTERNAL)
		base->event_count++;

	// 標記該事件處理器已經被處理過
	ev->ev_flags |= queue;

	// 根據不同的事件插入到不同的佇列中
	switch (queue) {
	// 將io事件處理器或訊號事件處理器插入註冊事件佇列
	case EVLIST_INSERTED:
		TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
		break;
		// 將就緒事件處理器插入活動事件佇列
	case EVLIST_ACTIVE:
		base->event_count_active++;
		TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
				ev,ev_active_next);
		break;
		// 將定時器插入到通用定時器佇列或時間堆
	case EVLIST_TIMEOUT: {
		if (is_common_timeout(&ev->ev_timeout, base)) {
			struct common_timeout_list *ctl =
					get_common_timeout_list(base, &ev->ev_timeout);
			insert_common_timeout_inorder(ctl, ev);
		} else
			min_heap_push(&base->timeheap, ev);
		break;
	}
	default:
		event_errx(1, "%s: unknown queue %x", __func__, queue);
	}
}