1. 程式人生 > >典型C/S模式___執行緒池實現

典型C/S模式___執行緒池實現

一、多執行緒和執行緒池的區別

執行緒池 多執行緒
同時啟動若干個執行緒,持續存在,時刻準備處理請求 來一個請求啟動一個執行緒
響應時間 響應時間較長

二、執行緒池實現思路

  • 主執行緒生成客戶端套接字(資源);函式執行緒使用客戶端套接字(資源),沒有可用資源時,函式執行緒必須阻塞。
  • 將客戶端套接字放入佇列和從佇列中取出客戶端套接字這兩個動作必須一次全部完成,不能被打斷,不能同時發生。比如說:主執行緒正向佇列放入客戶端套接字而函式執行緒正在從佇列中取用客戶端套接字,不能同時發生;又或者好幾個函式執行緒都正從佇列中取客戶端套接字,不能同時發生,不然可能取得的套接字是同一個。 

通過空一個位置就可以實現不新增標記區分隊空和隊滿的情況: rear始終指向空閒的位置。

 2.1 伺服器端

#include<sys/socket.h> //socket()bind()listen()accept()recv()send()
#include<arpa/inet.h> //inet_addr()
#include<netinet/in.h> //htons()
#include<pthread.h> //pthread_create()
#include<assert.h> //assert()
#include<stdio.h> //printf()
#include<semaphore.h> //sem_init() sem_post() sem_wait()
#include<unistd.h> //close()
#include<string.h> //memset()
#define NUM 8    //佇列大小為8個元素

pthread_mutex_t mutex; //互斥鎖

typedef struct Queue
{
    int cliLink[NUM];
    int front;
    int rear;
}Queue;

Queue que; //存放客戶端套接字的佇列
sem_t sem;    //訊號量,用於記錄套接字資源數目

void initQue(); //初始化佇列que
int insert(int c); //將客戶端套接字放入佇列que中
int getCli(); //從佇列que中取出套接字
void *fun(void *arg); //執行緒回撥函式

int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(-1 != sockfd);

    struct sockaddr_in ser,cli;
    memset(&ser, 0, sizeof(ser));
    ser.sin_family = AF_INET;
    ser.sin_port = htons(6000);
    ser.sin_addr.s_addr = inet_addr("127.0.0.1");
    
    int res = bind(sockfd, (struct sockaddr*)&ser, sizeof(ser));
    assert(-1 != res);

    listen(sockfd, 5); //維護5個連線請求

    //啟動三3個執行緒並且持續存在處理連線請求
    for(int i = 0; i < 3; ++i)
    {
	    pthread_t tid;
	    pthread_create(&tid, NULL, fun, 0); //使用預設attri 不帶引數
    }

    initQue(); //初始化佇列que
    sem_init(&sem, 0, 0); //執行緒級訊號量 初始值為0

    while(1)
    {
	    int len = sizeof(cli);
	    int c = accept(sockfd, (struct sockaddr*)&cli, &len);
	    //生成客戶端套接字
	    if(c < 0)
	        continue;
	    if(insert(c) == -1) //佇列已滿 不能再容納更多的套接字
	    {
	        close(c);
	        continue;
	    }
	    sem_post(&sem); //v操作
    }

    close(sockfd); //關閉套接字
}

void initQue() //佇列初始化 不含任何元素
{
    for(int i = 0; i < NUM; ++i)
	que.cliLink[i] = -1;
    que.front = que.rear = 0;
}

//將客戶端套接字放入維護的佇列中 成功返回0 失敗返回-1
int insert(int c)
{
    int res;
    pthread_mutex_lock(&mutex);
    
    if(que.front != (que.rear+1)%NUM)
    {
	    que.cliLink[que.rear] = c;
	    que.rear = (que.rear+1) % NUM;
	    res = c;
    }else //佇列已滿
	{
        res = -1;
    }

    pthread_mutex_unlock(&mutex);
    return res == -1 ? -1 : 0; //佇列已滿 插入失敗,佇列未滿,插入成功
}

//從佇列中取用套接字描述符
int getCli()
{
    pthread_mutex_lock(&mutex);
    
    int c = que.cliLink[que.front];
    que.front = (que.front + 1) % NUM;

    pthread_mutex_unlock(&mutex);
    return c;
}

//回撥函式格式固定
void *fun(void* arg)
{
    while(1)
    {
	    sem_wait(&sem); //驚群現象
	    int c = getCli();
	    while(1)
	    {
            char buf[128] = {'\0'};
	        int n = recv(c, buf, 127, 0);
	    
	        if(n <= 0 || 0 == strcmp(buf, "end"))
	        {
		        printf("client is terminated!\n");
		        close(c);
		        break;
	        }

	        printf("%d %s\n", c, buf);
	        send(c, "OK", 2, 0);
	    }
    }
}

2.2 客戶端

#include<stdio.h> //printf()
#include<assert.h> //assert()
#include<sys/socket.h> //socket()connect()recv()send()
#include<netinet/in.h> //inet_addr()
#include<arpa/inet.h> //htons()
#include<string.h> //strlen()

int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(-1 != sockfd);
    
    struct sockaddr_in ser;
    ser.sin_family = AF_INET;
    ser.sin_port = htons(6000);
    ser.sin_addr.s_addr = inet_addr("127.0.0.1");
    
    int res = connect(sockfd, (struct sockaddr*)&ser, sizeof(ser));
    assert(-1 != res);

    while(1)
    {
	    printf("please input: ");
	    fflush(stdout);
	    char buf[128] = {'\0'};
	    scanf("%s", buf);

	    send(sockfd, buf, strlen(buf), 0);
	
	    char s2[128] = {'\0'};
	    recv(sockfd, s2, 127, 0);
	    printf("%d: %s\n", sockfd, s2);
	    
	    if(0 == strcmp(buf, "end"))
	    {
            close(sockfd);
	        break;
	    }
    }
}

2.3 執行結果

執行結果說明:執行緒池中常備三個函式執行緒用於處理連線請求,當第四個客戶端的連線請求傳送給伺服器的時候,伺服器會接收的,因為通過listen()函式設定的維護的連線數目為5,但是沒有函式執行緒空閒下來處理新的客戶端套接字。任何一個客戶端結束,4號客戶端馬上就會有返回。

在3號客戶端結束後,4號客戶端立刻就有了返回。因為空閒的函式執行緒會一直去檢查佇列中是否存在套接字。這就是驚群現象。