1. 程式人生 > >基本套接字程式設計(6) -- 執行緒篇

基本套接字程式設計(6) -- 執行緒篇

1. 執行緒

傳統Unix模型中,當一個程序需要另一個實體來完成某事,它就fork一個子程序來處理。Unix上大多數網路伺服器程式便是以建立多個子程序的方式實現的:父程序accept一個連線,fork一個子程序,該子程序處理與該連線對端的客戶之間的通訊。 儘管,這種正規化多年來一直用的不錯,但是fork呼叫依然存在一些問題: (1)fork是昂貴的,fork要把父程序的記憶體影像複製到子程序,並在子程序中複製所有描述符,等等。當今的實現使用寫時複製技術,用於避免在子程序切實需要自己的副本之前把父程序的資料空間複製到子程序。然而,即使有這樣的優勢,fork仍然是昂貴的; (2)fork返回之後,父子程序之間資訊的傳遞需要程序間通訊(IPC)機制。呼叫fork之前父程序向尚未存在的子程序傳遞資訊相當容易,因為子程序將從父程序資料空間及所有描述符的一個副本開始程序,然而從子程序向父程序返回資訊缺非常吃力; 針對以上的問題,執行緒有助於解決,執行緒也稱為輕量級程序。

1.1 執行緒共享資訊

同一程序內的所有執行緒共享以下資訊:
  • 相同的全域性記憶體,包含全域性變數
  • 程序指令
  • 大多數資料
  • 開啟的檔案(即描述符)
  • 訊號處理函式和訊號處置
  • 當前工作目錄
  • 使用者ID和組ID

1.2 執行緒獨立資訊

  • 執行緒ID
  • 暫存器集合,包括程式計數器和棧指標
  • 棧(用於存放區域性變數和返回地址)
  • errno
  • 訊號掩碼
  • 優先順序
本文總結於《Unix網路程式設計 -- 卷一》第26章,講解的為POSIX執行緒也稱為pthread。

2. 基本執行緒函式

Pthreads API在ANSI/IEEE POSIX 1003.1 – 1995標準中定義。不像MPI,該標準不是免費的,必須向IEEE購買。  Pthreads API中的函式可以非正式的劃分為三大類: 
  • 執行緒管理(Thread management): 第一類函式直接用於執行緒:建立(creating),分離(detaching),連線(joining)等等。包含了用於設定和查詢執行緒屬性(可連線,排程屬性等)的函式。
  • 互斥量(Mutexes): 第二類函式是用於執行緒同步的,稱為互斥量(mutexes),是"mutual exclusion"的縮寫。Mutex函式提供了建立,銷燬,鎖定和解鎖互斥量的功能。同時還包括了一些用於設定或修改互斥量屬性的函式。 
  • 條件變數(Condition variables):第三類函式處理共享一個互斥量的執行緒間的通訊,基於程式設計師指定的條件。這類函式包括指定的條件變數的建立,銷燬,等待和受信(signal)。設定查詢條件變數屬性的函式也包含其中。 
命名約定:執行緒庫中的所有識別符號都以pthread開頭 

2.1 執行緒建立 -- pthread_create函式

最初,main函式包含了一個預設的執行緒。其它執行緒則需要程式設計師顯式地建立。 pthread_create 建立一個新執行緒並使之執行起來。該函式可以在程式的任何地方呼叫。 
#include <pthread.h>
int pthread_create(pthread_t *tid , const pthread_attr_t *attr , void *(*func)(void *) , void *arg);
<span style="white-space:pre">							</span>返回:若成功則為0 , 若出錯則為正的Exxx值
pthread_create引數: 
  • tid:返回一個不透明的,唯一的新執行緒識別符號。 
  • attr:不透明的執行緒屬性物件。可以指定一個執行緒屬性物件,或者NULL為預設值。 
  • func:執行緒將會執行一次的C函式。 
  • arg: 傳遞給func單個引數,傳遞時必須轉換成指向void的指標型別。沒有引數傳遞時,可設定為NULL。 
一個程序可以建立的執行緒最大數量取決於系統實現。 一旦建立,執行緒就稱為peers,可以建立其它執行緒。執行緒之間沒有指定的結構和依賴關係。 
注: Q:一個執行緒被建立後,怎麼知道作業系統何時排程該執行緒使之執行? 
A:除非使用了Pthreads的排程機制,否則執行緒何時何地被執行取決於作業系統的實現。強壯的程式應該不依賴於執行緒執行的順序。

2.2 執行緒匯合 -- pthread_join函式

我們可以通過呼叫pthread_join等待一個給定執行緒終止。對比執行緒和Unix程序,pthread_create類似於fork,pthread_join類似於waitpid。
#include <pthread.h>
int pthread_join(pthead_t *tid , void **status);
						返回:若成功則為0,若出錯則為正的Exxx值
我們必須指定要等待執行緒的tid,Pthread沒辦法等待任意一個執行緒(類似指定waitpid程序Id引數-1)。 如果status指標非空,來自所等待執行緒的返回值(一個指向某個物件的指標)將存入由status指向的位置。

2.3 執行緒分離 -- pthread_detach函式

一個執行緒是可匯合的(joinable預設值)或者是脫離的(detached)。當一個可匯合的執行緒終止時,它的執行緒ID和退出狀態將留存到另一個執行緒對它呼叫pthread_join。脫離的執行緒卻像守護程序,當它們終止時,所有相關資源都被釋放,我們不能等待它們終止。如果一個執行緒需要知道另一個執行緒什麼時候終止,那就最好保持第二個執行緒的可匯合狀態。 pthread_detach函式將指定的執行緒轉變為脫離狀態。
#include <pthread.h>
int pthread_detach(pthead_t *tid);
						返回:若成功則為0,若出錯則為正的Exxx值
如果一個執行緒想讓自身脫離,則呼叫:pthread_detach(pthread_self());

注:執行緒匯合與分離

(1)連線:  “連線”是一種線上程間完成同步的方法。 

  • pthread_join()函式阻賽呼叫執行緒知道threadid所指定的執行緒終止。 
  • 如果在目標執行緒中呼叫pthread_exit(),程式設計師可以在主執行緒中獲得目標執行緒的終止狀態。 
  • 連線執行緒只能用pthread_join()連線一次。若多次呼叫就會發生邏輯錯誤。 
  • 兩種同步方法,互斥量(mutexes)和條件變數(condition variables)。 
(2)可連線(Joinable or Not)? 
當一個執行緒被建立,它有一個屬性定義了它是可連線的(joinable)還是分離的(detached)。只有是可連線的執行緒才能被連線(joined),若果建立的執行緒是分離的,則不能連線。 POSIX標準的最終草案指定了執行緒必須建立成可連線的。然而,並非所有實現都遵循此約定。 
使用pthread_create()的attr引數可以顯式的建立可連線或分離的執行緒,典型四步如下: 
  • 宣告一個pthread_attr_t資料型別的執行緒屬性變數 
  • 用 pthread_attr_init()初始化改屬性變數 
  • 用pthread_attr_setdetachstate()設定可分離狀態屬性 
  • 完了後,用pthread_attr_destroy()釋放屬性所佔用的庫資源 
(3)分離(Detaching): 
  • pthread_detach()可以顯式用於分離執行緒,儘管建立時是可連線的。
  • 沒有與pthread_detach()功能相反的函式 。
(4)建議: 
  • 若執行緒需要連線,考慮建立時顯式設定為可連線的。因為並非所有建立執行緒的實現都是將執行緒建立為可連線的。 
  • 若事先知道執行緒從不需要連線,考慮建立執行緒時將其設定為可分離狀態。一些系統資源可能需要釋放。 

2.4 執行緒ID -- pthread_self函式

每一個執行緒都有一個所屬程序內標識自身的ID。執行緒ID由pthread_create返回,pthread_join使用它,而函式pthread_self則是獲取自身的執行緒ID。
#include <pthread.h>
int pthread_self(void);
<h2>						返回:呼叫執行緒的ID
2.5 執行緒退出 -- pthread_exit函式</h2>
讓一個執行緒終止的方法之一是呼叫pthread_exit函式:
#include <pthread.h>
void pthread_self(void *status);
						不返回到呼叫者
如果本執行緒未曾脫離,它的執行緒ID和退出狀態將一直留存到呼叫程序內的某個其它執行緒對它呼叫pthread_join。 指標status不能指向區域性於呼叫執行緒的物件,因為執行緒終止時這樣的物件也消失。 讓一個執行緒終止的另外兩個方法:
  • 啟動執行緒的函式(即pthread_create()的第三個引數)可以返回。既然該函式必須宣告成返回一個void指標,它的返回值就是相應執行緒的終止狀態。
  • 如果程序的main函式返回或者任何執行緒呼叫了exit,整個程序就終止,其中包括它的任何執行緒。

3. 多執行緒TCP客戶/伺服器聊天程式例項

上面例項介紹了利用多執行緒技術實現的TCP客戶/伺服器回射程式例項,下面介紹利用多執行緒技術實現TCP客戶端/伺服器聊天小程式。 該程式實現客戶端與伺服器端通訊,當任意一端傳送退出指令exit時,程式結束。

3.1 config.h

/*
 * config.h 包含該tcp/ip套接字程式設計所需要的基本標頭檔案,與server.c client.c位於同一目錄下
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <errno.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <pthread.h>

const int MAX_LINE = 2048;
const int PORT = 6001;
const int BACKLOG = 10;
const int LISTENQ = 6666;
const int MAX_CONNECT = 20;

3.2 server.c

/*
*  伺服器端程式碼實現
*/

#include "config.h"

/*處理接收客戶端訊息函式*/
void *recv_message(void *fd)
{
	int sockfd = *(int *)fd;
	while(1)
	{
		char buf[MAX_LINE];
		memset(buf , 0 , MAX_LINE);
		int n;
		if((n = recv(sockfd , buf , MAX_LINE , 0)) == -1)
		{
			perror("recv error.\n");
			exit(1);
		}//if
		buf[n] = '\0';		
		//若收到的是exit字元,則代表退出通訊
		if(strcmp(buf , "byebye.") == 0)
		{
			printf("Client closed.\n");
			close(sockfd);
			exit(1);
		}//if

		printf("\nClient: %s\n", buf);
	}//while
}

int main()
{

	//宣告套接字
	int listenfd , connfd;
	socklen_t clilen;
	//宣告執行緒ID
	pthread_t recv_tid , send_tid;

	//定義地址結構
	struct sockaddr_in servaddr , cliaddr;
	
	/*(1) 建立套接字*/
	if((listenfd = socket(AF_INET , SOCK_STREAM , 0)) == -1)
	{
		perror("socket error.\n");
		exit(1);
	}//if

	/*(2) 初始化地址結構*/
	bzero(&servaddr , sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
	servaddr.sin_port = htons(PORT);

	/*(3) 繫結套接字和埠*/
	if(bind(listenfd , (struct sockaddr *)&servaddr , sizeof(servaddr)) < 0)
	{
		perror("bind error.\n");
		exit(1);
	}//if

	/*(4) 監聽*/
	if(listen(listenfd , LISTENQ) < 0)
	{
		perror("listen error.\n");
		exit(1);
	}//if

	/*(5) 接受客戶請求,並建立執行緒處理*/

	clilen = sizeof(cliaddr);
	if((connfd = accept(listenfd , (struct sockaddr *)&cliaddr , &clilen)) < 0)
	{
		perror("accept error.\n");
		exit(1);
	}//if

	printf("server: got connection from %s\n", inet_ntoa(cliaddr.sin_addr));

	/*建立子執行緒處理該客戶連結接收訊息*/
	if(pthread_create(&recv_tid , NULL , recv_message, &connfd) == -1)
	{
		perror("pthread create error.\n");
		exit(1);
	}//if

	/*處理伺服器傳送訊息*/
	char msg[MAX_LINE];
	memset(msg , 0 , MAX_LINE);
	while(fgets(msg , MAX_LINE , stdin) != NULL)	
	{	
		if(strcmp(msg , "exit\n") == 0)
		{
			printf("byebye.\n");
			memset(msg , 0 , MAX_LINE);
			strcpy(msg , "byebye.");
			send(connfd , msg , strlen(msg) , 0);
			close(connfd);
			exit(0);
		}//if

		if(send(connfd , msg , strlen(msg) , 0) == -1)
		{
			perror("send error.\n");
			exit(1);
		}//if		
	}//while
}

3.3 client.c

/*
* 客戶端程式碼
*/
#include "config.h"

/*處理接收伺服器訊息函式*/
void *recv_message(void *fd)
{
	int sockfd = *(int *)fd;
	while(1)
	{
		char buf[MAX_LINE];
		memset(buf , 0 , MAX_LINE);
		int n;
		if((n = recv(sockfd , buf , MAX_LINE , 0)) == -1)
		{
			perror("recv error.\n");
			exit(1);
		}//if
		buf[n] = '\0';
		
		//若收到的是exit字元,則代表退出通訊
		if(strcmp(buf , "byebye.") == 0)
		{
			printf("Server is closed.\n");
			close(sockfd);
			exit(0);
		}//if

		printf("\nServer: %s\n", buf);
	}//while
}


int main(int argc , char **argv)
{
	/*宣告套接字和連結伺服器地址*/
    int sockfd;
	pthread_t recv_tid , send_tid;
    struct sockaddr_in servaddr;

    /*判斷是否為合法輸入*/
    if(argc != 2)
    {
        perror("usage:tcpcli <IPaddress>");
        exit(1);
    }//if

    /*(1) 建立套接字*/
    if((sockfd = socket(AF_INET , SOCK_STREAM , 0)) == -1)
    {
        perror("socket error");
        exit(1);
    }//if

    /*(2) 設定連結伺服器地址結構*/
    bzero(&servaddr , sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    if(inet_pton(AF_INET , argv[1] , &servaddr.sin_addr) < 0)
    {
        printf("inet_pton error for %s\n",argv[1]);
        exit(1);
    }//if

    /*(3) 傳送連結伺服器請求*/
    if( connect(sockfd , (struct sockaddr *)&servaddr , sizeof(servaddr)) < 0)
    {
        perror("connect error");
        exit(1);
    }//if	

	/*建立子執行緒處理該客戶連結接收訊息*/
	if(pthread_create(&recv_tid , NULL , recv_message, &sockfd) == -1)
	{
		perror("pthread create error.\n");
		exit(1);
	}//if	

	/*處理客戶端傳送訊息*/
	char msg[MAX_LINE];
	memset(msg , 0 , MAX_LINE);
	while(fgets(msg , MAX_LINE , stdin) != NULL)	
	{
		if(strcmp(msg , "exit\n") == 0)
		{
			printf("byebye.\n");
			memset(msg , 0 , MAX_LINE);
			strcpy(msg , "byebye.");
			send(sockfd , msg , strlen(msg) , 0);
			close(sockfd);
			exit(0);
		}//if
		if(send(sockfd , msg , strlen(msg) , 0) == -1)
		{
			perror("send error.\n");
			exit(1);
		}//if
	
		
	}//while
}

4.4 執行結果

伺服器端: