1. 程式人生 > >libevent實現http server

libevent實現http server

    libevent 是一個事件觸發的網路庫,適用於 windows、linux、bsd 、Android 等多種平臺,內部使用 select、epoll、kqueue 、完成埠等系統呼叫管理事件機制。著名分散式快取軟體 memcached 也是 libevent based 。

    最近在學習 libevent ,之前基於 libevent 實現了一個 http client ,沒有用到 bufferevent 。這次實現了一個 http server ,很簡單,只支援 GET 方法,不支援 Range 請求,但完全自己實現,是一個完整可用的示例。這裡使用 libevent-2.1.3-alpha 。

    我關於 libevent 的其它文章,列在這裡供參考:

    使用 libevent 實現一個 http server ,有這麼幾個步驟:

  1. 監聽
  2. 啟動事件迴圈
  3. 接受連線
  4. 解析 http 請求
  5. 迴應客戶端

    關於監聽, libevent 提供了 evconnlistener ,使用起來非常簡單,通過一些設定,呼叫 evconnlistener_new_bind 即可完成一個服務端 socket 的建立,可以參考官方文件Connection Listeners 。下面是啟動 server 的程式碼:

/*
 * Create the listening socket for g_host:g_port on the given event base and
 * install the accept and error callbacks.
 *
 * When binding fails, the port is replaced by 0 and the bind is retried;
 * the function gives up after the fourth failed attempt.  If a retry
 * succeeded, g_port is refreshed from getsockname() so it holds the port
 * that was really bound.
 *
 * Returns 0 on success, 1 when no listener could be created.
 */
int start_http_server(struct event_base *evbase)
{
    struct sockaddr_in sin;
    int failures = 0;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
#ifdef WIN32
    sin.sin_addr.S_un.S_addr = inet_addr(g_host);
#else
    sin.sin_addr.s_addr = inet_addr(g_host);
#endif
    sin.sin_port = htons(g_port);

    for(;;)
    {
        g_listener = evconnlistener_new_bind(
                    evbase, _accept_connection, 0,
                    LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_EXEC|LEV_OPT_DEFERRED_ACCEPT, -1,
                    (struct sockaddr*)&sin, sizeof(sin));
        if(g_listener)
        {
            break;
        }

        if(failures >= 3)
        {
            printf("couldn\'t create listener\n");
            return 1;
        }

        /* retry with port 0 instead of the configured port */
        failures++;
        sin.sin_port = 0;
    }

    if(failures > 0)
    {
        /* the bound port differs from the requested one: read it back */
        socklen_t len = sizeof(sin);
        getsockname(evconnlistener_get_fd(g_listener),
                    (struct sockaddr*)&sin, &len);
        g_port = ntohs(sin.sin_port);
    }

    evconnlistener_set_error_cb(g_listener, _accept_error_cb);
    return 0;
}

    關於事件迴圈,event_base_new 可以建立一個 event_base 例項, event_base_loop 可以進入事件迴圈。下面是 main() 函式中關於事件迴圈的程式碼:

    /* The single event base that drives the listener and every connection. */
    g_evbase = event_base_new();

    if( start_http_server(g_evbase) != 0 )
    {
        printf("httpserver, start server failed\n");
    }
    else
    {
        /* Blocks here dispatching events; the flag keeps the loop running
         * even while no events are pending. */
        event_base_loop(g_evbase, EVLOOP_NO_EXIT_ON_EMPTY);
        printf("httpserver exit now.\n");
    }

    event_base_free(g_evbase);
    上面的程式碼中,啟動事件迴圈時傳遞了一個標誌 EVLOOP_NO_EXIT_ON_EMPTY ,對於伺服器程式,這是必須的,否則在沒有待處理事件時,事件迴圈會立即退出。

    通過給 evconnlistener 設定一些回撥,就可以接受連線、處理錯誤。下面是相關程式碼:

/*
 * Listener callback: invoked for every accepted socket.
 *
 * Extracts the peer's IPv4 address and port from addr and hands the socket
 * to new_http_connection(), which takes over the descriptor.
 *
 * listener - listener that accepted the connection (gives the event base)
 * fd       - accepted socket
 * addr     - peer address, read as a struct sockaddr_in
 * socklen  - number of valid bytes in addr
 * ctx      - user pointer given to evconnlistener_new_bind (unused)
 */
static void _accept_connection(struct evconnlistener *listener,
                               evutil_socket_t fd, struct sockaddr *addr
                               , int socklen, void * ctx)
{
    char address[64];
    struct sockaddr_in sin;
    size_t copy_len = sizeof(sin);
    /* int, not short: ports above 32767 would otherwise turn negative */
    int port = 0;

    (void)ctx;

    /* never read more bytes than the caller says addr contains */
    if(socklen >= 0 && (size_t)socklen < copy_len)
    {
        copy_len = (size_t)socklen;
    }
    memset(&sin, 0, sizeof(sin));
    memcpy(&sin, addr, copy_len);

    /* get address and port*/
    snprintf(address, sizeof(address), "%s", inet_ntoa(sin.sin_addr));
    port = ntohs(sin.sin_port);
#ifdef HTTP_SERVER_DEBUG
    printf("httpserver, accept one connection from %s:%d\n", address, port);
#endif
    /* the connection object is referenced by its bufferevent callbacks,
     * so the returned pointer does not need to be kept here */
    new_http_connection(evconnlistener_get_base(listener),
                        fd,
                        address,
                        port);
}

/*
 * Listener error callback: logs the last socket error code together with
 * its textual description.  No recovery is attempted.
 */
static void _accept_error_cb(struct evconnlistener *listener, void *ctx)
{
    const int sock_err = EVUTIL_SOCKET_ERROR();
    const char *reason = evutil_socket_error_to_string(sock_err);

    printf("httpserver, got an error %d (%s) on the listener.\n",
           sock_err, reason);
}
    在呼叫 evconnlistener_new_bind 建立監聽器時,我們傳入了 _accept_connection 函式;當有連線進來時,_accept_connection 會呼叫 new_http_connection 函式來處理。錯誤處理回撥 _accept_error_cb 則是通過 evconnlistener_set_error_cb 設定的,在上面的錯誤處理回撥函式中,我們僅僅輸出了一條日誌。

    解析 http 請求,這裡還是使用 《使用http_parser解析URL》一文中提到的http_parser 。先看下 new_http_connection 函式的實現:

/*
 * Allocate and initialise the per-connection state for an accepted socket.
 *
 * evbase  - event base the connection's bufferevent is attached to
 * fd      - accepted socket; this function takes it over
 * address - textual peer address, copied with strdup()
 * port    - peer port in host byte order
 *
 * Returns the new connection, or NULL when an allocation fails.  On failure
 * everything allocated so far is released and fd is closed, so the caller
 * must not use fd afterwards.
 */
struct http_connection * new_http_connection(
                                             struct event_base *evbase,
                                             evutil_socket_t fd,
                                             char *address, int port)
{
    /* calloc: every field not set below (state, write_enabled, user_stop,
     * header bookkeeping, unused parser callbacks, ...) starts out as zero
     * instead of holding indeterminate values. */
    struct http_connection * conn = (struct http_connection*)calloc(1, sizeof(struct http_connection));
    if(!conn)
    {
        evutil_closesocket(fd);
        return 0;
    }

    conn->evbase = evbase;
    conn->fd = fd;
    conn->peer_port = port;
    conn->tv_timeout.tv_sec = 10;
    conn->tv_timeout.tv_usec = 0;

    conn->peer_address = strdup(address);
    if(!conn->peer_address)
    {
        free(conn);
        evutil_closesocket(fd);
        return 0;
    }

    /* http_parser callbacks; entries left out stay NULL thanks to calloc */
    conn->parser_settings.on_message_begin = onHttpMessageBegin;
    conn->parser_settings.on_url = onHttpUrl;
    conn->parser_settings.on_header_field = onHttpHeaderField;
    conn->parser_settings.on_header_value = onHttpHeaderValue;
    conn->parser_settings.on_headers_complete = onHttpHeadersComplete;
    conn->parser_settings.on_body = onHttpBody;
    conn->parser_settings.on_message_complete = onHttpMessageComplete;

    /* set the user pointer after init so it cannot be wiped by the init */
    http_parser_init(&conn->parser, HTTP_REQUEST);
    conn->parser.data = conn;

    /* BEV_OPT_CLOSE_ON_FREE: freeing the bufferevent also closes fd */
    conn->bev = bufferevent_socket_new(evbase, fd, BEV_OPT_CLOSE_ON_FREE);
    if(!conn->bev)
    {
        free(conn->peer_address);
        free(conn);
        evutil_closesocket(fd);
        return 0;
    }
    bufferevent_setcb(conn->bev, _read_callback, _write_callback, _event_callback, conn);
    bufferevent_set_timeouts(conn->bev, &conn->tv_timeout, &conn->tv_timeout);
    /* enable reading last: the parser and callbacks are ready by now */
    bufferevent_enable(conn->bev, EV_READ|EV_TIMEOUT);

    return conn;
}
    上面的程式碼根據傳入的 socket 描述符和 event_base 完成了傳入連線的配置工作。主要有幾部分:
  • 建立 bufferevent
  • 設定讀寫回調
  • 配置 http_parser_setting ,主要是一些回撥函式,http_parser 分析資料後酌情呼叫
  • 初始化 http_parser,呼叫 http_parser_init,注意傳入型別是 HTTP_REQUEST

    上面程式碼中的結構體 struct http_connection 儲存了一個連線相關的所有資料,其定義如下:

/*
 * All state belonging to one client connection: the http_parser instance,
 * the libevent handles, the parsed request and the response being sent.
 */
struct http_connection {
    struct http_parser parser;                   /* parser state; parser.data points back to this struct */
    struct http_parser_settings parser_settings; /* callback table passed to http_parser_execute */
    char *cur_header_tag;      /* heap buffer for the header name currently being parsed */
    int cur_tag_cap;           /* allocated size of cur_header_tag */
    int cur_tag_size;          /* bytes used in cur_header_tag */
    char *cur_header_value;    /* heap buffer for the header value currently being parsed */
    int cur_value_cap;         /* allocated size of cur_header_value */
    int cur_value_size;        /* bytes used in cur_header_value */
    char buffer[BUFFER_SIZE];  /* scratch buffer shared by request reads, the
                                  response header and file chunks */

    struct event_base *evbase; /* event base this connection runs on */
    evutil_socket_t fd;        /* client socket; 0 is used as "already closed" */
    struct bufferevent *bev;   /* buffered socket wrapper around fd */
    struct timeval tv_timeout; /* read/write timeout handed to the bufferevent */

    char *peer_address;        /* strdup'ed textual peer address */
    int peer_port;             /* peer port */
    int state;                 /* bitmask of the HTTP_* progress flags */
    unsigned write_enabled:1;  /* set/cleared by _enable_write/_disable_write */
    unsigned user_stop:1;      /* not referenced by the code shown here */

    /* request info */
    const char * method;       /* string returned by http_method_str; never freed here */
    char * path;               /* heap copy of the request URL made by onHttpUrl */
    char * query_string;       /* heap copy of the text after '?', or 0 */
    char version[4];           /* "major.minor" as written by onHttpHeadersComplete */
    char **header_tags;        /* array of header names (owned strings) */
    char **header_values;      /* array of header values, parallel to header_tags */
    int header_capacity;       /* allocated slots in the two header arrays */
    int header_size;           /* used slots in the two header arrays */

    /* response info */
    FILE *fp;                  /* file being served, 0 when there is none */
    long remain;               /* bytes of the file still to be sent */
    int data_size;             /* bytes returned by the most recent fread */
    int status_code;           /* HTTP status chosen by _prepare_response */
};
    需要說明的是,這裡只是個示例, http 請求、響應、連線處理全部放在了一起,看起來比較方便。

    關於 http 頭部、資料解析, http_parser 會為我們做好一切,我們只要儲存即可。

    關於 http 響應,需要我們自己構建狀態行、必要的頭部資訊(如 Content-Length )。

    所有這些,請參考文後的程式碼。

    迴應客戶端這部分使用 bufferevent socket 。 libevent 抽象了一種緩衝機制,來給大多數應用場景提供方便,關於 bufferevents ,請參考官方文件(http://www.wangafu.net/~nickm/libevent-book/Ref6_bufferevent.html)。

    使用 bufferevent socket 處理連線非常簡單,只需要設定讀寫回調即可,這在上面已經提到,不再贅述。

    到這裡為止,關於一個 http server 的所有事情就說完了。下面是 http_connection.c 的所有程式碼( new_http_connection 函式的程式碼在前面),可以正常執行。為使程式碼比較清晰,這裡關於錯誤、封裝、解耦等都簡化處理了。

/* Progress flags OR-ed into http_connection.state by the parser callbacks. */
#define HTTP_HEADER_BEGIN        0x1  /* defined but not set by the code shown here */
#define HTTP_HEADER_COMPLETE     0x2  /* set in onHttpHeadersComplete */
#define HTTP_MESSAGE_BEGIN       0x4  /* set in onHttpMessageBegin */
#define HTTP_MESSAGE_COMPLETE    0x8  /* set in onHttpMessageComplete */


/* Expands to one switch case that returns the reason phrase for a code. */
#define STATUS_CODE(code, str) case code: return str;

/*
 * Map an HTTP status code to its reason phrase.
 *
 * Returns a pointer to a string literal, or 0 when the code is not in the
 * table; callers must check for 0 before printing the result with %s.
 */
static const char * _status_string(int code)
{
    switch(code)
    {
    STATUS_CODE(100, "Continue")
    STATUS_CODE(101, "Switching Protocols")
    STATUS_CODE(102, "Processing")                 // RFC 2518, obsoleted by RFC 4918
    STATUS_CODE(200, "OK")
    STATUS_CODE(201, "Created")
    STATUS_CODE(202, "Accepted")
    STATUS_CODE(203, "Non-Authoritative Information")
    STATUS_CODE(204, "No Content")
    STATUS_CODE(205, "Reset Content")
    STATUS_CODE(206, "Partial Content")
    STATUS_CODE(207, "Multi-Status")               // RFC 4918
    STATUS_CODE(300, "Multiple Choices")
    STATUS_CODE(301, "Moved Permanently")
    STATUS_CODE(302, "Moved Temporarily")
    STATUS_CODE(303, "See Other")
    STATUS_CODE(304, "Not Modified")
    STATUS_CODE(305, "Use Proxy")
    STATUS_CODE(307, "Temporary Redirect")
    STATUS_CODE(400, "Bad Request")
    STATUS_CODE(401, "Unauthorized")
    STATUS_CODE(402, "Payment Required")
    STATUS_CODE(403, "Forbidden")
    STATUS_CODE(404, "Not Found")
    STATUS_CODE(405, "Method Not Allowed")
    STATUS_CODE(406, "Not Acceptable")
    STATUS_CODE(407, "Proxy Authentication Required")
    STATUS_CODE(408, "Request Time-out")
    STATUS_CODE(409, "Conflict")
    STATUS_CODE(410, "Gone")
    STATUS_CODE(411, "Length Required")
    STATUS_CODE(412, "Precondition Failed")
    STATUS_CODE(413, "Request Entity Too Large")
    STATUS_CODE(414, "Request-URI Too Large")
    STATUS_CODE(415, "Unsupported Media Type")
    STATUS_CODE(416, "Requested Range Not Satisfiable")
    STATUS_CODE(417, "Expectation Failed")
    STATUS_CODE(418, "I'm a teapot")               // RFC 2324 (was I"m: wrong escape)
    STATUS_CODE(422, "Unprocessable Entity")       // RFC 4918
    STATUS_CODE(423, "Locked")                     // RFC 4918
    STATUS_CODE(424, "Failed Dependency")          // RFC 4918
    STATUS_CODE(425, "Unordered Collection")       // RFC 4918
    STATUS_CODE(426, "Upgrade Required")           // RFC 2817
    STATUS_CODE(500, "Internal Server Error")
    STATUS_CODE(501, "Not Implemented")
    STATUS_CODE(502, "Bad Gateway")
    STATUS_CODE(503, "Service Unavailable")
    STATUS_CODE(504, "Gateway Time-out")
    STATUS_CODE(505, "HTTP Version not supported")
    STATUS_CODE(506, "Variant Also Negotiates")    // RFC 2295
    STATUS_CODE(507, "Insufficient Storage")       // RFC 4918
    STATUS_CODE(509, "Bandwidth Limit Exceeded")
    STATUS_CODE(510, "Not Extended")               // RFC 2774
    default:
        break;
    }

    /* unknown status code */
    return 0;
}

/* Forward declarations of file-local helpers that are used by the parser
 * callbacks before their definitions appear further down. */
static void _prepare_response(struct http_connection *conn);
static void _disable_write(struct http_connection *conn);
static void _enable_write(struct http_connection *conn);
static void _close_socket(struct http_connection * conn);
static void _peacefull_close(struct http_connection * conn);
static void _send_response_header(struct http_connection *conn);

/*
* http_parser callback
*/
/* http_parser callback: a new message has started; record that in the
 * connection's state flags.  Always returns 0. */
static int onHttpMessageBegin(http_parser *parser)
{
    struct http_connection *owner = (struct http_connection *)parser->data;

    owner->state = owner->state | HTTP_MESSAGE_BEGIN;

    return 0;
}

/*
 * http_parser callback: the request URL (at, length; not NUL-terminated).
 *
 * Stores a heap copy of the whole URL in conn->path and, when a non-empty
 * query follows the first '?', a heap copy of it in conn->query_string.
 * Copies left over from an earlier call are released first so repeated
 * calls do not leak.
 *
 * Returns 0 on success and -1 on allocation failure.
 *
 * NOTE(review): a URL delivered in several pieces is not reassembled; the
 * last piece replaces the earlier ones.
 */
static int onHttpUrl(http_parser *parser, const char *at, size_t length)
{
    struct http_connection *conn = (struct http_connection *)parser->data;
    const char *qmark;
    char *path;

    path = (char*)malloc(length + 1);
    if(!path)
    {
        return -1;
    }
    memcpy(path, at, length);
    path[length] = 0;

    /* drop whatever a previous call stored */
    free(conn->path);
    free(conn->query_string);
    conn->path = path;
    conn->query_string = 0;

    qmark = (const char *)memchr(at, '?', length);
    if(qmark)
    {
        /* got query string: everything after the '?' */
        size_t start = (size_t)(qmark - at) + 1;
        if(start < length)
        {
            size_t qlen = length - start;
            conn->query_string = (char*)malloc(qlen + 1);
            if(!conn->query_string)
            {
                return -1;
            }
            memcpy(conn->query_string, at + start, qlen);
            conn->query_string[qlen] = 0;
        }
    }
    return 0;
}

/*
 * Move the header collected so far (cur_header_tag / cur_header_value) into
 * the connection's header arrays, growing the arrays by 8 slots when full.
 *
 * Nothing happens unless both the current name and the current value are
 * non-empty.  After a successful insert the arrays own the two strings and
 * the "current" fields are reset so the next header starts from scratch.
 * If the arrays cannot be grown the pending header is dropped.
 *
 * parser - unused, kept for the callback-style signature
 * conn   - connection whose header table is updated
 */
static inline void check_insert_header(http_parser *parser, struct http_connection *conn)
{
    int inserted = 0;

    (void)parser;

    if( !(conn->cur_header_tag && conn->cur_header_tag[0] != 0) ||
            !(conn->cur_header_value && conn->cur_header_value[0] != 0))
    {
        return;
    }

    if(!conn->header_tags || !conn->header_values ||
            conn->header_size == conn->header_capacity)
    {
        int new_capacity = conn->header_capacity + 8;
        char **tags = (char**)realloc(conn->header_tags, sizeof(char*) * new_capacity);
        char **values;

        if(tags)
        {
            conn->header_tags = tags;
        }
        /* grow the VALUES array from the values pointer; the original code
         * passed header_tags here, aliasing the two arrays */
        values = (char**)realloc(conn->header_values, sizeof(char*) * new_capacity);
        if(values)
        {
            conn->header_values = values;
        }

        if(tags && values)
        {
            conn->header_capacity = new_capacity;
        }
    }

    if(conn->header_tags && conn->header_values &&
            conn->header_size < conn->header_capacity)
    {
        conn->header_tags[conn->header_size] = conn->cur_header_tag;
        conn->header_values[conn->header_size++] = conn->cur_header_value;
        inserted = 1;
    }

    if(!inserted)
    {
        /* out of memory: drop this header instead of writing out of bounds */
        free(conn->cur_header_tag);
        free(conn->cur_header_value);
    }

    /*
     * reset the "current header" buffers so the next HeaderField /
     * HeaderValue callbacks start a fresh name/value pair
     */
    conn->cur_header_tag = 0;
    conn->cur_tag_cap = 0;
    conn->cur_tag_size = 0;
    conn->cur_header_value = 0;
    conn->cur_value_cap = 0;
    conn->cur_value_size = 0;
}

/*
 * Make sure the heap buffer *str can hold size + add_size bytes plus a
 * terminating NUL.
 *
 * str      - in/out: buffer pointer (may be 0 for "not allocated yet")
 * cap      - in/out: allocated size of *str
 * size     - bytes currently used in *str
 * add_size - bytes about to be appended
 *
 * The buffer is (re)allocated to size + add_size + 64 bytes when it is
 * missing or too small.  On allocation failure the old buffer is released
 * and *str / *cap are set to 0, so callers can detect the failure by
 * checking *str.
 */
static void check_dynamic_string(char **str, int *cap, int size, int add_size)
{
    if(!*str || size + add_size >= *cap)
    {
        int new_cap = size + add_size + 64;
        /* keep the old pointer until realloc is known to have succeeded */
        char *grown = (char*)realloc(*str, new_cap);
        if(!grown)
        {
            free(*str);
            *str = 0;
            *cap = 0;
            return;
        }
        *str = grown;
        *cap = new_cap;
    }
}

/*
 * http_parser callback: a piece of a header name (at, length; not
 * NUL-terminated).
 *
 * First stores any completed previous header, then appends the piece to
 * cur_header_tag, advances cur_tag_size and keeps the buffer
 * NUL-terminated.
 *
 * Returns 0 on success, -1 when the buffer could not be allocated.
 */
static int onHttpHeaderField(http_parser *parser, const char *at, size_t length)
{
    struct http_connection * conn = (struct http_connection *)parser->data;

    check_insert_header(parser, conn);
    check_dynamic_string(&conn->cur_header_tag,
                         &conn->cur_tag_cap, conn->cur_tag_size, (int)length);
    if(!conn->cur_header_tag)
    {
        return -1;
    }

    /* append, record the new length, and terminate: the capacity check
     * above always leaves room for the trailing NUL */
    memcpy(conn->cur_header_tag + conn->cur_tag_size, at, length);
    conn->cur_tag_size += (int)length;
    conn->cur_header_tag[conn->cur_tag_size] = 0;

    return 0;
}

/*
 * http_parser callback: a piece of a header value (at, length; not
 * NUL-terminated).
 *
 * Appends the piece to cur_header_value, advances cur_value_size and keeps
 * the buffer NUL-terminated.
 *
 * Returns 0 on success, -1 when the buffer could not be allocated.
 */
static int onHttpHeaderValue(http_parser *parser, const char *at, size_t length)
{
    struct http_connection * conn = (struct http_connection *)parser->data;

    check_dynamic_string(&conn->cur_header_value,
                         &conn->cur_value_cap, conn->cur_value_size, (int)length);
    if(!conn->cur_header_value)
    {
        return -1;
    }

    /* append, record the new length, and terminate: the capacity check
     * above always leaves room for the trailing NUL */
    memcpy(conn->cur_header_value + conn->cur_value_size, at, length);
    conn->cur_value_size += (int)length;
    conn->cur_header_value[conn->cur_value_size] = 0;

    return 0;
}

/*
 * http_parser callback: all request headers have been parsed.
 *
 * Stores the last pending header, marks the headers as complete, and
 * records the request method and the "major.minor" HTTP version.
 * Always returns 0.
 */
static int onHttpHeadersComplete(http_parser *parser)
{
    printf("server, http_connection, onHttpHeadersComplete\n");
    if(parser)
    {
        struct http_connection * conn = (struct http_connection *)parser->data;
        if(conn)
        {
#ifdef HTTP_SERVER_DEBUG
            int i = 0;
#endif
            check_insert_header(parser, conn);

            conn->state |= HTTP_HEADER_COMPLETE;

            conn->method = http_method_str((enum http_method)conn->parser.method);
            /* bounded: version[] holds only "d.d" plus NUL, so a multi-digit
             * version number is truncated instead of overflowing */
            snprintf(conn->version, sizeof(conn->version), "%.1d.%.1d",
                     conn->parser.http_major, conn->parser.http_minor);

        #ifdef HTTP_SERVER_DEBUG
            printf("server,http_connection, %d headers\n", conn->header_size);
            for(; i < conn->header_size; i++)
            {
                printf("header key %s value %s\n", conn->header_tags[i], conn->header_values[i]);
            }
        #endif
        }
    }
    return 0;
}

/* http_parser callback for request body data.  Body bytes are currently
 * ignored; the callback only reports success. */
static int onHttpBody(http_parser *parser, const char *at, size_t length)
{
    /* TODO: implement */
    (void)parser;
    (void)at;
    (void)length;

    return 0;
}

/*
 * http_parser callback: the whole request has been received.
 *
 * Chooses the response (_prepare_response), enables writing and queues the
 * response header.  Everything after that is driven by _write_callback,
 * which streams the file for a 200 and closes the connection once the
 * output has been flushed.
 *
 * Writing is deliberately NOT disabled for error responses: with EV_WRITE
 * disabled a bufferevent does not send its queued output, so the error
 * header would never reach the client.  The connection is also not torn
 * down here, because this callback runs inside http_parser_execute and the
 * parser lives inside conn.
 *
 * Always returns 0.
 */
static int onHttpMessageComplete(http_parser *parser)
{
    struct http_connection * conn = (struct http_connection *)parser->data;
    if(conn)
    {
        _prepare_response(conn);
        conn->state |= HTTP_MESSAGE_COMPLETE;
        _enable_write(conn);
        _send_response_header(conn);
    }
    return 0;
}

/*
 * Decide how to answer the parsed request and open the file to serve.
 *
 * Sets conn->status_code, and for a 200 also conn->fp and conn->remain
 * (the file size).  The request path is taken relative to the current
 * working directory: leading '/' characters are skipped and the path is
 * cut at the first '?' (this modifies conn->path in place).
 *
 * Status codes produced:
 *   403 - method is not GET, or the path contains ".."
 *   400 - no URL was recorded
 *   404 - the file could not be opened
 *   500 - the file size could not be determined
 *   200 - file opened
 */
static void _prepare_response(struct http_connection *conn)
{
    char *filename = conn->path;
    char *p;
    long size;

    if(!conn->method || strcmp(conn->method, "GET") != 0)
    {
        conn->status_code = 403;
        return;
    }

    if(!filename)
    {
        conn->status_code = 400;
        return;
    }

    /* a file left open by an earlier request on this connection */
    if(conn->fp)
    {
        fclose(conn->fp);
        conn->fp = 0;
    }

    while(*filename == '/') filename++;
    p = strchr(filename, '?');
    if(p) *p = 0;

    /* the path comes straight from the client: refuse anything that could
     * climb out of the served directory */
    if(strstr(filename, "..") != 0)
    {
        conn->status_code = 403;
        return;
    }

    conn->fp = fopen(filename, "rb");
    if(!conn->fp)
    {
        conn->status_code = 404;
        return;
    }

    /* determine the file size by seeking to the end and back */
    if(fseek(conn->fp, 0, SEEK_END) != 0 ||
            (size = ftell(conn->fp)) < 0 ||
            fseek(conn->fp, 0, SEEK_SET) != 0)
    {
        fclose(conn->fp);
        conn->fp = 0;
        conn->status_code = 500;
        return;
    }

    conn->remain = size;
    conn->status_code = 200;
    printf("open %s OK\n", filename);
}

/*
 * Format the status line and response headers into conn->buffer and queue
 * them on the connection's bufferevent.
 *
 * A 200 response carries Content-Length (conn->remain) and a fixed
 * Content-Type; every other status gets the status line only.
 */
static void _send_response_header(struct http_connection *conn)
{
    const char *reason = _status_string(conn->status_code);
    size_t cap = sizeof(conn->buffer);
    int len;

    /* _status_string returns 0 for codes it does not know */
    if(!reason)
    {
        reason = "";
    }

    if(conn->status_code == 200)
    {
        /* remain is a long, so it needs %ld */
        len = snprintf(conn->buffer, cap,
                       "HTTP/1.1 %d %s\r\n"
                       "Content-Length: %ld\r\n"
                       "Content-Type: application/octet-stream\r\n"
                       "\r\n",
                       conn->status_code, reason, conn->remain);
    }
    else
    {
        len = snprintf(conn->buffer, cap, "HTTP/1.1 %d %s\r\n\r\n",
                       conn->status_code, reason);
    }

    if(len < 0)
    {
        return;
    }
    if((size_t)len >= cap)
    {
        /* output was truncated to fit the buffer; send only what is there */
        len = (int)cap - 1;
    }
    //printf("response headers: %s\n", conn->buffer);

    bufferevent_write(conn->bev, conn->buffer, (size_t)len);
}

/*
 * Release the connection's bufferevent and socket.
 *
 * The bufferevent is created with BEV_OPT_CLOSE_ON_FREE, so freeing it
 * already closes the socket; the descriptor is closed by hand only when no
 * bufferevent exists.  Closing it in both places would close the same
 * descriptor number twice.
 *
 * conn->fd == 0 marks "already closed", making repeated calls harmless.
 */
static void _close_socket(struct http_connection * conn)
{
    if(!conn || conn->fd == 0)
    {
        return;
    }

    if(conn->bev)
    {
        bufferevent_free(conn->bev);
        conn->bev = 0;
    }
    else
    {
        evutil_closesocket(conn->fd);
    }
    conn->fd = 0;
}


/*
 * Close the connection only when nothing is left to send: if the
 * bufferevent's output buffer is empty the connection is deleted,
 * otherwise the function just logs that data is still pending.
 */
static void _peacefull_close(struct http_connection * conn)
{
    struct evbuffer *pending = bufferevent_get_output(conn->bev);

    if( evbuffer_get_length(pending) != 0 )
    {
        printf("http_connection, wait bufferevent_socket to flush data\n");
        return;
    }

    printf("http_connection, all data sent, close connection(%s:%d)\n"
              , conn->peer_address, conn->peer_port);

    /* delete this connection */
    delete_http_connection(conn);
}

static void _event_callback(struct bufferevent *bev, short what, void *ctx)
{
    struct http_connection * conn = (struct http_connection *)ctx;
    if( (what & (BEV_EVENT_READING | BEV_EVENT_TIMEOUT)) == (BEV_EVENT_READING | BEV_EVENT_TIMEOUT))
    {
        /* TODO: check socket alive */
    }
    else if((what &(BEV_EVENT_WRITING | BEV_EVENT_TIMEOUT)) == (BEV_EVENT_WRITING | BEV_EVENT_TIMEOUT))
    {
        /* TODO: check socket alive */
    }
    else if(what & BEV_EVENT_ERROR)
    {
        /* TODO: error notify */
        printf( "http_connection, %s:%d, error - %s\n", conn->peer_address,
                   conn->peer_port,
                   evutil_socket_error_to_string( evutil_socket_geterror(conn->fd) ) );
        _close_socket(conn);
    }
}

/* Turn off write events on the connection's bufferevent; does nothing when
 * the write_enabled flag is already clear. */
static void _disable_write(struct http_connection *conn)
{
    if(!conn->write_enabled)
    {
        return;
    }

    bufferevent_disable(conn->bev, EV_WRITE|EV_TIMEOUT);
    conn->write_enabled = 0;
}

/* Turn on write events on the connection's bufferevent; does nothing when
 * the write_enabled flag is already set. */
static void _enable_write(struct http_connection *conn)
{
    if(conn->write_enabled)
    {
        return;
    }

    bufferevent_enable(conn->bev, EV_WRITE | EV_TIMEOUT);
    conn->write_enabled = 1;
}


/*
 * Default write callback: called by the bufferevent when its output buffer
 * has been drained.
 *
 * With no file to serve, the connection is closed.  With a file, the next
 * chunk (at most BUFFER_SIZE bytes, never more than conn->remain) is read
 * and queued.  The connection is closed once remain reaches 0.
 *
 * feof() alone cannot signal the end: reads are clamped to the remaining
 * byte count, so the stream never reads past the end and the EOF flag is
 * never set.  A read that returns no data also ends the transfer, since
 * nothing was queued and this callback would not fire again.
 */
static void _write_callback(struct bufferevent *bev, void * args)
{
    struct http_connection * conn = (struct http_connection *)args;
    long to_read;

    if(!conn->fp)
    {
        printf("http_connection, call peacefull_close via fp NULL\n");
        _peacefull_close(conn);
        return;
    }

    if(conn->remain <= 0 || feof(conn->fp) || ferror(conn->fp))
    {
        printf("http_connection, call peacefull_close via EOF\n");
        _peacefull_close(conn);
        return;
    }

    to_read = BUFFER_SIZE;
    if(to_read > conn->remain) to_read = conn->remain;
    conn->data_size = (int)fread(conn->buffer, 1, (size_t)to_read, conn->fp);
    conn->remain -= conn->data_size;
#ifdef HTTP_SERVER_DEBUG
    printf("http_connection, read %d bytes\n", conn->data_size);
#endif
    if(conn->data_size > 0)
    {
        bufferevent_write(bev, conn->buffer, conn->data_size);
    }
    else
    {
        /* short file or read error: nothing more can be sent */
        printf("http_connection, call peacefull_close via EOF\n");
        _peacefull_close(conn);
    }
}

/*
 * Default read callback: drains the bufferevent's input through
 * conn->buffer and feeds every chunk to the HTTP parser.
 *
 * http_parser_execute returns the number of bytes it consumed; when that
 * differs from the number supplied, the request could not be parsed (or
 * asked for a protocol upgrade, which is not supported here) and the
 * connection is dropped.
 */
static void _read_callback(struct bufferevent *bev, void * args)
{
    struct http_connection * conn = (struct http_connection *)args;
    size_t n;

    while( (n = bufferevent_read(bev, conn->buffer, BUFFER_SIZE)) > 0 )
    {
        size_t parsed = http_parser_execute(&conn->parser, &conn->parser_settings,
                                            conn->buffer, n);
        if(parsed != n)
        {
            printf("http_connection, parse error from %s:%d, close connection\n",
                   conn->peer_address, conn->peer_port);
            /* conn is gone after this call: stop touching it */
            delete_http_connection(conn);
            return;
        }
    }
}

/*
 * Tear down a connection: stop writing, close the socket, release every
 * heap object the connection owns (strings, header arrays, open file) and
 * finally the connection structure itself.  conn must not be used
 * afterwards.
 */
void delete_http_connection(struct http_connection *conn)
{
    int idx;

    _disable_write(conn);
    _close_socket(conn);

    /* free resources; free() accepts null pointers, fclose() does not */
    free(conn->peer_address);
    if(conn->fp)
    {
        fclose(conn->fp);
    }
    free(conn->cur_header_tag);
    free(conn->cur_header_value);
    free(conn->path);
    free(conn->query_string);

    /* stored headers: each slot owns its name and value string */
    for(idx = 0; idx < conn->header_size; ++idx)
    {
        free(conn->header_tags[idx]);
        free(conn->header_values[idx]);
    }
    free(conn->header_tags);
    free(conn->header_values);

    free(conn);
}