1. 程式人生 > >Libevent原始碼分析(五)--- evbuffer的基本操作

Libevent原始碼分析(五)--- evbuffer的基本操作

之前幾節分析了libevent底層的結構和執行機制,接下來的幾節將會分析Bufferevents,Bufferevents在event的基礎上加入了資料快取邏輯,使得事件和資料結合在一起。libevent的bufferevent有六種型別,分別是:bufferevent_async,bufferevent_filter,bufferevent_openssl,bufferevent_pair,bufferevent_ratelim和bufferevent_sock。其中最常用的是bufferevent_sock型別。

evbuffer

每一個bufferevent 都有兩個evbuffer 分別作為讀寫快取,evbuffer 處理真正的資料,下面是evbuffer的定義:

struct evbuffer {
    /** The first chain in this buffer's linked list of chains. */
    struct evbuffer_chain *first;
    /** The last chain in this buffer's linked list of chains. */
    struct evbuffer_chain *last;

    /** Pointer to the next pointer pointing at the 'last_with_data' chain.
     *
     * To unpack:
     *
     * The last_with_data chain is the last chain that has any data in it.
     * If all chains in the buffer are empty, it is the first chain.
     * If the buffer has no chains, it is NULL.
     *
     * The last_with_datap pointer points at _whatever 'next' pointer_
     * points at the last_with_datap chain.  If the last_with_data chain
     * is the first chain, or it is NULL, then the last_with_datap pointer
     * is &buf->first.
     */
struct evbuffer_chain **last_with_datap; /** Total amount of bytes stored in all chains.*/ size_t total_len; /** Number of bytes we have added to the buffer since we last tried to * invoke callbacks. */ size_t n_add_for_cb; /** Number of bytes we have removed from the buffer since we last * tried to invoke callbacks. */
size_t n_del_for_cb; #ifndef _EVENT_DISABLE_THREAD_SUPPORT /** A lock used to mediate access to this buffer. */ void *lock; #endif /** True iff we should free the lock field when we free this * evbuffer. */ unsigned own_lock : 1; /** True iff we should not allow changes to the front of the buffer * (drains or prepends). */ unsigned freeze_start : 1; /** True iff we should not allow changes to the end of the buffer * (appends) */ unsigned freeze_end : 1; /** True iff this evbuffer's callbacks are not invoked immediately * upon a change in the buffer, but instead are deferred to be invoked * from the event_base's loop. Useful for preventing enormous stack * overflows when we have mutually recursive callbacks, and for * serializing callbacks in a single thread. */ unsigned deferred_cbs : 1; #ifdef WIN32 /** True iff this buffer is set up for overlapped IO. */ unsigned is_overlapped : 1; #endif /** Zero or more EVBUFFER_FLAG_* bits */ ev_uint32_t flags; /** Used to implement deferred callbacks. */ struct deferred_cb_queue *cb_queue; /** A reference count on this evbuffer. When the reference count * reaches 0, the buffer is destroyed. Manipulated with * evbuffer_incref and evbuffer_decref_and_unlock and * evbuffer_free. */ int refcnt; /** A deferred_cb handle to make all of this buffer's callbacks * invoked from the event loop. */ struct deferred_cb deferred; /** A doubly-linked-list of callback functions */ TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks; /** The parent bufferevent object this evbuffer belongs to. * NULL if the evbuffer stands alone. */ struct bufferevent *parent; };

evbuffer包含一個evbuffer_chain連結串列,資料都是儲存在各個evbuffer_chain中,evbuffer有三個指標:first,last和last_with_datap,其中first和last分別指向evbuffer_chain連結串列的第一個和最後一個元素,last_with_datap則是一個二級指標,指向「指向最後一個有資料的chain」的那個next指標(如果該chain就是第一個chain,或者連結串列為空,則它等於&buf->first);total_len表示資料的總長度,n_add_for_cb和n_del_for_cb記錄自上次嘗試觸發回撥以來新增和移除的位元組數;lock和own_lock用於鎖相關;freeze_start和freeze_end分別用於禁止修改緩衝區的頭部(drain、prepend)和尾部(append);deferred_cbs用於標記是否使用延遲回撥;is_overlapped標記是否使用iocp(overlapped IO);flags用於設定狀態,目前可以設定的值只有0和EVBUFFER_FLAG_DRAINS_TO_FD,後者標記是否用於bufferevent_sock型別。cb_queue指向event_base中的defer_queue,deferred作為一個延遲回撥會加入到defer_queue中,deferred觸發後會呼叫所有的callbacks,最後parent變數指向evbuffer所屬的bufferevent。

下面是evbuffer_chain的定義:

/**
 * One segment in an evbuffer's singly linked list of chains.  The bytes
 * currently stored occupy buffer[misalign .. misalign + off); the flag bits
 * defined inside the struct describe chains that need special handling.
 */
struct evbuffer_chain {
    /** points to next buffer in the chain */
    struct evbuffer_chain *next;

    /** total allocation available in the buffer field. */
    size_t buffer_len;

    /** unused space at the beginning of buffer or an offset into a
     * file for sendfile buffers. */
    ev_misalign_t misalign;

    /** Offset into buffer + misalign at which to start writing.
     * In other words, the total number of bytes actually stored
     * in buffer. */
    size_t off;

    /** Set if special handling is required for this chain; a bitwise OR
     * of the EVBUFFER_* values defined below. */
    unsigned flags;
#define EVBUFFER_MMAP       0x0001  /**< memory in buffer is mmaped */
#define EVBUFFER_SENDFILE   0x0002  /**< a chain used for sendfile */
#define EVBUFFER_REFERENCE  0x0004  /**< a chain with a mem reference */
#define EVBUFFER_IMMUTABLE  0x0008  /**< read-only chain */
    /** a chain that mustn't be reallocated or freed, or have its contents
     * memmoved, until the chain is un-pinned. */
#define EVBUFFER_MEM_PINNED_R   0x0010
#define EVBUFFER_MEM_PINNED_W   0x0020
#define EVBUFFER_MEM_PINNED_ANY (EVBUFFER_MEM_PINNED_R|EVBUFFER_MEM_PINNED_W)
    /** a chain that should be freed, but can't be freed until it is
     * un-pinned. */
#define EVBUFFER_DANGLING   0x0040

    /** Usually points to the read-write memory belonging to this
     * buffer allocated as part of the evbuffer_chain allocation.
     * For mmap, this can be a read-only buffer and
     * EVBUFFER_IMMUTABLE will be set in flags.  For sendfile, it
     * may point to NULL.
     */
    unsigned char *buffer;
};

evbuffer_chain比較簡單,next用於連結串列,buffer_len代表evbuffer_chain的總長度,misalign標記偏移,off記錄當前資料的長度,flags是一個狀態標記位,接下來的一組巨集定義給出了flags可以設定的狀態,最後的buffer指向了資料快取。

evbuffer的相關操作非常多,初次接觸可能比較亂,下面是他比較重要的幾個方法:

evbuffer_chain_new & evbuffer_chain_free

/**
 * Allocate a new, empty chain with room for at least `size` bytes of data.
 *
 * The chain header and its data area come from a single allocation.  Small
 * requests are rounded up to MIN_BUFFER_SIZE or the next power-of-two
 * multiple of it; very large requests are allocated at exactly the needed
 * size.
 *
 * @param size  number of data bytes the chain must be able to hold.
 * @return the new chain with a zeroed header, or NULL if the request is too
 *         large to represent or the allocation fails.
 */
static struct evbuffer_chain * evbuffer_chain_new(size_t size)
{
    /* Largest value a size_t can hold; used only for overflow checks. */
    const size_t size_limit = (size_t)-1;
    struct evbuffer_chain *chain;
    size_t to_alloc;

    /* Adding the header size below must not wrap around. */
    if (size > size_limit - EVBUFFER_CHAIN_SIZE)
        return (NULL);

    size += EVBUFFER_CHAIN_SIZE;

    /* get the next largest memory that can hold the buffer */
    if (size < size_limit / 2) {
        /* Doubling cannot overflow here: the value being doubled is
         * always smaller than size, which is below size_limit / 2. */
        to_alloc = MIN_BUFFER_SIZE;
        while (to_alloc < size)
            to_alloc <<= 1;
    } else {
        /* Doubling could wrap to 0 and loop forever, so allocate
         * exactly what was asked for. */
        to_alloc = size;
    }

    /* we get everything in one chunk */
    if ((chain = mm_malloc(to_alloc)) == NULL)
        return (NULL);

    /* Only the header is cleared; the data area stays uninitialized. */
    memset(chain, 0, EVBUFFER_CHAIN_SIZE);

    chain->buffer_len = to_alloc - EVBUFFER_CHAIN_SIZE;

    /* this way we can manipulate the buffer to different addresses,
     * which is required for mmap for example.
     */
    chain->buffer = EVBUFFER_CHAIN_EXTRA(u_char, chain);

    return (chain);
}

/**
 * Release a chain together with any external resource it refers to.
 *
 * A pinned chain cannot be released yet; it is only marked
 * EVBUFFER_DANGLING and left alone.  Otherwise the cleanup callback of a
 * reference chain is run, an mmap chain is unmapped and its descriptor
 * closed, a sendfile chain has its descriptor closed, and finally the chain
 * memory itself is freed.
 */
static inline void evbuffer_chain_free(struct evbuffer_chain *chain)
{
    const unsigned external_mask =
        EVBUFFER_MMAP | EVBUFFER_SENDFILE | EVBUFFER_REFERENCE;

    if (CHAIN_PINNED(chain)) {
        /* Still in use elsewhere: just flag it as dangling. */
        chain->flags |= EVBUFFER_DANGLING;
        return;
    }

    if ((chain->flags & external_mask) != 0) {
        if ((chain->flags & EVBUFFER_REFERENCE) != 0) {
            /* Give the owner of the referenced memory a chance to
             * release it. */
            struct evbuffer_chain_reference *ref =
                EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_reference,
                    chain);
            if (ref->cleanupfn != NULL)
                ref->cleanupfn(chain->buffer, chain->buffer_len,
                    ref->extra);
        }
#ifdef _EVENT_HAVE_MMAP
        if ((chain->flags & EVBUFFER_MMAP) != 0) {
            struct evbuffer_chain_fd *mapped =
                EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);
            if (munmap(chain->buffer, chain->buffer_len) == -1)
                event_warn("%s: munmap failed", __func__);
            if (close(mapped->fd) == -1)
                event_warn("%s: close(%d) failed",
                    __func__, mapped->fd);
        }
#endif
#ifdef USE_SENDFILE
        if ((chain->flags & EVBUFFER_SENDFILE) != 0) {
            struct evbuffer_chain_fd *file =
                EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);
            if (close(file->fd) == -1)
                event_warn("%s: close(%d) failed",
                    __func__, file->fd);
        }
#endif
    }

    mm_free(chain);
}

這裡注意的是呼叫free方法時flag可能有三個狀態,EVBUFFER_REFERENCE代表這是一個引用型別的evbuffer_chain,這個標記在evbuffer_add_reference中設定:

/**
 * Append a chain to outbuf that refers to caller-owned memory instead of
 * copying it.
 *
 * @param outbuf     buffer to append to.
 * @param data       start of the memory to reference.
 * @param datlen     number of bytes at data.
 * @param cleanupfn  stored with the chain; evbuffer_chain_free calls it, if
 *                   non-NULL, when the chain is released.
 * @param extra      opaque argument stored for cleanupfn.
 * @return 0 on success, -1 if the chain cannot be allocated or the end of
 *         outbuf is frozen (cleanupfn is not called in that case).
 */
int evbuffer_add_reference(struct evbuffer *outbuf,
    const void *data, size_t datlen,
    evbuffer_ref_cleanup_cb cleanupfn, void *extra)
{
    struct evbuffer_chain *ref_chain;
    struct evbuffer_chain_reference *ref;
    int rc = -1;

    /* The chain only needs room for the evbuffer_chain_reference record;
     * the payload itself is the caller's data. */
    ref_chain = evbuffer_chain_new(sizeof(struct evbuffer_chain_reference));
    if (ref_chain == NULL)
        return (-1);

    ref_chain->flags |= EVBUFFER_REFERENCE | EVBUFFER_IMMUTABLE;
    ref_chain->buffer = (u_char *)data;
    ref_chain->buffer_len = datlen;
    ref_chain->off = datlen;

    ref = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_reference, ref_chain);
    ref->cleanupfn = cleanupfn;
    ref->extra = extra;

    EVBUFFER_LOCK(outbuf);
    if (outbuf->freeze_end) {
        /* don't call chain_free; we do not want to actually invoke
         * the cleanup function */
        mm_free(ref_chain);
    } else {
        evbuffer_chain_insert(outbuf, ref_chain);
        outbuf->n_add_for_cb += datlen;

        evbuffer_invoke_callbacks(outbuf);
        rc = 0;
    }
    EVBUFFER_UNLOCK(outbuf);

    return rc;
}

當使用evbuffer_add_reference時,evbuffer_chain指向一塊已經存在的記憶體,evbuffer_chain只需要申請一個evbuffer_chain_reference大小的空間用來儲存cleanupfn和extra引數即可。EVBUFFER_MMAP主要用於mmap操作,EVBUFFER_SENDFILE則直接用於傳送檔案,這兩個標記後面還會詳細分析。

evbuffer_add & evbuffer_remove

evbuffer作為讀寫快取,它的操作基本都是成對出現的,比如下面的一組add和remove:

/**
 * Append datlen bytes copied from data_in to the end of buf.
 *
 * The data goes into the free space of the last chain when it fits (after
 * realigning that chain if that is worthwhile).  Otherwise a new chain is
 * allocated; any free space left in the old last chain is filled first and
 * the remainder goes into the new chain.
 *
 * @return 0 on success; -1 if the end of the buffer is frozen, if the
 *         addition would overflow the buffer's total length, or if a chain
 *         cannot be allocated.
 */
int evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen)
{
    struct evbuffer_chain *chain, *tmp;
    const unsigned char *data = data_in;
    size_t remain, to_alloc;
    int result = -1;

    EVBUFFER_LOCK(buf);

    if (buf->freeze_end) {
        goto done;
    }

    /* Reject additions that would make total_len wrap around;
     * (size_t)-1 is the largest value a size_t can hold. */
    if (datlen > (size_t)-1 - buf->total_len) {
        goto done;
    }

    chain = buf->last;

    /* If there are no chains allocated for this buffer, allocate one
     * big enough to hold all the data. */
    if (chain == NULL) {
        chain = evbuffer_chain_new(datlen);
        if (!chain)
            goto done;
        evbuffer_chain_insert(buf, chain);
    }

    if ((chain->flags & EVBUFFER_IMMUTABLE) == 0) {
        /* Free bytes after the data currently stored in the chain. */
        remain = (size_t)(chain->buffer_len - chain->misalign - chain->off);
        if (remain >= datlen) {
            /* there's enough space to hold all the data in the
             * current last chain */
            memcpy(chain->buffer + chain->misalign + chain->off,
                data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        } else if (!CHAIN_PINNED(chain) &&
            evbuffer_chain_should_realign(chain, datlen)) {
            /* we can fit the data into the misalignment */
            evbuffer_chain_align(chain);

            memcpy(chain->buffer + chain->off, data, datlen);
            chain->off += datlen;
            buf->total_len += datlen;
            buf->n_add_for_cb += datlen;
            goto out;
        }
    } else {
        /* we cannot write any data to the last chain */
        remain = 0;
    }

    /* we need to add another chain */
    /* Grow geometrically from the previous chain's size, up to the
     * automatic-growth cap, but never below what this call needs. */
    to_alloc = chain->buffer_len;
    if (to_alloc <= EVBUFFER_CHAIN_MAX_AUTO_SIZE/2)
        to_alloc <<= 1;
    if (datlen > to_alloc)
        to_alloc = datlen;
    tmp = evbuffer_chain_new(to_alloc);
    if (tmp == NULL)
        goto done;

    /* Top up the old last chain before moving on to the new one. */
    if (remain) {
        memcpy(chain->buffer + chain->misalign + chain->off,
            data, remain);
        chain->off += remain;
        buf->total_len += remain;
        buf->n_add_for_cb += remain;
    }

    data += remain;
    datlen -= remain;

    memcpy(tmp->buffer, data, datlen);
    tmp->off = datlen;
    /* NOTE(review): total_len is not updated here for tmp's bytes;
     * presumably evbuffer_chain_insert accounts for tmp->off - confirm. */
    evbuffer_chain_insert(buf, tmp);
    buf->n_add_for_cb += datlen;

out:
    evbuffer_invoke_callbacks(buf);
    result = 0;
done:
    EVBUFFER_UNLOCK(buf);
    return result;
}
/** Helper: slide the stored bytes of a chain down to the very start of its
 * buffer so that misalign becomes 0.  The chain must be neither immutable
 * nor pinned. */
static void
evbuffer_chain_align(struct evbuffer_chain *chain)
{
    unsigned char *base;

    EVUTIL_ASSERT(!(chain->flags & EVBUFFER_IMMUTABLE));
    EVUTIL_ASSERT(!(chain->flags & EVBUFFER_MEM_PINNED_ANY));

    base = chain->buffer;
    /* Source and destination may overlap, hence memmove. */
    memmove(base, base + chain->misalign, chain->off);
    chain->misalign = 0;
}

/* Upper bounds on the number of already-stored bytes we are willing to move
 * when making room in a chain: the first is used when deciding whether to
 * copy a chain into a bigger one, the second when deciding whether to
 * realign a chain in place. */
#define MAX_TO_COPY_IN_EXPAND 4096
#define MAX_TO_REALIGN_IN_EXPAND 2048

/** Helper: decide whether realigning chain is a sensible way to make room
    for datlen more bytes.  Returns 1 when the realigned chain would have
    enough room, the chain is less than half full, and it holds no more than
    MAX_TO_REALIGN_IN_EXPAND bytes; returns 0 otherwise. */
static int
evbuffer_chain_should_realign(struct evbuffer_chain *chain,
    size_t datlen)
{
    /* Not enough room even after realigning. */
    if (chain->buffer_len - chain->off < datlen)
        return 0;
    /* Half full or more. */
    if (chain->off >= chain->buffer_len / 2)
        return 0;
    /* Only worthwhile when few bytes have to be moved. */
    return chain->off <= MAX_TO_REALIGN_IN_EXPAND;
}

evbuffer_add比較簡單,需要注意的是呼叫evbuffer_add時要確保last指向的就是最後一塊有資料的chain。另外evbuffer非常注重效率和空間利用率的平衡,evbuffer_chain_should_realign就是用來判斷當前的chain是否值得移動資料的,只有滿足條件時移動資料才有意義。evbuffer_chain_should_realign的判定條件是:移動後的剩餘容量足夠容納datlen長度的資料,並且當前資料長度小於總長度的1/2,並且不超過MAX_TO_REALIGN_IN_EXPAND。下面是remove函式:

/* Copy up to datlen bytes from the front of buf into data_out and then
 * drain the copied bytes from the buffer.  Returns the number of bytes
 * copied (0 if nothing was available), or -1 if the copy or the drain
 * failed. */
int
evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen)
{
    ev_ssize_t copied;

    EVBUFFER_LOCK(buf);
    copied = evbuffer_copyout(buf, data_out, datlen);
    /* Only drain when something was actually copied out. */
    if (copied > 0 && evbuffer_drain(buf, copied) < 0)
        copied = -1;
    EVBUFFER_UNLOCK(buf);

    return (int)copied;
}

/* Copy up to datlen bytes from the front of buf into data_out without
 * removing them from the buffer.  The request is clamped to the amount of
 * data stored.  Returns the number of bytes copied, 0 if there was nothing
 * to copy, or -1 if the front of the buffer is frozen. */
ev_ssize_t
evbuffer_copyout(struct evbuffer *buf, void *data_out, size_t datlen)
{
    /*XXX fails badly on sendfile case. */
    struct evbuffer_chain *chain;
    char *dst = data_out;
    size_t wanted;
    ev_ssize_t result = 0;

    EVBUFFER_LOCK(buf);

    chain = buf->first;

    /* Never hand out more than the buffer holds. */
    if (datlen >= buf->total_len)
        datlen = buf->total_len;

    if (datlen == 0)
        goto done;

    if (buf->freeze_start) {
        result = -1;
        goto done;
    }

    wanted = datlen;

    /* Copy every chain that is consumed in full. */
    for (; datlen && datlen >= chain->off; ) {
        const size_t chunk = chain->off;

        memcpy(dst, chain->buffer + chain->misalign, chunk);
        dst += chunk;
        datlen -= chunk;

        chain = chain->next;
        EVUTIL_ASSERT(chain || datlen==0);
    }

    /* Then the leading part of the next chain, if anything is left. */
    if (datlen) {
        EVUTIL_ASSERT(chain);
        memcpy(dst, chain->buffer + chain->misalign, datlen);
    }

    result = wanted;
done:
    EVBUFFER_UNLOCK(buf);
    return result;
}

evbuffer_remove呼叫兩個函式,evbuffer_copyout和evbuffer_drain,前者主要是從evbuffer中拷貝出datlen大小的資料到data_out中,後者從evbuffer中移除指定大小的資料。evbuffer_drain是evbuffer中比較常用的函式之一:

/**
 * Remove up to len bytes from the front of buf.
 *
 * @param buf  buffer to drain.
 * @param len  number of bytes to remove; values larger than the amount of
 *             data stored are clamped.
 * @return 0 on success (including when the buffer is already empty), -1 if
 *         the front of the buffer is frozen.
 */
int evbuffer_drain(struct evbuffer *buf, size_t len)
{
    struct evbuffer_chain *chain, *next;
    size_t remaining, old_len;
    int result = 0;

    EVBUFFER_LOCK(buf);
    old_len = buf->total_len;

    /* Empty buffer: nothing to remove and no callbacks are invoked. */
    if (old_len == 0)
        goto done;

    if (buf->freeze_start) {
        result = -1;
        goto done;
    }

    if (len >= old_len && !HAS_PINNED_R(buf)) {
        /* Everything goes: release every chain and reset the buffer.
         * NOTE(review): ZERO_CHAIN presumably clears first/last/
         * last_with_datap and total_len - confirm against its macro. */
        len = old_len;
        for (chain = buf->first; chain != NULL; chain = next) {
            next = chain->next;
            evbuffer_chain_free(chain);
        }

        ZERO_CHAIN(buf);
    } else {
        /* Partial drain, or a full drain while HAS_PINNED_R(buf) is
         * true. */
        if (len >= old_len)
            len = old_len;

        buf->total_len -= len;
        remaining = len;
        /* Release each leading chain that is consumed completely.
         * NOTE(review): the loop condition dereferences chain without
         * a NULL check; it appears to rely on stopping at a chain that
         * still holds data or at a read-pinned chain - confirm. */
        for (chain = buf->first;
             remaining >= chain->off;
             chain = next) {
            next = chain->next;
            remaining -= chain->off;

            /* If last_with_datap refers to the chain being removed,
             * either directly or through its next field, point it
             * back at the head of the list. */
            if (chain == *buf->last_with_datap) {
                buf->last_with_datap = &buf->first;
            }
            if (&chain->next == buf->last_with_datap)
                buf->last_with_datap = &buf->first;

            if (CHAIN_PINNED_R(chain)) {
                /* A read-pinned chain must not be freed: empty it
                 * in place and keep it as the new first chain. */
                EVUTIL_ASSERT(remaining == 0);
                chain->misalign += chain->off;
                chain->off = 0;
                break;
            } else
                evbuffer_chain_free(chain);
        }

        /* chain is now the first surviving chain; skip over the bytes
         * drained from its front. */
        buf->first = chain;
        if (chain) {
            chain->misalign += remaining;
            chain->off -= remaining;
        }
    }

    buf->n_del_for_cb += len;
    /* Tell someone about changes in this buffer */
    evbuffer_invoke_callbacks(buf);

done:
    EVBUFFER_UNLOCK(buf);
    return result;
}

EVBUFFER_MEM_PINNED_R標記主要用於iocp,使用方式也將在iocp的章節詳細分析。

evbuffer_read & evbuffer_write

看完add和remove,接下來的一對是read和write,這兩個函式主要用於套接字的讀寫:

/**
 * Read data from fd and append it to buf.
 *
 * @param buf      buffer that receives the data.
 * @param fd       descriptor or socket to read from.
 * @param howmuch  maximum number of bytes to read; a negative value, or one
 *                 larger than the computed limit, is replaced by that limit.
 * @return the number of bytes read, 0 if the read call returned 0, or -1 if
 *         the end of the buffer is frozen, space could not be reserved, or
 *         the read failed.
 */
int evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
    struct evbuffer_chain **chainp;
    int n;
    int result;

#ifdef USE_IOVEC_IMPL
    int nvecs, i, remaining;
#else
    struct evbuffer_chain *chain;
    unsigned char *p;
#endif

    EVBUFFER_LOCK(buf);

    if (buf->freeze_end) {
        result = -1;
        goto done;
    }

    /* Work out the read limit: what get_n_bytes_readable_on_socket()
     * reports, replaced by EVBUFFER_MAX_READ when that value is not
     * positive or exceeds it; howmuch is then clamped to the limit. */
    n = get_n_bytes_readable_on_socket(fd);
    if (n <= 0 || n > EVBUFFER_MAX_READ)
        n = EVBUFFER_MAX_READ;
    if (howmuch < 0 || howmuch > n)
        howmuch = n;

#ifdef USE_IOVEC_IMPL
    /* Since we can use iovecs, we're willing to use the last
     * NUM_READ_IOVEC chains. */
    if (_evbuffer_expand_fast(buf, howmuch, NUM_READ_IOVEC) == -1) {
        result = -1;
        goto done;
    } else {
        IOV_TYPE vecs[NUM_READ_IOVEC];
#ifdef _EVBUFFER_IOVEC_IS_NATIVE
        nvecs = _evbuffer_read_setup_vecs(buf, howmuch, vecs,
            NUM_READ_IOVEC, &chainp, 1);
#else
        /* We aren't using the native struct iovec.  Therefore,
           we are on win32. */
        struct evbuffer_iovec ev_vecs[NUM_READ_IOVEC];
        nvecs = _evbuffer_read_setup_vecs(buf, howmuch, ev_vecs, 2,
            &chainp, 1);

        /* Convert each evbuffer_iovec into the WSABUF form that
         * WSARecv expects. */
        for (i=0; i < nvecs; ++i)
            WSABUF_FROM_EVBUFFER_IOV(&vecs[i], &ev_vecs[i]);
#endif

#ifdef WIN32
        {
            DWORD bytesRead;
            DWORD flags=0;
            if (WSARecv(fd, vecs, nvecs, &bytesRead, &flags, NULL, NULL)) {
                /* The read failed. It might be a close,
                 * or it might be an error. */
                if (WSAGetLastError() == WSAECONNABORTED)
                    n = 0;
                else
                    n = -1;
            } else
                n = bytesRead;
        }
#else
        n = readv(fd, vecs, nvecs);
#endif
    }

#else /*!USE_IOVEC_IMPL*/
    /* If we don't have FIONREAD, we might waste some space here */
    /* XXX we _will_ waste some space here if there is any space left
     * over on buf->last. */
    if ((chain = evbuffer_expand_singlechain(buf, howmuch)) == NULL) {
        result = -1;
        goto done;
    }

    /* We can append new data at this point */
    p = chain->buffer + chain->misalign + chain->off;

#ifndef WIN32
    n = read(fd, p, howmuch);
#else
    n = recv(fd, p, howmuch, 0);
#endif
#endif /* USE_IOVEC_IMPL */

    /* Error and zero-byte results leave the buffer's byte counts
     * untouched and do not invoke callbacks. */
    if (n == -1) {
        result = -1;
        goto done;
    }
    if (n == 0) {
        result = 0;
        goto done;
    }

#ifdef USE_IOVEC_IMPL
    /* Spread the n bytes just read over the chains that backed the
     * vectors, in order, and make last_with_datap refer to the chain that
     * received the final bytes. */
    remaining = n;
    for (i=0; i < nvecs; ++i) {
        ev_ssize_t space = (ev_ssize_t) CHAIN_SPACE_LEN(*chainp);
        if (space < remaining) {
            (*chainp)->off += space;
            remaining -= (int)space;
        } else {
            (*chainp)->off += remaining;
            buf->last_with_datap = chainp;
            break;
        }
        chainp = &(*chainp)->next;
    }
#else
    chain->off += n;
    advance_last_with_data(buf);
#endif
    buf->total_len += n;
    buf->n_add_for_cb += n;

    /* Tell someone about changes in this buffer */
    evbuffer_invoke_callbacks(buf);
    result = n;
done:
    EVBUFFER_UNLOCK(buf);
    return result;
}

evbuffer_read函式首先通過get_n_bytes_readable_on_socket獲取當前可讀入的資料量,然後根據USE_IOVEC_IMPL巨集定義判定是否使用IOVEC方式讀取。IOVEC方式可以一次讀寫多段資料,和evbuffer中的chain搭配使用可以減少系統呼叫次數,提高效率。_evbuffer_expand_fast函式擴充evbuffer的chain,保證尾部的n(或者小於n)個chain有至少datlen大小的空餘空間可供寫入資料。

/**
 * Make sure that at least datlen bytes of free space are available at the
 * end of buf, spread over at most n chains.  The caller must hold the
 * buffer lock, and n must be at least 2.
 *
 * @return 0 on success, -1 if a needed chain could not be allocated.
 */
int _evbuffer_expand_fast(struct evbuffer *buf, size_t datlen, int n)
{
    struct evbuffer_chain *chain = buf->last, *tmp, *next;
    size_t avail;
    int used;

    ASSERT_EVBUFFER_LOCKED(buf);
    EVUTIL_ASSERT(n >= 2);

    if (chain == NULL || (chain->flags & EVBUFFER_IMMUTABLE)) {
        /* There is no last chunk, or we can't touch the last chunk.
         * Just add a new chunk. */
        chain = evbuffer_chain_new(datlen);
        if (chain == NULL)
            return (-1);

        evbuffer_chain_insert(buf, chain);
        return (0);
    }

    used = 0; /* number of chains we're using space in. */
    avail = 0; /* how much space they have. */
    /* How many bytes can we stick at the end of buffer as it is?  Iterate
     * over the chains at the end of the buffer, trying to see how much
     * space we have in the first n. */
    for (chain = *buf->last_with_datap; chain; chain = chain->next) {
        if (chain->off) {
            /* Only the last_with_data chain may hold data; count it
             * only if it still has free space. */
            size_t space = (size_t) CHAIN_SPACE_LEN(chain);
            EVUTIL_ASSERT(chain == *buf->last_with_datap);
            if (space) {
                avail += space;
                ++used;
            }
        } else {
            /* No data in chain; realign it. */
            chain->misalign = 0;
            avail += chain->buffer_len;
            ++used;
        }
        if (avail >= datlen) {
            /* There is already enough space.  Just return */
            return (0);
        }
        if (used == n)
            break;
    }

    /* There wasn't enough space in the first n chains with space in
     * them. Either add a new chain with enough space, or replace all
     * empty chains with one that has enough space, depending on n. */
    if (used < n) {
        /* The loop ran off the end of the chains before it hit n
         * chains; we can add another. */
        EVUTIL_ASSERT(chain == NULL);

        /* The new chain only has to cover what is still missing. */
        tmp = evbuffer_chain_new(datlen - avail);
        if (tmp == NULL)
            return (-1);

        buf->last->next = tmp;
        buf->last = tmp;
        /* (we would only set last_with_data if we added the first
         * chain. But if the buffer had no chains, we would have
         * just allocated a new chain earlier) */
        return (0);
    } else {
        /* Nuke _all_ the empty chains. */
        int rmv_all = 0; /* True iff we removed last_with_data. */
        chain = *buf->last_with_datap;
        if (!chain->off) {
            /* Even the last_with_data chain is empty, so every chain
             * in the buffer is removed. */
            EVUTIL_ASSERT(chain == buf->first);
            rmv_all = 1;
            avail = 0;
        } else {
            /* Keep the chain that holds data; only its free space
             * still counts as available. */
            avail = (size_t) CHAIN_SPACE_LEN(chain);
            chain = chain->next;
        }
        for (; chain; chain = next) {
            next = chain->next;
            EVUTIL_ASSERT(chain->off == 0);
            evbuffer_chain_free(chain);
        }
        tmp = evbuffer_chain_new(datlen - avail);
        if (tmp == NULL) {
            /* Allocation failed after the empty chains were freed:
             * repair the list so it no longer refers to them. */
            if (rmv_all) {
                ZERO_CHAIN(buf);
            } else {
                buf->last = *buf->last_with_datap;
                (*buf->last_with_datap)->next = NULL;
            }
            return (-1);
        }
        if (rmv_all) {
            buf->first = buf->last = tmp;
            buf->last_with_datap = &buf->first;
        } else {
            (*buf->last_with_datap)->next = tmp;
            buf->last = tmp;
        }
        return (0);
    }
}

_evbuffer_read_setup_vecs函式用於填充evbuffer_iovec結構體,之後就可以通過readv或者WSARecv讀取資料了。如果不是使用IOVEC,則需要呼叫evbuffer_expand_singlechain,它和_evbuffer_expand_fast作用相似,區別在於所需的空間必須位於單獨一個chain中:

/* Expands the available space in the event buffer to at least datlen, all in
 * a single chunk.  Return that chunk, or NULL if a needed chain could not be
 * obtained.  The caller must hold the buffer lock. */
static struct evbuffer_chain *
evbuffer_expand_singlechain(struct evbuffer *buf, size_t datlen)
{
    struct evbuffer_chain *chain, **chainp;
    struct evbuffer_chain *result = NULL;
    ASSERT_EVBUFFER_LOCKED(buf);

    chainp = buf->last_with_datap;

    /* XXX If *chainp is no longer writeable, but has enough space in its
     * misalign, this might be a bad idea: we could still use *chainp, not
     * (*chainp)->next. */
    if (*chainp && CHAIN_SPACE_LEN(*chainp) == 0)
        chainp = &(*chainp)->next;

    /* 'chain' now points to the first chain with writable space (if any)
     * We will either use it, realign it, replace it, or resize it. */
    chain = *chainp;

    if (chain == NULL ||
        (chain->flags & (EVBUFFER_IMMUTABLE|EVBUFFER_MEM_PINNED_ANY))) {
        /* We can't use the last_with_data chain at all.  Just add a
         * new one that's big enough. */
        goto insert_new;
    }

    /* If we can fit all the data, then we don't have to do anything */
    if (CHAIN_SPACE_LEN(chain) >= datlen) {
        result = chain;
        goto ok;
    }

    /* If the chain is completely empty, just replace it by adding a new
     * empty chain. */
    if (chain->off == 0) {
        goto insert_new;
    }

    /* If the misalignment plus the remaining space fulfills our data
     * needs, we could just force an alignment to happen.  Afterwards, we
     * have enough space.  But only do this if we're saving a lot of space
     * and not moving too much data.  Otherwise the space savings are
     * probably offset by the time lost in copying.
     */
    if (evbuffer_chain_should_realign(chain, datlen)) {
        evbuffer_chain_align(chain);
        result = chain;
        goto ok;
    }

    /* At this point, we can either resize the last chunk with space in
     * it, use the next chunk after it, or add a new chunk.  If we add a
     * new chunk, we waste CHAIN_SPACE_LEN(chain) bytes in the former last
     * chunk.  If we resize, we have to copy chain->off bytes.
     */

    /* Would expanding this chunk be affordable and worthwhile? */
    if (CHAIN_SPACE_LEN(chain) < chain->buffer_len / 8 ||
        chain->off > MAX_TO_COPY_IN_EXPAND) {
        /* It's not worth resizing this chain. Can the next one be
         * used? */
        if (chain->next && CHAIN_SPACE_LEN(chain->next) >= datlen) {
            /* Yes, we can just use the next chain (which should
             * be empty. */
            result = chain->next;
            goto ok;
        } else {
            /* No; append a new chain (which will free all
             * terminal empty chains.) */
            goto insert_new;
        }
    } else {
        /* Okay, we're going to try to resize this chain: Not doing so
         * would waste at least 1/8 of its current allocation, and we
         * can do so without having to copy more than
         * MAX_TO_COPY_IN_EXPAND bytes. */
        /* figure out how much space we need */
        size_t length = chain->off + datlen;
        struct evbuffer_chain *tmp = evbuffer_chain_new(length);
        if (tmp == NULL)
            goto err;

        /* copy the data over that we had so far */
        tmp->off = chain->off;
        memcpy(tmp->buffer, chain->buffer + chain->misalign,
            chain->off);
        /* fix up the list */
        /* Writing through chainp replaces the old chain in whichever
         * 'next' (or 'first') pointer referred to it. */
        EVUTIL_ASSERT(*chainp == chain);
        result = *chainp = tmp;

        if (buf->last == chain)
            buf->last = tmp;

        tmp->next = chain->next;
        evbuffer_chain_free(chain);
        goto ok;
    }

insert_new:
    result = evbuffer_chain_insert_new(buf, datlen);
    if (!result)
        goto err;
ok:
    /* Every successful path must end with a chain that has room for
     * datlen bytes. */
    EVUTIL_ASSERT(result);
    EVUTIL_ASSERT(CHAIN_SPACE_LEN(result) >= datlen);
err:
    /* Reached directly on failure, with result still NULL. */
    return result;
}

evbuffer_expand_singlechain會呼叫evbuffer_chain_should_realign來檢視是否可以通過移動last_with_data指向的chain中的資料來騰出空間;如果不可以,則判斷是否值得把該chain的資料拷貝到一個更大的新chain中,判斷依據是該chain的剩餘空間不少於其總容量的八分之一,並且已有資料的長度不超過MAX_TO_COPY_IN_EXPAND;否則就使用下一個chain或者插入一個新的chain。
看完read,接下來分析write:

/* Write as much of buffer as possible to fd.  This is
 * evbuffer_write_atmost() with no byte limit (-1); its return value is
 * passed through unchanged. */
int
evbuffer_write(struct evbuffer *buffer, evutil_socket_t fd)
{
    const ev_ssize_t no_limit = -1;

    return evbuffer_write_atmost(buffer, fd, no_limit);
}
/**
 * Write up to howmuch bytes from the front of buffer to fd and drain
 * whatever was written.
 *
 * @param buffer   buffer to take the data from.
 * @param fd       descriptor or socket to write to.
 * @param howmuch  maximum number of bytes to write; a negative value, or one
 *                 larger than the amount stored, means "everything stored".
 * @return the value produced by the underlying write call (the number of
 *         bytes written on success); -1 is also returned when the front of
 *         the buffer is frozen or when there is nothing to write.
 */
int
evbuffer_write_atmost(struct evbuffer *buffer, evutil_socket_t fd,
    ev_ssize_t howmuch)
{
    int n = -1;

    EVBUFFER_LOCK(buffer);

    /* Writing drains the front of the buffer, so a frozen front means
     * we must not write. */
    if (buffer->freeze_start) {
        goto done;
    }

    if (howmuch < 0 || (size_t)howmuch > buffer->total_len)
        howmuch = buffer->total_len;

    if (howmuch > 0) {
#ifdef USE_SENDFILE
        /* A sendfile chain at the front is sent with sendfile; anything
         * else takes the regular write path below. */
        struct evbuffer_chain *chain = buffer->first;
        if (chain != NULL && (chain->flags & EVBUFFER_SENDFILE))
            n = evbuffer_write_sendfile(buffer, fd, howmuch);
        else {
#endif
#ifdef USE_IOVEC_IMPL
        n = evbuffer_write_iovec(buffer, fd, howmuch);
#elif defined(WIN32)
        /* XXX(nickm) Don't disable this code until we know if
         * the WSARecv code above works. */
        /* Without iovec support the first howmuch bytes are made
         * contiguous and sent with a single call. */
        void *p = evbuffer_pullup(buffer, howmuch);
        n = send(fd, p, howmuch, 0);
#else
        /* Without iovec support the first howmuch bytes are made
         * contiguous and written with a single call. */
        void *p = evbuffer_pullup(buffer, howmuch);
        n = write(fd, p, howmuch);
#endif
#ifdef USE_SENDFILE
        }
#endif
    }

    /* Remove what was actually written from the buffer. */
    if (n > 0)
        evbuffer_drain(buffer, n);

done:
    EVBUFFER_UNLOCK(buffer);
    return (n);
}

evbuffer_write_atmost函式有三種情況,第一種是chain->flags 有 EVBUFFER_SENDFILE標記時,呼叫evbuffer_write_sendfile方法:

#ifdef USE_SENDFILE
/**
 * Send the first chain of buffer to fd with the platform's sendfile(),
 * using the file descriptor stored in the chain's evbuffer_chain_fd record.
 * The chain's misalign is used as the file offset and its off as the number
 * of bytes to send.  The caller must hold the buffer lock.
 *
 * The howmuch parameter is not used in this function.
 *
 * Return value, per platform branch:
 *  - Mac OS X / FreeBSD: -1 on a non-retriable error, otherwise the byte
 *    count that sendfile stored in len.
 *  - Linux: 0 on a retriable error, otherwise sendfile's own return value.
 *  - Solaris: on a retriable error, the distance the offset advanced
 *    (0 if it did not move), otherwise sendfile's own return value.
 *
 * NOTE(review): if none of the SENDFILE_IS_* macros is defined, no local
 * variables are declared and no return statement is compiled; presumably
 * USE_SENDFILE is only defined together with one of them - confirm.
 */
static inline int
evbuffer_write_sendfile(struct evbuffer *buffer, evutil_socket_t fd,
    ev_ssize_t howmuch)
{
    struct evbuffer_chain *chain = buffer->first;
    struct evbuffer_chain_fd *info =
        EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain);
#if defined(SENDFILE_IS_MACOSX) || defined(SENDFILE_IS_FREEBSD)
    int res;
    off_t len = chain->off;
#elif defined(SENDFILE_IS_LINUX) || defined(SENDFILE_IS_SOLARIS)
    ev_ssize_t res;
    off_t offset = chain->misalign;
#endif

    ASSERT_EVBUFFER_LOCKED(buffer);

#if defined(SENDFILE_IS_MACOSX)
    /* len is passed by address and returned afterwards as the number of
     * bytes sent. */
    res = sendfile(info->fd, fd, chain->misalign, &len, NULL, 0);
    if (res == -1 && !EVUTIL_ERR_RW_RETRIABLE(errno))
        return (-1);

    return (len);
#elif defined(SENDFILE_IS_FREEBSD)
    /* len is passed by address and returned afterwards as the number of
     * bytes sent. */
    res = sendfile(info->fd, fd, chain->misalign, chain->off, NULL, &len, 0);
    if (res == -1 && !EVUTIL_ERR_RW_RETRIABLE(errno))
        return (-1);

    return (len);
#elif defined(SENDFILE_IS_LINUX)
    /* TODO(niels): implement splice */
    res = sendfile(fd, info->fd, &offset, chain->off);
    if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) {
        /* if this is EAGAIN or EINTR return 0; otherwise, -1 */
        return (0);
    }
    return (res);
#elif defined(SENDFILE_IS_SOLARIS)
    {
        /* Remember the starting offset so a partial transfer can be
         * measured after a retriable error. */
        const off_t offset_orig = offset;
        res = sendfile(fd, info->fd, &offset, chain->off);
        if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) {
            if (offset - offset_orig)
                return offset - offset_orig;
            /* if this is EAGAIN or EINTR and no bytes were
             * written, return 0 */
            return (0);
        }
        return (res);
    }
#endif
}
#endif

該函式主要是在支援sendfile的系統上直接傳送檔案內容到套接字中,EVBUFFER_SENDFILE之前分析過。
第二種情況是呼叫evbuffer_write_iovec:

#ifdef USE_IOVEC_IMPL
/**
 * Write up to howmuch bytes from the front of buffer to fd with a single
 * vectored write (writev, or WSASend on WIN32), using one vector entry per
 * chain and at most NUM_WRITE_IOVEC entries.  Nothing is drained from the
 * buffer here.  The caller must hold the buffer lock.
 *
 * @return -1 if howmuch is negative or the write fails, 0 if no vector entry
 *         could be set up, otherwise the number of bytes written.
 */
static inline int
evbuffer_write_iovec(struct evbuffer *buffer, evutil_socket_t fd,
    ev_ssize_t howmuch)
{
    IOV_TYPE iov[NUM_WRITE_IOVEC];
    struct evbuffer_chain *chain;
    int written;
    int nvec = 0;

    if (howmuch < 0)
        return -1;

    ASSERT_EVBUFFER_LOCKED(buffer);
    /* XXX make this top out at some maximal data length?  if the
     * buffer has (say) 1MB in it, split over 128 chains, there's
     * no way it all gets written in one go. */
    for (chain = buffer->first;
         chain != NULL && nvec < NUM_WRITE_IOVEC && howmuch;
         chain = chain->next) {
#ifdef USE_SENDFILE
        /* we cannot write the file info via writev */
        if (chain->flags & EVBUFFER_SENDFILE)
            break;
#endif
        iov[nvec].IOV_PTR_FIELD = (void *) (chain->buffer + chain->misalign);
        if ((size_t)howmuch < chain->off) {
            /* Only part of this chain is wanted; it is the last
             * entry. */
            /* XXXcould be problematic when windows supports mmap*/
            iov[nvec++].IOV_LEN_FIELD = (IOV_LEN_TYPE)howmuch;
            break;
        }
        /* The whole chain is wanted. */
        /* XXXcould be problematic when windows supports mmap*/
        iov[nvec++].IOV_LEN_FIELD = (IOV_LEN_TYPE)chain->off;
        howmuch -= chain->off;
    }

    if (nvec == 0)
        return 0;

#ifdef WIN32
    {
        DWORD bytesSent;
        if (WSASend(fd, iov, nvec, &bytesSent, 0, NULL, NULL))
            written = -1;
        else
            written = bytesSent;
    }
#else
    written = writev(fd, iov, nvec);
#endif
    return (written);
}
#endif

iovec可以結合chain使用,減少系統呼叫。
第三種情況是隻能使用send或者write傳送單塊的記憶體資料,這時需要呼叫evbuffer_pullup將evbuffer前面size長度的資料放置到一塊chain中:

unsigned char *evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
{
    struct evbuffer_chain *chain, *next, *tmp, *last_with_data;
    unsigned char *buffer, *result = NULL;
    ev_ssize_t remaining;
    int removed_last_with_data = 0;
    int removed_last_with_datap = 0;

    EVBUFFER_LOCK(buf);
    chain = buf->first;

    if (size < 0)
        size = buf->total_len;
    /* if size > buf->total_len, we cannot guarantee to the user that she
     * is going to have a long enough buffer afterwards; so we return
     * NULL */
    if (size == 0 || (size_t)size > buf->total_len)
        goto done;

    /* No need to pull up anything; the first size bytes are
     * already here. */
    if (chain->off >= (size_t)size) {
        result = chain->buffer + chain->misalign;
        goto done;
    }

    /* Make sure that none of the chains we need to copy from is pinned. */
    remaining = size - chain->off;
    EVUTIL_ASSERT(remaining >= 0);
    for (tmp=chain->next; tmp; tmp=tmp->next) {
        if (CHAIN_PINN