1. 程式人生 > >Linux簡單高併發模型——Epoll + 執行緒池

Linux簡單高併發模型——Epoll + 執行緒池

版權宣告:本文為博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處連結和本宣告。
本文連結:https://blog.csdn.net/qq_25425023/article/details/70199133
首先是一個locker.h的檔案,封裝了訊號量、互斥量、條件變數。

線上程池中的任務佇列需要互斥量的保護,當任務佇列中有任務到達時,需要喚醒一個等待pthread_cond_wait()的執行緒,執行緒池停止時,需要喚醒所以的執行緒,呼叫的是pthread_cond_broadcast()。

locker.h檔案:

#ifndef _LOCKER_H_
#define _LOCKER_H_
 
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
 
/*訊號量的類*/
class sem_locker
{
private:
    sem_t m_sem;
 
public:
    //初始化訊號量
    sem_locker()
    {
    if(sem_init(&m_sem, 0, 0) != 0)
        printf("sem init error\n");
    }
    //銷燬訊號量
    ~sem_locker()
    {
    sem_destroy(&m_sem);
    }
 
    //等待訊號量
    bool wait()
    {
    return sem_wait(&m_sem) == 0;
    }
    //新增訊號量
    bool add()
    {
    return sem_post(&m_sem) == 0;
    }
};
 
 
/*互斥 locker*/
class mutex_locker
{
private:
    pthread_mutex_t m_mutex;
 
public:
    mutex_locker()
    {
        if(pthread_mutex_init(&m_mutex, NULL) != 0)
        printf("mutex init error!");
    }
    ~mutex_locker()
    {
    pthread_mutex_destroy(&m_mutex);
    }
 
    bool mutex_lock()  //lock mutex
    {
    return pthread_mutex_lock(&m_mutex) == 0;
    }
    bool mutex_unlock()   //unlock
    {
    return pthread_mutex_unlock(&m_mutex) == 0;
    }
};
 
/*條件變數 locker*/
class cond_locker
{
private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
 
public:
    // 初始化 m_mutex and m_cond
    cond_locker()
    {
    if(pthread_mutex_init(&m_mutex, NULL) != 0)
        printf("mutex init error");
    if(pthread_cond_init(&m_cond, NULL) != 0)
    {   //條件變數初始化是被,釋放初始化成功的mutex
        pthread_mutex_destroy(&m_mutex);
        printf("cond init error");
    }
    }
    // destroy mutex and cond
    ~cond_locker()
    {
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_cond);
    }
    //等待條件變數
    bool wait()
    {
    int ans = 0;
    pthread_mutex_lock(&m_mutex);
    ans = pthread_cond_wait(&m_cond, &m_mutex);
    pthread_mutex_unlock(&m_mutex);
    return ans == 0;
    }
    //喚醒等待條件變數的執行緒
    bool signal()
    {
    return pthread_cond_signal(&m_cond) == 0;
    }
 
    //喚醒all等待條件變數的執行緒
    bool broadcast()
    {
            return pthread_cond_broadcast(&m_cond) == 0;
    }
};
 
#endif


thread_pool.h檔案。

建立threadnum個執行緒,並呼叫pthread_detach()分離執行緒,執行緒結束,自動回收資源。(前面的一篇部落格的執行緒池有bug,不完整,執行緒池退出時,不能讓所有的執行緒正常退出)

#ifndef _PTHREAD_POOL_
#define _PTHREAD_POOL_
 
#include "locker.h"
#include <queue>
#include <stdio.h>
#include <exception>
#include <errno.h>
#include <pthread.h>
#include <iostream>
 
template<class T>
class threadpool
{
private:
    int thread_number;  //執行緒池的執行緒數
    //int max_task_number;  //任務佇列中的最大任務數
    pthread_t *all_threads;   //執行緒陣列
    std::queue<T *> task_queue; //任務佇列
    mutex_locker queue_mutex_locker;  //互斥鎖
    //sem_locker queue_sem_locker;   //訊號量
    cond_locker queue_cond_locker; //cond
    bool is_stop; //是否結束執行緒
public:
    threadpool(int thread_num = 20);
    ~threadpool();
    bool append_task(T *task);  //新增任務
    void start();              //執行緒池開啟
    void stop();               //執行緒池關閉
private:
    //執行緒執行的函式。執行run()函式
    static void *worker(void *arg);
    void run();
    T *getTask();   //獲取任務
};
 
template <class T>
threadpool<T>::threadpool(int thread_num):
    thread_number(thread_num),is_stop(false), all_threads(NULL)
{       //建構函式
    if(thread_num <= 0)
    printf("threadpool can't init because thread_number = 0");
 
    all_threads = new pthread_t[thread_number];
    if(all_threads == NULL)
        printf("can't init threadpool because thread array can't new");
}
 
template <class T>
threadpool<T>::~threadpool()
{
    delete []all_threads;
    stop();
}
 
template <class T>
void threadpool<T>::stop() //執行緒池停止
{
        is_stop = true;
        //queue_sem_locker.add();
        queue_cond_locker.broadcast();
}
 
template <class T>
void threadpool<T>::start()  //執行緒池啟動
{
    for(int i = 0; i < thread_number; ++i)
    {
    //printf("create the %dth pthread\n", i);
    if(pthread_create(all_threads + i, NULL, worker, this) != 0)
    {//建立執行緒失敗,清除成功申請的資源並丟擲異常
        delete []all_threads;
        throw std::exception();
    }
    if(pthread_detach(all_threads[i]))
    {//將執行緒設定為脫離執行緒,失敗則清除成功申請的資源並丟擲異常
        delete []all_threads;
        throw std::exception();
    }
    }
}
//新增任務進入任務佇列
template <class T>
bool threadpool<T>::append_task(T *task)   //新增任務
{   //獲取互斥鎖
    queue_mutex_locker.mutex_lock();
    
    bool is_signal = task_queue.empty();
    //新增進入佇列
    task_queue.push(task);
    queue_mutex_locker.mutex_unlock();
    //喚醒等待任務的執行緒
    if(is_signal)
    {
            queue_cond_locker.signal();
    }
    return true;
}
 
template <class T>
void *threadpool<T>::worker(void *arg)  //執行緒工作函式
{
    threadpool *pool = (threadpool *)arg;
    pool->run();
    return pool;
}
 
template <class T>
T* threadpool<T>::getTask()   //從任務佇列中獲取任務
{
    T *task = NULL;
    queue_mutex_locker.mutex_lock();
    if(!task_queue.empty())
    {
        task = task_queue.front();
        task_queue.pop();
    }
    queue_mutex_locker.mutex_unlock();
    return task;
}
 
template <class T>
void threadpool<T>::run()
{
    while(!is_stop){
        T *task = getTask();
        if(task == NULL)  //佇列為空,等待
                queue_cond_locker.wait();
        else              //執行任務
                task->doit();
    }
    //for test
    //printf("exit%d\n", (unsigned long)pthread_self());
}
 
#endif
 
 

封裝了epoll。
EpollServer.h中的BaseTask.h和Task.h應該放在另外一個檔案中的。這裡圖個方便,哈哈。

#ifndef _EPOLL_SERVER_H_
#define _EPOLL_SERVER_H_
 
#include <sys/socket.h>
#include <sys/types.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/epoll.h>
//#include <pthread.h>
 
#include "thread_pool.h"
 
#define MAX_EVENT 1024   //epoll_events的最大個數
#define MAX_BUFFER 2048  //Buffer的最大位元組
 
class BaseTask
{
public:
    virtual void doit() = 0;
};
 
class Task : public BaseTask
{
private:
    int sockfd;
    char order[MAX_BUFFER];
public:
    Task(char *str, int fd) : sockfd(fd)
    {
        memset(order, '\0', MAX_BUFFER);
        strcpy(order, str);
    }
    void doit()  //任務的執行函式
    {
        //do something of the order
        //printf("%s\n", order);
        snprintf(order, MAX_BUFFER - 1, "somedata\n");
        write(sockfd, order, strlen(order));
    }
};
 
class EpollServer
{
private:
    bool is_stop;   //是否停止epoll_wait的標誌
    int threadnum;   //執行緒數目
    int sockfd;     //監聽的fd
    int port;      //埠
    int epollfd;    //Epoll的fd
    threadpool<BaseTask> *pool;   //執行緒池的指標
    //char address[20];
    epoll_event events[MAX_EVENT];  //epoll的events陣列
    struct sockaddr_in bindAddr;   //繫結的sockaddr
 
public://建構函式
    EpollServer()
    {}
    EpollServer(int ports, int thread) : is_stop(false) , threadnum(thread) ,
        port(ports), pool(NULL)
    {
    }
    ~EpollServer()  //析構
    {
        delete pool;
    }
 
    void init();
 
    void epoll();
 
    static int setnonblocking(int fd)  //將fd設定稱非阻塞
    {
        int old_option = fcntl(fd, F_GETFL);
        int new_option = old_option | O_NONBLOCK;
        fcntl(fd, F_SETFL, new_option);
        return old_option;
    }
 
    static void addfd(int epollfd, int sockfd, bool oneshot)  //向Epoll中新增fd
    {//oneshot表示是否設定稱同一時刻,只能有一個執行緒訪問fd,資料的讀取都在主執行緒中,所以呼叫都設定成false
        epoll_event event;
        event.data.fd = sockfd;
        event.events = EPOLLIN | EPOLLET;
        if(oneshot)
        {
            event.events |= EPOLLONESHOT;
        }
        epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event); //新增fd
        EpollServer::setnonblocking(sockfd);
    }
 
};
 
void EpollServer::init()   //EpollServer的初始化
{
    bzero(&bindAddr, sizeof(bindAddr));
    bindAddr.sin_family = AF_INET;
    bindAddr.sin_port = htons(port);
    bindAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        //建立Socket
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0)
    {
        printf("EpollServer socket init error\n");
        return;
    }
    int ret = bind(sockfd, (struct sockaddr *)&bindAddr, sizeof(bindAddr));
    if(ret < 0)
    {
        printf("EpollServer bind init error\n");
        return;
    }
    ret = listen(sockfd, 10);
    if(ret < 0)
    {
        printf("EpollServer listen init error\n");
        return;
    }
        //create Epoll
    epollfd = epoll_create(1024);
    if(epollfd < 0)
    {
        printf("EpollServer epoll_create init error\n");
        return;
    }
    pool = new threadpool<BaseTask>(threadnum);  //建立執行緒池
}
 
void EpollServer::epoll()
{
    pool->start();   //執行緒池啟動
    //
    addfd(epollfd, sockfd, false);
    while(!is_stop)
    {//呼叫epoll_wait
        int ret = epoll_wait(epollfd, events, MAX_EVENT, -1);
        if(ret < 0)  //出錯處理
        {
            printf("epoll_wait error\n");
            break;
        }
        for(int i = 0; i < ret; ++i)
        {
            int fd = events[i].data.fd;
            if(fd == sockfd)  //新的連線到來
            {
                struct sockaddr_in clientAddr;
                socklen_t len = sizeof(clientAddr);
                int confd = accept(sockfd, (struct sockaddr *)
                    &clientAddr, &len);
 
                EpollServer::addfd(epollfd, confd, false);
            }
            else if(events[i].events & EPOLLIN)  //某個fd上有資料可讀
            {
                char buffer[MAX_BUFFER];
        readagain:    memset(buffer, 0, sizeof(buffer));
                int ret = read(fd, buffer, MAX_BUFFER - 1);
                if(ret == 0)  //某個fd關閉了連線,從Epoll中刪除並關閉fd
                {
                    struct epoll_event ev;
                    ev.events = EPOLLIN;
                    ev.data.fd = fd;
                    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
                    shutdown(fd, SHUT_RDWR);
                    printf("%d logout\n", fd);
                    continue;
                }
                else if(ret < 0)//讀取出錯,嘗試再次讀取
                {
                    if(errno == EAGAIN)    
                    {
                        printf("read error! read again\n");
                        goto readagain;
                            break;
                    }
                }
                else//成功讀取,向執行緒池中新增任務
                {
                    BaseTask *task = new Task(buffer, fd);
                    pool->append_task(task);
                }
            }
            else
            {
                printf("something else had happened\n");
            }
        }
    }
    close(sockfd);//結束。
 
    pool->stop();
}
 
#endif


接下來是簡單的Demo的測試。

#include "EpollServer.h"
 
int main(int argc, char const *argv[])
{
    if(argc != 3)
    {
        printf("usage %s port threadnum\n", argv[0]);
        return -1;
    }
    int port = atoi(argv[1]);
    if(port == 0)
    {
        printf("port must be Integer\n");
        return -1;
    }
    int threadnum = atoi(argv[2]);
    if(port == 0)
    {
        printf("threadnum must be Integer\n");
        return -1;
    }
    EpollServer *epoll = new EpollServer(port, threadnum);
 
    epoll->init();
 
    epoll->epoll();
    return 0;
}


程式碼在Ubuntu中編譯通過。下次再來更新能夠支援併發量的多少。

-------------------------------------------------------------------------------------------------


 ———————————————— 
版權宣告:本文為CSDN博主「XD灬」的原創文章,遵循CC 4.0 by-sa版權協議,轉載請附上原文出處連結及本宣告。
原文連結:https://blog.csdn.net/qq_25425023/art