1. 程式人生 > >【slighttpd】基於lighttpd架構的Server專案實戰(4)—簡單的echo伺服器

【slighttpd】基於lighttpd架構的Server專案實戰(4)—簡單的echo伺服器

轉載地址:https://blog.csdn.net/jiange_zh/article/details/50636536

在這一部分,我們將對上一篇中的master-worker進行拓展,成為一個簡單的echo伺服器。

這一步我們需要新增兩個類:Listener和Connection;

Listener的職責:

    1.建立監聽套接字;

    2.註冊監聽套接字事件;

    3.在監聽事件的回撥函式中進行accept並建立新連線;

其標頭檔案如下:

/*************************************************************************
    > File Name: listener.h
    > Author: Jiange
    > Mail: 
[email protected]
> Created Time: 2016年01月27日 星期三 19時46分34秒 ************************************************************************/ #ifndef _LISTENER_H #define _LISTENER_H #include <string> #include "event2/event.h" #include "event2/util.h" #include "util.h" class Worker; class Listener { public: Listener(const std::string &ip, unsigned short port); ~Listener(); bool InitListener(Worker *worker); void AddListenEvent(); static void ListenEventCallback(evutil_socket_t fd, short event, void *arg); Worker *listen_worker; evutil_socket_t listen_sockfd; struct sockaddr_in listen_addr; struct event *listen_event; uint64_t cnt_connection; }; #endif

 接下來看看具體實現:

/*************************************************************************
    > File Name: listener.cpp
    > Author: Jiange
    > Mail: [email protected] 
    > Created Time: 2016年01月27日 星期三 19時48分56秒
 ************************************************************************/
#include "listener.h"
#include "worker.h"
#include "connection.h"

#include <iostream>

Listener::Listener(const std::string &ip, unsigned short port)
{
    //ipv4
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_addr.s_addr = inet_addr(ip.c_str());
    listen_addr.sin_port = htons(port);
    listen_event = NULL;
    cnt_connection = 0;
    std::cout << "Init listener" << std::endl;
}

Listener::~Listener()
{
    if (listen_event)
    {
        event_free(listen_event);
        close(listen_sockfd);
    }
    std::cout<< "Listener closed" << std::endl;
}

bool Listener::InitListener(Worker *worker)
{
    if (-1 == (listen_sockfd = socket(AF_INET, SOCK_STREAM, 0)))
    {
        return false;
    }
    
    //非阻塞
    evutil_make_socket_nonblocking(listen_sockfd);

    int reuse = 1;

    //重用
    setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    if (0 != bind(listen_sockfd, (struct sockaddr *)&listen_addr, sizeof(listen_addr))) {
        return false;
    }
    if (0 != listen(listen_sockfd, 5)) {
        return false;
    }
    
    listen_worker = worker;
    return true;
}

/* 這裡單獨作為一個函式,而不是合併到上面函式中。
 * 因為InitListener是在fork之前呼叫的,此時
 * worker的w_base還未賦值;
 */
void Listener::AddListenEvent()
{
    //echo先從客戶端讀取資料,故此處監聽讀
    listen_event = event_new(listen_worker->w_base, listen_sockfd, EV_READ | EV_PERSIST, Listener::ListenEventCallback, this);
    event_add(listen_event, NULL);
}

void Listener::ListenEventCallback(evutil_socket_t sockfd, short event, void *arg)
{
    evutil_socket_t con_fd;
    struct sockaddr_in con_addr;
    socklen_t addr_len = sizeof(con_addr);
    if (-1 == (con_fd = accept(sockfd, (struct sockaddr *)&con_addr, &addr_len))) {
        return ;
    }

    Listener *listener = (Listener *)arg;
    Connection *con = new Connection();

    con->con_sockfd = con_fd;

    pid_t pid = getpid();
    std::cout << "listen accept: " << con->con_sockfd << " by process " << pid << std::endl;
    
    if (!con->InitConnection(listener->listen_worker))
    {
        Connection::FreeConnection(con);
        return ;
    }

    con->con_worker->con_map[con->con_sockfd] = con;
    ++listener->cnt_connection;
}

接下來看看Connection:

一個Connection例項即代表一個連線,它維護從listener的回撥函式那裡accept得到的套接字,並在該套接字上監聽讀寫事件,進行request和response。

/*************************************************************************
    > File Name: connection.h
    > Author: Jiange
    > Mail: [email protected] 
    > Created Time: 2016年01月27日 星期三 20時10分35秒
 ************************************************************************/
#ifndef _CONNECTION_H
#define _CONNECTION_H

#include <string>
#include <queue>

#include "event2/event.h"
#include "event2/util.h"

#include "util.h"

class Worker;

class Connection
{
    public:
        Connection();
        ~Connection();

        bool InitConnection(Worker *worker);

        static void ConEventCallback(evutil_socket_t fd, short event, void *arg);

        Worker *con_worker;

        evutil_socket_t con_sockfd;
        struct event *read_event;
        struct event *write_event;

        std::string con_inbuf;
        std::string con_intmp;
        std::string con_outbuf;

        static void FreeConnection(Connection *con);

    private:
        void WantRead();
        void NotWantRead();
        void WantWrite();
        void NotWantWrite();
};

#endif

這裡兩個event分別負責讀寫事件,比一個event效率高一些。

在回撥函式的設計中,本來打算使用兩個回撥函式:一個處理讀,一個處理寫。

不過其實可以合併到同一個回撥函式裡,所以還是在一個函式中處理,並增加4個函式,來進行監聽事件的切換,這樣做更有利於後面狀態機的拓展:

void Connection::WantRead()
{
    event_add(read_event, NULL); 
}

void Connection::NotWantRead()
{
    event_del(read_event);
}

void Connection::WantWrite()
{
    event_add(write_event, NULL);
}

void Connection::NotWantWrite() 
{
    event_del(write_event);
}

除以上函式外其他部分的具體實現:

/*************************************************************************
    > File Name: connection.cpp
    > Author: Jiange
    > Mail: [email protected] 
    > Created Time: 2016年01月28日 星期四 12時06分22秒
 ************************************************************************/

#include "connection.h"
#include "worker.h"

#include<iostream>

Connection::Connection()
{
    con_worker = NULL;

    read_event = NULL;
    write_event= NULL;
}

Connection::~Connection()
{
    if (read_event && write_event)
    {
        event_free(read_event);
        event_free(write_event);
        std::cout << con_sockfd << " closed" << std::endl;
        close(con_sockfd);
    }
}
/* 刪除worker中相應的con,並釋放該con */
void Connection::FreeConnection(Connection *con)
{
    Worker *worker = con->con_worker;

    if (con->read_event && con->write_event)
    {
        Worker::ConnectionMap::iterator con_iter = worker->con_map.find(con->con_sockfd);
        worker->con_map.erase(con_iter);
    }

    delete con;
}
bool Connection::InitConnection(Worker *worker)
{
    con_worker = worker;

    try
    {   
        //這裡不能開太大,會爆記憶體!
        //後期可能需要在記憶體的使用上進行優化~
        con_intmp.reserve(10 * 1024);
        con_inbuf.reserve(10 * 1024);
        con_outbuf.reserve(10 * 1024);

        evutil_make_socket_nonblocking(con_sockfd);
        //test:監聽讀事件,從客戶端讀,然後回顯
        read_event = event_new(con_worker->w_base, con_sockfd, EV_PERSIST | EV_READ, Connection::ConEventCallback, this);
        write_event = event_new(con_worker->w_base, con_sockfd, EV_PERSIST | EV_WRITE, Connection::ConEventCallback, this);
    }
    catch(std::bad_alloc)
    {
        std::cout << "InitConnection():bad_alloc" <<std::endl;
    }
    WantRead();

    return true;
}

/* 迴圈讀寫
 * 注意,在讀的時候,此處ret為0時,可能是空字串之類的
 * 所以在這裡暫不做處理
 */
void Connection::ConEventCallback(evutil_socket_t sockfd, short event, void *arg)
{

    Connection *con = (Connection*)arg;

    if (event & EV_READ) 
    {
        int cap = con->con_intmp.capacity();
        int ret = read(sockfd, &con->con_intmp[0], cap);

        if (ret == -1)
        {
            if (errno != EAGAIN && errno != EINTR)
            {
                FreeConnection(con);
                return;
            }
        }
        else if (ret == 0)
        {
            FreeConnection(con); 
            return;
        }
        else
        {
            con->con_inbuf.clear();
            con->con_inbuf.append(con->con_intmp.c_str(), ret);
        }
        con->con_outbuf = con->con_inbuf;
        con->NotWantRead();
        con->WantWrite();
    }

    if (event & EV_WRITE)
    {
        int ret = write(sockfd, con->con_outbuf.c_str(), con->con_outbuf.size());

        if (ret == -1)
        {
            if (errno != EAGAIN && errno != EINTR)
            {
                FreeConnection(con);
                return;
            }
        }
        con->NotWantWrite();
        con->WantRead();
    }   
}

介紹完listener和connection之後,我們需要相應地調整master和worker的程式碼:

1.修改兩者的建構函式:

Master::Master(const std::string &ip, unsigned short port)
    :worker(ip, port)
{
    //……
}

Worker::Worker(const std::string &ip, unsigned short port)
        :listener(ip, port)
{
    //……
}

2.在master中開始建立監聽套接字:

bool Master::StartMaster()
{
    std::cout << "Start Master" << std::endl;

    if (!worker.listener.InitListener(&worker))
    {
        return false;
    }
    //……
}

 3.在worker中增加listener和connection map成員:

class Master;
class Connection;

class Worker
{
    public:
        typedef std::map<evutil_socket_t, Connection*> ConnectionMap;
        //……
        Listener            listener;
        ConnectionMap       con_map;
};

4.worker解構函式釋放持有的連線:

Worker::~Worker()
{
    //……

    if (w_base)
    {
        ConnectionMap::iterator con_iter = con_map.begin();
        while (con_iter != con_map.end())
        {
            Connection *con = con_iter->second;
            delete con;
            ++con_iter;
        }
        event_base_free(w_base);
    }
}

5.worker增加監聽event事件:

void Worker::Run()
{
    w_base = event_base_new();
    listener.AddListenEvent();
    //……
    return;
}

6.修改main函式中的master構造;

最後,附上一個使用到的標頭檔案:

/*************************************************************************
    > File Name: util.h
    > Author: Jiange
    > Mail: [email protected] 
    > Created Time: 2016年01月28日 星期四 10時39分22秒
 ************************************************************************/

#ifndef _UTIL_H
#define _UTIL_H

#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdint.h>

#endif

至此,我們簡單的echo伺服器就大功告成啦~

下面給出makefile:

THIRD_LIBS=-levent
LIBS=-ldl
CFLAGS=-I./include

master:src/master.o src/worker.o src/listener.o src/connection.o src/main.o
        g++ -g -o [email protected] src/master.o src/worker.o src/listener.o src/connection.o src/main.o $(THIRD_LIBS) $(LIBS)

src/master.o:src/master.cpp include/master.h
        g++ -g -o [email protected] -c $< $(CFLAGS)

src/worker.o:src/worker.cpp include/worker.h include/util.h
        g++ -g -o [email protected] -c $< $(CFLAGS)

src/listener.o:src/listener.cpp include/listener.h include/util.h
        g++ -g -o [email protected] -c $< $(CFLAGS)

src/connection.o:src/connection.cpp include/connection.h include/util.h
        g++ -g -o [email protected] -c $< $(CFLAGS)

src/main.o:src/main.cpp include/master.h
        g++ -g -o [email protected] -c $< $(CFLAGS)

clean:
        rm -f src/*.o master

我用python寫了個用於測試的客戶端:

#python2.7.6
#coding=utf-8

import socket

if __name__ == "__main__":
    sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sockfd.connect(('localhost', 8000))
    message = ""
    while 1:
        message = raw_input("Please input:")
        sockfd.send(message)
        message = sockfd.recv(8000)
        print message
    sockfd.close()

另外一個用於測試的:

#python2.7.6
#coding=utf-8

import socket
import time
import threading

http_request = "POST /test_server HTTP/1.1\r\nHost:test.py\r\nContent-Length:5\r\n\r\nHello"

def make_a_request():
    sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sockfd.connect(('localhost', 8000))
    sockfd.sendall(http_request)
    sockfd.recv(8000)
    sockfd.close()

if __name__ == "__main__":
    thread_list = []

    start_time = time.time()

    for i in range(0, 1000):
        thread = threading.Thread(target = make_a_request)
        thread_list.append(thread)
        thread.start()

    for thread in thread_list:
        thread.join()

    print "Time used for 1000 request: ", time.time() - start_time

另外,由於是多程序,所以需要測試一下併發下的運轉情況~ 
(可以使用webbench~)

本程式在github的原始碼

接下來,我們將引入狀態機機制,並開始進行一些http的請求與處理!