1. 程式人生 > >Libevent原始碼分析(六)--- bufferevent

Libevent原始碼分析(六)--- bufferevent

上一節說過,libevent提供六種bufferevent型別,後面會詳細分析其中的兩個:bufferevent_sock和bufferevent_async.下面是bufferevent的詳細定義:

struct bufferevent {
    /** Event base for which this bufferevent was created. */
    struct event_base *ev_base;
    /** Pointer to a table of function pointers to set up how this
        bufferevent behaves. */
const struct bufferevent_ops *be_ops; /** A read event that triggers when a timeout has happened or a socket is ready to read data. Only used by some subtypes of bufferevent. */ struct event ev_read; /** A write event that triggers when a timeout has happened or a socket is ready to write data. Only used by some subtypes of bufferevent. */
struct event ev_write; /** An input buffer. Only the bufferevent is allowed to add data to this buffer, though the user is allowed to drain it. */ struct evbuffer *input; /** An input buffer. Only the bufferevent is allowed to drain data from this buffer, though the user is allowed to add it. */
struct evbuffer *output; struct event_watermark wm_read; struct event_watermark wm_write; bufferevent_data_cb readcb; bufferevent_data_cb writecb; /* This should be called 'eventcb', but renaming it would break * backward compatibility */ bufferevent_event_cb errorcb; void *cbarg; struct timeval timeout_read; struct timeval timeout_write; /** Events that are currently enabled: currently EV_READ and EV_WRITE are supported. */ short enabled; };

其中ev_base指向bufferevent所屬的event_base ,ev_read,ev_write是讀寫事件,timeout_read和timeout_write是讀寫超時時間,readcb,writecb和errorcb分別是讀回撥,寫回調和事件回撥,因為版本相容問題使用errorcb命名。input和output是evbuffer型別的讀寫快取。enabled表示支援的事件,目前只有EV_READ 和EV_WRITE。wm_read和wm_write代表讀寫快取的伐值,be_ops是bufferevent_ops型別變數,它的定義如下:

struct bufferevent_ops {
    /** The name of the bufferevent's type. */
    const char *type;
    /** At what offset into the implementation type will we find a
        bufferevent structure?

        Example: if the type is implemented as
        struct bufferevent_x {
           int extra_data;
           struct bufferevent bev;
        }
        then mem_offset should be offsetof(struct bufferevent_x, bev)
    */
    off_t mem_offset;

    /** Enables one or more of EV_READ|EV_WRITE on a bufferevent.  Does
        not need to adjust the 'enabled' field.  Returns 0 on success, -1
        on failure.
     */
    int (*enable)(struct bufferevent *, short);

    /** Disables one or more of EV_READ|EV_WRITE on a bufferevent.  Does
        not need to adjust the 'enabled' field.  Returns 0 on success, -1
        on failure.
     */
    int (*disable)(struct bufferevent *, short);

    /** Free any storage and deallocate any extra data or structures used
        in this implementation.
     */
    void (*destruct)(struct bufferevent *);

    /** Called when the timeouts on the bufferevent have changed.*/
    int (*adj_timeouts)(struct bufferevent *);

    /** Called to flush data. */
    int (*flush)(struct bufferevent *, short, enum bufferevent_flush_mode);

    /** Called to access miscellaneous fields. */
    int (*ctrl)(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);

};

bufferevent_ops 的實現和eventop比較類似,都是定義了一組指標,不同的bufferevent實現可以自定義這些具體操作。比如bufferevent_sock的對應定義:

const struct bufferevent_ops bufferevent_ops_socket = {
    "socket",
    evutil_offsetof(struct bufferevent_private, bev),
    be_socket_enable,
    be_socket_disable,
    be_socket_destruct,
    be_socket_adj_timeouts,
    be_socket_flush,
    be_socket_ctrl,
};

接下來分析一下buffevent_sock的實現方式。

struct bufferevent *
bufferevent_new(evutil_socket_t fd,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
    struct bufferevent *bufev;

    if (!(bufev = bufferevent_socket_new(NULL, fd, 0)))
        return NULL;

    bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg);

    return bufev;
}

struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
    int options)
{
    struct bufferevent_private *bufev_p;
    struct bufferevent *bufev;

#ifdef WIN32
    if (base && event_base_get_iocp(base))
        return bufferevent_async_new(base, fd, options);
#endif

    if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL)
        return NULL;

    if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket,
                    options) < 0) {
        mm_free(bufev_p);
        return NULL;
    }
    bufev = &bufev_p->bev;
    evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);

    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

    evbuffer_freeze(bufev->input, 0);
    evbuffer_freeze(bufev->output, 1);

    return bufev;
}

bufferevent_socket_new方法用於建立一個bufferevent_socket型別的bufferevent。如果是在win32環境下則會預設呼叫iocp的對應實現,即bufferevent_async_new。該方法還會建立一個bufferevent_private變數,下面是他的定義:

struct bufferevent_private {
    /** The underlying bufferevent structure. */
    struct bufferevent bev;

    /** Evbuffer callback to enforce watermarks on input. */
    struct evbuffer_cb_entry *read_watermarks_cb;

    /** If set, we should free the lock when we free the bufferevent. */
    unsigned own_lock : 1;
    /** Flag: set if we have deferred callbacks and a read callback is
     * pending. */
    unsigned readcb_pending : 1;
    /** Flag: set if we have deferred callbacks and a write callback is
     * pending. */
    unsigned writecb_pending : 1;
    /** Flag: set if we are currently busy connecting. */
    unsigned connecting : 1;
    /** Flag: set if a connect failed prematurely; this is a hack for
     * getting around the bufferevent abstraction. */
    unsigned connection_refused : 1;
    /** Set to the events pending if we have deferred callbacks and
     * an events callback is pending. */
    short eventcb_pending;

    /** If set, read is suspended until one or more conditions are over.
     * The actual value here is a bitfield of those conditions; see the
     * BEV_SUSPEND_* flags above. */
    bufferevent_suspend_flags read_suspended;

    /** If set, writing is suspended until one or more conditions are over.
     * The actual value here is a bitfield of those conditions; see the
     * BEV_SUSPEND_* flags above. */
    bufferevent_suspend_flags write_suspended;

    /** Set to the current socket errno if we have deferred callbacks and
     * an events callback is pending. */
    int errno_pending;

    /** The DNS error code for bufferevent_socket_connect_hostname */
    int dns_error;

    /** Used to implement deferred callbacks */
    struct deferred_cb deferred;

    /** The options this bufferevent was constructed with */
    enum bufferevent_options options;

    /** Current reference count for this bufferevent. */
    int refcnt;

    /** Lock for this bufferevent.  Shared by the inbuf and the outbuf.
     * If NULL, locking is disabled. */
    void *lock;

    /** Rate-limiting information for this bufferevent */
    struct bufferevent_rate_limit *rate_limiting;
};

bufferevent_private結構的第一個變數是bufferevent型別的,其實每一個bufferevent都對應一個bufferevent_private變數,只是對使用者來說是透明的,使用者只需要瞭解bufferevent即可。bufferevent_private的變數基本都是用於記錄狀態。
繼續分析bufferevent_socket_new函式,建立bufferevent_private之後呼叫bufferevent_init_common函式,這個函式定義在bufferevent.c檔案中,該檔案中的函式都是各個bufferevent型別通用的函式。

int bufferevent_init_common(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
    struct bufferevent *bufev = &bufev_private->bev;

    if (!bufev->input) {
        if ((bufev->input = evbuffer_new()) == NULL)
            return -1;
    }

    if (!bufev->output) {
        if ((bufev->output = evbuffer_new()) == NULL) {
            evbuffer_free(bufev->input);
            return -1;
        }
    }

    bufev_private->refcnt = 1;
    bufev->ev_base = base;

    /* Disable timeouts. */
    evutil_timerclear(&bufev->timeout_read);
    evutil_timerclear(&bufev->timeout_write);

    bufev->be_ops = ops;

    /*
     * Set to EV_WRITE so that using bufferevent_write is going to
     * trigger a callback.  Reading needs to be explicitly enabled
     * because otherwise no data will be available.
     */
    bufev->enabled = EV_WRITE;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
    if (options & BEV_OPT_THREADSAFE) {
        if (bufferevent_enable_locking(bufev, NULL) < 0) {
            /* cleanup */
            evbuffer_free(bufev->input);
            evbuffer_free(bufev->output);
            bufev->input = NULL;
            bufev->output = NULL;
            return -1;
        }
    }
#endif
    if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
        == BEV_OPT_UNLOCK_CALLBACKS) {
        event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
        return -1;
    }
    if (options & BEV_OPT_DEFER_CALLBACKS) {
        if (options & BEV_OPT_UNLOCK_CALLBACKS)
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_unlocked,
                bufev_private);
        else
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_locked,
                bufev_private);
    }

    bufev_private->options = options;

    evbuffer_set_parent(bufev->input, bufev);
    evbuffer_set_parent(bufev->output, bufev);

    return 0;
}

bufferevent_init_common比較簡單,需要注意 bufferEvent預設開啟EV_WRITE,EV_READ需要手動開啟,如果需要支援多執行緒則為bufferevent建立鎖,另外在目前的版本中,UNLOCK_CALLBACKS需要DEFER_CALLBACKS作為前置條件。

繼續分析bufferevent_socket_new函式:

    evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);

呼叫bufferevent_init_common之後會將bufev->output新增EVBUFFER_FLAG_DRAINS_TO_FD標記,這樣output就支援從檔案讀取資料了,接下來設定讀寫事件回掉,最後新增bufferevent_socket_outbuf_cb作為output的call,當output中有資料時呼叫bufferevent_socket_outbuf_cb,這樣就可以不必一直監聽寫事件,bufferevent_writecb在資料全部寫出之後可以移除寫事件, 然後在bufferevent_socket_outbuf_cb中判斷是否需要新增事件。

初始化bufferevent之後需要呼叫be_socket_setfd設定網路套接字來監聽讀寫事件:

static void
be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
{
    BEV_LOCK(bufev);
    EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket);

    event_del(&bufev->ev_read);
    event_del(&bufev->ev_write);

    event_assign(&bufev->ev_read, bufev->ev_base, fd,
        EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
    event_assign(&bufev->ev_write, bufev->ev_base, fd,
        EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);

    if (fd >= 0)
        bufferevent_enable(bufev, bufev->enabled);

    BEV_UNLOCK(bufev);
}
int bufferevent_enable(struct bufferevent *bufev, short event)
{
    struct bufferevent_private *bufev_private =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    short impl_events = event;
    int r = 0;

    _bufferevent_incref_and_lock(bufev);
    if (bufev_private->read_suspended)
        impl_events &= ~EV_READ;
    if (bufev_private->write_suspended)
        impl_events &= ~EV_WRITE;

    bufev->enabled |= event;

    if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
        r = -1;

    _bufferevent_decref_and_unlock(bufev);
    return r;
}

be_socket_setfd中設定的套接字必須是已經建立好連線的socket。如果是client端,bufferevent_sock提供了bufferevent_socket_connect用於連線套接字,但是如果是server端,libevent沒有提供listen的相關函式,不過libevent提供了listener.c檔案可以用於監聽連線。

int
bufferevent_socket_connect(struct bufferevent *bev,
    struct sockaddr *sa, int socklen)
{
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

    evutil_socket_t fd;
    int r = 0;
    int result=-1;
    int ownfd = 0;

    _bufferevent_incref_and_lock(bev);

    if (!bufev_p)
        goto done;

    fd = bufferevent_getfd(bev);
    if (fd < 0) {
        if (!sa)
            goto done;
        fd = socket(sa->sa_family, SOCK_STREAM, 0);
        if (fd < 0)
            goto done;
        if (evutil_make_socket_nonblocking(fd)<0)
            goto done;
        ownfd = 1;
    }
    if (sa) {
#ifdef WIN32
        if (bufferevent_async_can_connect(bev)) {
            bufferevent_setfd(bev, fd);
            r = bufferevent_async_connect(bev, fd, sa, socklen);
            if (r < 0)
                goto freesock;
            bufev_p->connecting = 1;
            result = 0;
            goto done;
        } else
#endif
        r = evutil_socket_connect(&fd, sa, socklen);
        if (r < 0)
            goto freesock;
    }
#ifdef WIN32
    /* ConnectEx() isn't always around, even when IOCP is enabled.
     * Here, we borrow the socket object's write handler to fall back
     * on a non-blocking connect() when ConnectEx() is unavailable. */
    if (BEV_IS_ASYNC(bev)) {
        event_assign(&bev->ev_write, bev->ev_base, fd,
            EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
    }
#endif
    bufferevent_setfd(bev, fd);
    if (r == 0) {
        if (! be_socket_enable(bev, EV_WRITE)) {
            bufev_p->connecting = 1;
            result = 0;
            goto done;
        }
    } else if (r == 1) {
        /* The connect succeeded already. How very BSD of it. */
        result = 0;
        bufev_p->connecting = 1;
        event_active(&bev->ev_write, EV_WRITE, 1);
    } else {
        /* The connect failed already.  How very BSD of it. */
        bufev_p->connection_refused = 1;
        bufev_p->connecting = 1;
        result = 0;
        event_active(&bev->ev_write, EV_WRITE, 1);
    }

    goto done;

freesock:
    _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
    if (ownfd)
        evutil_closesocket(fd);
    /* do something about the error? */
done:
    _bufferevent_decref_and_unlock(bev);
    return result;
}

win32的connect使用iocp實現,將在下一節iocp章節詳細分析,這裡只看其他平臺的實現方式。evutil_socket_connect是libevent提供的工具函式,它的返回值有三種情況:0代表正在連線,此時需要呼叫be_socket_enable啟用寫事件。當連線成功時會呼叫bufferevent_writecb回掉。1代表已經連線成功,此時呼叫event_active直接手動觸發bufferevent_writecb事件處理連線。-1代表連線被拒絕,此時同樣呼叫event_active處理連線失敗情況。
下面是bufferevent_writecb的具體實現:

static void
bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    int res = 0;
    short what = BEV_EVENT_WRITING;
    int connected = 0;
    ev_ssize_t atmost = -1;

    _bufferevent_incref_and_lock(bufev);

    if (event == EV_TIMEOUT) {
        /* Note that we only check for event==EV_TIMEOUT. If
         * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the
         * timeout, since a read has occurred */
        what |= BEV_EVENT_TIMEOUT;
        goto error;
    }
    if (bufev_p->connecting) {
        int c = evutil_socket_finished_connecting(fd);
        /* we need to fake the error if the connection was refused
         * immediately - usually connection to localhost on BSD */
        if (bufev_p->connection_refused) {
          bufev_p->connection_refused = 0;
          c = -1;
        }

        if (c == 0)
            goto done;

        bufev_p->connecting = 0;
        if (c < 0) {
            event_del(&bufev->ev_write);
            event_del(&bufev->ev_read);
            _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR);
            goto done;
        } else {
            connected = 1;
#ifdef WIN32
            if (BEV_IS_ASYNC(bufev)) {
                event_del(&bufev->ev_write);
                bufferevent_async_set_connected(bufev);
                _bufferevent_run_eventcb(bufev,
                        BEV_EVENT_CONNECTED);
                goto done;
            }
#endif
            _bufferevent_run_eventcb(bufev,
                    BEV_EVENT_CONNECTED);
            if (!(bufev->enabled & EV_WRITE) ||
                bufev_p->write_suspended) {
                event_del(&bufev->ev_write);
                goto done;
            }
        }
    }

    atmost = _bufferevent_get_write_max(bufev_p);

    if (bufev_p->write_suspended)
        goto done;

    if (evbuffer_get_length(bufev->output)) {
        evbuffer_unfreeze(bufev->output, 1);
        res = evbuffer_write_atmost(bufev->output, fd, atmost);
        evbuffer_freeze(bufev->output, 1);
        if (res == -1) {
            int err = evutil_socket_geterror(fd);
            if (EVUTIL_ERR_RW_RETRIABLE(err))
                goto reschedule;
            what |= BEV_EVENT_ERROR;
        } else if (res == 0) {
            /* eof case
               XXXX Actually, a 0 on write doesn't indicate
               an EOF. An ECONNRESET might be more typical.
             */
            what |= BEV_EVENT_EOF;
        }
        if (res <= 0)
            goto error;

        _bufferevent_decrement_write_buckets(bufev_p, res);
    }

    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }

    /*
     * Invoke the user callback if our buffer is drained or below the
     * low watermark.
     */
    if ((res || !connected) &&
        evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
        _bufferevent_run_writecb(bufev);
    }

    goto done;

 reschedule:
    if (evbuffer_get_length(bufev->output) == 0) {
        event_del(&bufev->ev_write);
    }
    goto done;

 error:
    bufferevent_disable(bufev, EV_WRITE);
    _bufferevent_run_eventcb(bufev, what);

 done:
    _bufferevent_decref_and_unlock(bufev);
}

bufferevent_writecb需要處理兩件事,一個是連線邏輯,一個是傳送資料邏輯,注意當處理連線邏輯時,如果當前沒有啟用EV_WRITE或者處於write_suspended狀態,需要刪除寫監聽,因為當前的寫監聽事件只用於處理連線。另外當處理資料邏輯時如果output的資料長度為0,同樣需要刪除寫事件,因為output的callback在有資料之後會重新新增寫事件:

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
    struct bufferevent *bufev = arg;
    struct bufferevent_private *bufev_p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

    if (cbinfo->n_added &&
        (bufev->enabled & EV_WRITE) &&
        !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
        !bufev_p->write_suspended) {
        /* Somebody added data to the buffer, and we would like to
         * write, and we were not writing.  So, start writing. */
        if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
            /* Should we log this? */
        }
    }
}

bufferevent_writecb和bufferevent_readcb還需要呼叫使用者自定義的讀寫事件:

void
_bufferevent_run_writecb(struct bufferevent *bufev)
{
    /* Requires that we hold the lock and a reference */
    struct bufferevent_private *p =
        EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
    if (bufev->writecb == NULL)
        return;
    if (p->options & BEV_OPT_DEFER_CALLBACKS) {
        p->writecb_pending = 1;
        if (!p->deferred.queued)
            SCHEDULE_DEFERRED(p);
    } else {
        bufev->writecb(bufev, bufev->cbarg);
    }
}
#define SCHEDULE_DEFERRED(bevp)                     \
    do {                                \
        bufferevent_incref(&(bevp)->bev);           \
        event_deferred_cb_schedule(             \
            event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
            &(bevp)->deferred);             \
    } while (0)

如果bufferevent的選項包含BEV_OPT_DEFER_CALLBACKS則需要使用延遲呼叫方式,延遲呼叫是在event_base的loop迴圈統一處理的。p->deferred.queued如果沒有被設定過,則加入到ev_base的延遲呼叫佇列。bufferevent_init_common中設定過defered對應的回掉:

    if (options & BEV_OPT_DEFER_CALLBACKS) {
        if (options & BEV_OPT_UNLOCK_CALLBACKS)
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_unlocked,
                bufev_private);
        else
            event_deferred_cb_init(&bufev_private->deferred,
                bufferevent_run_deferred_callbacks_locked,
                bufev_private);
    }

這兩個回掉只是使用鎖的方式不同:

static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
{
    struct bufferevent_private *bufev_private = arg;
    struct bufferevent *bufev = &bufev_private->bev;

    BEV_LOCK(bufev);
    if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
        bufev->errorcb) {
        /* The "connected" happened before any reads or writes, so
           send it first. */
        bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
        bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
    }
    if (bufev_private->readcb_pending && bufev->readcb) {
        bufev_private->readcb_pending = 0;
        bufev->readcb(bufev, bufev->cbarg);
    }
    if (bufev_private->writecb_pending && bufev->writecb) {
        bufev_private->writecb_pending = 0;
        bufev->writecb(bufev, bufev->cbarg);
    }
    if (bufev_private->eventcb_pending && bufev->errorcb) {
        short what = bufev_private->eventcb_pending;
        int err = bufev_private->errno_pending;
        bufev_private->eventcb_pending = 0;
        bufev_private->errno_pending = 0;
        EVUTIL_SET_SOCKET_ERROR(err);
        bufev->errorcb(bufev, what, bufev->cbarg);
    }
    _bufferevent_decref_and_unlock(bufev);
}

static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
{
    struct bufferevent_private *bufev_private = arg;
    struct bufferevent *bufev = &bufev_private->bev;

    BEV_LOCK(bufev);
#define UNLOCKED(stmt) \
    do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)

    if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
        bufev->errorcb) {
        /* The "connected" happened before any reads or writes, so
           send it first. */
        bufferevent_event_cb errorcb = bufev->errorcb;
        void *cbarg = bufev->cbarg;
        bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
        UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
    }
    if (bufev_private->readcb_pending && bufev->readcb) {
        bufferevent_data_cb readcb = bufev->readcb;
        void *cbarg = bufev->cbarg;
        bufev_private->readcb_pending = 0;
        UNLOCKED(readcb(bufev, cbarg));
    }
    if (bufev_private->writecb_pending && bufev->writecb) {
        bufferevent_data_cb writecb = bufev->writecb;
        void *cbarg = bufev->cbarg;
        bufev_private->writecb_pending = 0;
        UNLOCKED(writecb(bufev, cbarg));
    }
    if (bufev_private->eventcb_pending && bufev->errorcb) {
        bufferevent_event_cb errorcb = bufev->errorcb;
        void *cbarg = bufev->cbarg;
        short what = bufev_private->eventcb_pending;
        int err = bufev_private->errno_pending;
        bufev_private->eventcb_pending = 0;
        bufev_private->errno_pending = 0;
        EVUTIL_SET_SOCKET_ERROR(err);
        UNLOCKED(errorcb(bufev,what,cbarg));
    }
    _bufferevent_decref_and_unlock(bufev);
#undef UNLOCKED
}

以上就是bufferevent_sock的實現方式,下一節我們將分析IOCP的實現方式。