1. 程式人生 > >Linux下的TCP/IP程式設計----執行緒及多執行緒服務端

Linux下的TCP/IP程式設計----執行緒及多執行緒服務端

之前有講過程序及多程序服務端的實現,現在我們來看看更為廣泛而且實用的執行緒及多執行緒服務端的實現。

那麼什麼是執行緒呢?

執行緒是作業系統能夠進行運算排程的最小單位,它被包涵在程序之中,是行程中的實際運作單位。一條執行緒指的是程序中一個單一順序的控制流,一個程序中可以並行多個執行緒,每條執行緒並行執行不同的任務。

這是比較正式的解釋,簡單點來說,執行緒就是程序的更進一步的細化。

由於程序代表的是一個正在執行的程式,所以程序的切換其實也就是程式的切換,而在切換過程中要移除當前執行程序在記憶體中的資料,移入將要執行的程序所需要的資料,所以程序的切換會消耗很多的系統資源,尤其是請求量大時,cpu必須來回切換不同的程序。而執行緒的出現解決了這一問題,執行緒是比程序更小的執行單元,一個程序中可以包含若干個執行緒,每一個執行緒有一個任務執行的順序。這樣,執行緒本身並不保有資源,而是在需要時向所在的程序進行申請,以滿足執行需要。這樣我們就可以用執行緒來實現多客戶端的訪問,每個訪問都放到一個執行緒中去執行。

執行緒的建立和銷燬:

int pthread_create(pthread_t * restrict thread , const pthread_attr_t * restrict attr, void * (* struct_routine)(void ) , void restrict arg):建立一個新執行緒

  • thread(執行緒):儲存新建立的執行緒ID的變數地址,用於區分
  • attr(引數):用於建立執行緒時指定一些引數,傳入null時使用預設引數
  • start_routine(函式指標):用於儲存新執行緒所執行的函式的地址,相當於新執行緒中main()函式的地址
  • arg(變數指標):第三個引數中函式所傳入的引數的地址值,即新執行緒中main()函式中引數的地址值

成功時返回0,失敗時返回其他值

int pthread_detach(pthread_t thread):執行緒銷燬

  • thread(執行緒ID):要銷燬的執行緒id

成功時返回0,失敗時返回其他值

int pthread_join(pthread_t thread, void ** status):執行緒聯合

  • thread(執行緒):該引數值id的執行緒終止時才會從該函式返回
  • status(結果):用於儲存執行緒main()函式返回值的地址

成功時返回0,失敗時返回其他值

pthread_join和pthread_detach的區別:

呼叫pthread_join函式的執行緒會進入等待狀態,直到指定id的執行緒結束,而且可以得到結束執行緒的返回值
而呼叫pthread_detach函式不會使得呼叫該函式的執行緒進入等待狀態,起作用也是可以釋放已經執行結束的執行緒所佔用的記憶體空間

執行緒同步:

由於執行緒所呼叫的資源是同一個程序內的資源,所以當執行緒數較多時可能會發生執行緒對資源的爭奪,若是不對其進行規則的約束,則有可能導致程式出錯,產生嚴重後果。例如:兩個執行緒同時對一個變數X進行++的運算,當兩個執行緒讀取到初始變數後,若執行緒A先執行,並已經給X執行了++,但是結果尚未寫回X中,此時執行緒B立即搶佔上了cpu,並且給X執行++,而且結果寫回X中,最後A又把結果寫回X中。在這個各流程中,執行緒A和B其實發生來資料的覆蓋,導致本應該+2的變數最後只+1,這就是在使用執行緒時會遇到的問題,而這也是執行緒同步的來源。而在程式中,容易發生執行緒同步問題的語句稱為臨界區。

解決執行緒同步常見的是兩種方式:互斥量和訊號量

互斥量:如名字所示,互斥,即不允許多個執行緒同時對臨界區進行訪問,當一個執行緒在訪問時,另一個必須要進行等待。只有當另一個執行緒釋放互斥量時,等待中的執行緒才能訪問資源。

訊號量:可以用來保證兩個或多個關鍵程式碼段不被併發呼叫。在進入一個關鍵程式碼段之前,執行緒必須獲取一個訊號量;一旦該關鍵程式碼段完成了,那麼該執行緒必須釋放訊號量。其它想進入該關鍵程式碼段的執行緒必須等待直到第一個執行緒釋放訊號量。

我們可以通過這兩種機制來很好的保證執行緒執行的同步。

int pthread_mutex_init(pthread_mutex_t * mutex , const pthread_mutexattr_t * attr):初始化互斥量

  • mutex(型號量):建立互斥訊號量時傳遞儲存互斥量的變數地址。
  • attr(互斥量屬性):傳入null時建立預設的互斥量

成功時返回0,失敗時返回其他值

int pthread_mutex_destroy(pthread_mutex_t * mutex)銷燬互斥量

  • mutex(互斥量):要銷燬的互斥量

成功時返回0,失敗時返回其他值

int pthread_mutex_lock(pthread_mutex_t * mutex):鎖定互斥量

  • mutex(訊號量):要操作的互斥量

成功時返回0,失敗時返回其他值

int pthread_mutex_unlock(pthread_mutex_t * mutex):解鎖互斥量

  • mutex(訊號量):要操作的互斥量

成功時返回0,失敗時返回其他值

例如:

pthread_mutex_lock(&mutex);
//臨界區開始
......
.....
..
//臨界區結束
pthread_mutex_unlock(&mutex);

int sem_init(sem_t * sem , int pthread , unsigned int value):初始化訊號量

  • sem(訊號量):建立訊號量時儲存訊號量的變數地址值
  • pthread(控制引數):若傳入0,則只能在一個程序中使用,若是其他值,則可以由多個訊號量共享
  • value(初始值):指定建立的訊號量的初始值

成功時返回0,失敗時返回其他值

int sem_destroy(sem_t * sem):銷燬訊號量

  • sem(訊號量):要銷燬的訊號量

成功時返回0,失敗時返回其他值

int sem_post(sem_t * sem)釋放訊號量

  • sem(訊號量):要釋放的訊號量

成功時返回0,失敗時返回其他值

int sem_wait(sem_t * sem)申請訊號量

  • sem(訊號量):要申請的訊號量

成功時返回0,失敗時返回其他值

例如:

sem_wait(&sem)//訊號量減為0
//臨界區域開始
........
.......
....
...
//臨界區域結束
sem_post(&sem)//訊號量增1

在瞭解了執行緒以及執行緒使用中需要注意的問題之後可以改寫一下服務端的程式,使其通過多執行緒的方式來實現併發訪問。

chat_setver.c

#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<semaphore.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/socket.h>

#define BUFF_SIZE 100
#define MAX_CLNT 256

//客戶端處理函式
void *handle_clent(void *arg);
//訊息傳送函式
void send_message(char * msg,int len);
//出錯處理函式
void error_handling(char *msg);

//定義全域性的客戶端socket變數
int clnt_cnt=0;
//定義全域性的socket陣列,儲存客戶端套接字
int clnt_socks[MAX_CLNT];
//定義的全域性的互斥資訊號量
pthread_mutex_t  mutx;

int main(int argc,char *argv[]){
    int server_sock,client_sock;

    struct sockaddr_in server_addr;
    struct sockaddr_in client_addr;

    int client_addr_size;
    //用於記錄執行緒id
    pthread_t t_id;
    if(argc!=2){
        printf("Usage : %s <port>\n",argv[0]);
        exit(1);
    }
    //初始化一個互斥訊號量
    pthread_mutex_init(&mutx,NULL);

    server_sock = socket(PF_INET,SOCK_STREAM,0);

    memset(&server_addr,0,sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(atoi(argv[1]));

    if(bind(server_sock,(struct sockaddr *)&server_addr,sizeof(server_addr)) == -1){
        error_handling("bind() error");
    }

    if(listen(server_sock,5) == -1){
        error_handling("listen() error");
    }

    while(1){
        client_addr_size = sizeof(client_addr);
        client_sock = accept(server_sock,(struct sockaddr *)&client_addr,&client_addr_size);

        //上鎖
        pthread_mutex_lock(&mutx);
        //將接收到的客戶端socket儲存到全域性變數中
        clnt_socks[clnt_cnt++] = client_sock;
        //建立一個新的執行緒,用於處理接收到的客戶端
        pthread_create(&t_id,NULL,handle_clent,(void *)&client_sock);
        //處理執行緒的結束
        pthread_detach(t_id);
        printf("Connected client IP: %s \n",inet_ntoa(client_addr.sin_addr));
    }

    close(server_sock);
return 0;
}

/**
用於處理接收到的客戶端
**/
void *handle_clent(void *arg){
    //將引數強轉成int型
    int clnt_sock = *((int *)arg);

    int str_len = 0 ,i;
    char msg[BUFF_SIZE];

    while((str_len = read(clnt_sock,msg,sizeof(msg))) != 0){
        send_message(msg,str_len);
    }
    //上鎖,修改全域性的socket陣列,去除當前的socket客戶端
    pthread_mutex_lock(&mutx);
    for(i=0;i<clnt_cnt;i++){
        if(clnt_sock == clnt_socks[i]){
            while(i++<clnt_cnt-1){
                clnt_socks[i] = clnt_socks[i+1];
            }
            break;
        }
    }
    //客戶端數量修改
    clnt_cnt--;
    pthread_mutex_unlock(&mutx);
    close(clnt_sock);
    return NULL;
    }

/**
    用於訊息處理,給所有已連線的客戶端傳送訊息
**/
    void send_message(char *msg,int len){
        int i;
        //上鎖,使用全域性的clnt_socks變數來輸出資料
        pthread_mutex_lock(&mutx);
        //遍歷客戶端陣列
        for(i = 0;i<clnt_cnt;i++){
            write(clnt_socks[i],msg,len);
        }
        //解鎖
        pthread_mutex_unlock(&mutx);
    }

void error_handling(char * message){
    fputs(message,stderr);
    fputc('\n',stderr);
    exit(1);
}

chat_client.c

#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<semaphore.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<sys/socket.h>

#define BUFF_SIZE 100
#define NAME_SIZE 20


void * send_msg(void *arg);
void * recv_msg(void *arg);
void  error_handling(char *msg);

char name[NAME_SIZE]="[DEFAULT]";
char msg[BUFF_SIZE];

int main(int argc ,char * argv[]){
    int sock;
    struct sockaddr_in server_addr;
    pthread_t send_thread;
    pthread_t recv_thread;
    void *thread_return;

    if(argc != 4){
        printf("Usage : %s <port> <name>\n",argv[0]);
        exit(1);
    }

    sprintf(name,"[%s]",argv[3]);
    sock = socket(PF_INET,SOCK_STREAM,0);

    memset(&server_addr,0,sizeof(server_addr));
    server_addr.sin_family=AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(argv[1]);
    server_addr.sin_port = htons(atoi(argv[2]));

    if(connect(sock,(struct sockaddr *)&server_addr,sizeof(server_addr))  == -1){
        error_handling("connect() error");
    }

    pthread_create(&send_thread,NULL,send_msg,(void *)&sock);
    pthread_create(&recv_thread,NULL,recv_msg,(void *)&sock);
    pthread_join(send_thread,&thread_return);
    pthread_join(recv_thread,&thread_return);

    close(sock);
return 0;
}

/**
用於傳送訊息的執行緒
**/
void * send_msg(void *arg){
    int sock=*(((int *)arg));
    char name_msg[NAME_SIZE+BUFF_SIZE];

    while(1){
        fgets(msg,BUFF_SIZE,stdin);
        if(!strcmp(msg,"q\n") || !strcmp(msg,"Q\n")){
            close(sock);
            exit(0);
        }
        sprintf(name_msg," %s %s",name,msg);
        write(sock,name_msg,strlen(name_msg));
    }
    return NULL;
}

/**
    用於接收資料的執行緒
**/
void * recv_msg(void *arg){
    int sock=*(((int *)arg));
    char name_msg [NAME_SIZE+BUFF_SIZE];
    int str_len;

    while(1){
        str_len=read(sock,name_msg,NAME_SIZE+BUFF_SIZE-1);
        if(str_len == -1){
            return (void *) -1;
        }
        name_msg[str_len] = 0;
        fputs(name_msg,stdout);
    }
    return NULL;

}

void error_handling(char * message){
    fputs(message,stderr);
    fputc('\n',stderr);
    exit(1);
}

最後編譯執行:

編譯客戶端:gcc chat_client.c -DREENTRANT -o chat_client -lpthread
編譯服務端: gcc chat_server.c -D_REENTRANT -o chat_server -lpthread

執行服務端:./chat_server 9090
執行客戶端:./chat_client 127.0.0.1 9090 wei

這樣就完成了一個簡易基於多執行緒的服務端/客戶端程式