【執行緒的同步與互斥 (互斥量 條件變數 訊號量)】生產者與消費者模型
執行緒
執行緒是程序中的一個獨立的執行流,由環境(包括暫存器集和程式計數器)和一系列要執行的置零組成。所有程序至少有一個執行緒組成,多執行緒的程序包括多個執行緒,所有執行緒共享為程序分配的公共地址空間,所以文字段(Text Segment)和資料段(Datan Segment)都是共享的,如果定義一個函式,在各執行緒中都可以呼叫,定義一個全域性變數,在各個執行緒中都可以訪問到。
從邏輯上看,多執行緒就就是一個應用程式中。由多個執行部分同時執行,但作業系統並沒有將多個執行緒看做成多個獨立的應用實現程序的排程,管理以及資源分配,一個執行緒可以建立和撤銷另一個執行緒,同一個程序中的多個執行緒之間可以併發執行。
執行緒是系統排程的基本單位
程序是承擔分配資源的一個實體
各執行緒共享資源
- 資料段各文字段
- 對全域性變數的訪問
- 檔案描述符
- 每種訊號的處理方式(SIG_IGN,SIG_DFL或自定義的訊號處理函式)
- 當前的工作目錄
- 使用者的id和組id
各執行緒私有資源
1. 執行緒id
2. 上下文,包括暫存器的值,程式計數器和棧指標
3. 棧空間
4. error變數
5. 訊號遮蔽字
6. 排程優先順序
執行緒操作
- 建立執行緒
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,void *(*start_routine) (void *), void *arg);
第一個引數thread是用來儲存執行緒id,引數指向執行緒id的指標。如果在父子程序中建立多個執行緒,執行緒id值有可能相同,如果建立成功,在次函式中返回id,如果設定為NULL,將不會返回執行緒的標號符id
第二個引數 pthread_attr_t *attr用來設定執行緒屬性,一般設定為NULL。
第三個引數是執行緒執行的程式碼其實地址,即在此執行緒中執行那段程式碼
第四個引數是執行函式的地址。
如果執行成功,返回0,失敗返回-1.
2.執行緒終止
終止執行緒的三種情況
- 呼叫pthread_exit函式退出
- 呼叫pthread_cancel函式取消該執行緒
- 建立執行緒的程序退出或整個函式結束
- 其中一個執行緒執行了exec類函式執行了新的程序
#include <pthread.h>
void pthread_exit(void *retval);
該引數用來儲存執行緒退出狀態
3.執行緒等待
#include <pthread.h>
int pthread_join(pthread_t thread, void **retval);
呼叫該函式的執行緒將掛起等待,直到id的執行緒終止。當含糊是返回時,處於等待的執行緒資源被回收。成功返回0,失敗返回錯誤碼。
第一個引數為等待執行緒的id
第二個引數使用者自定義的指標,指向一個儲存等待執行緒的完整退出狀態的靜態區。
執行緒的同步與互斥
互斥量
多個執行緒同時訪問共享資料時會發生衝突。例如兩個執行緒對同一個變數進行+1操作,當但兩個執行緒同時進行+1操作時,最後結果只加了一次而非兩次。
互斥以排他的方式共享資料被併發執行。我們引入的互斥鎖。
互斥鎖
互斥鎖是一個二元變數,其狀態為開鎖與上鎖,將某個共享資源與某個特定的互斥鎖繫結後,對該共享資源的訪問如下
- 在訪問該資源前,首先申請該互斥鎖,如果該互斥鎖處於開鎖狀態,則申請到該鎖物件,並立即佔有該鎖使該鎖處於鎖定狀態,以防止其它執行緒訪問該資源。如果該互斥鎖處於鎖定狀態,預設阻塞等待。
- 只有鎖定該互斥鎖的程序才能釋放該互斥鎖。其它執行緒的釋放操作無效。
互斥鎖的基本操作
1.初始化和銷燬互斥鎖
定義互斥鎖
pthread_mutex_t lock;
初始化和銷燬互斥鎖
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
//第一個引數mutex是指向初始化互斥鎖的指標
//第二個引數mutexattr是指向屬性物件的指標,該屬性物件定義的初始化的互斥鎖的屬性。如果該指標為NULL,則使用預設的屬性
int pthread_mutex_destroy(pthread_mutex_t *mutex);
//銷燬互斥鎖,成功返回0,失敗返回錯誤編碼
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//PTHREAD_MUTEX_INITIALIZER,該巨集初始化靜態分配的互斥鎖。對於靜態分配初始化的互斥鎖,不需要呼叫pthread_mutex_init()函式
2.申請互斥鎖
如果一個執行緒要佔用一共享資源,其必須先給互斥鎖上鎖
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
//以阻塞方式獲得互斥鎖
int pthread_mutex_trylock(pthread_mutex_t *mutex);
//以非阻塞方式獲得互斥鎖
3.釋放互斥鎖
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//釋放互斥鎖釋放操作只能由佔有改該互斥鎖的的執行緒來完成
條件變數
條件變數是執行緒的一種同步機制,條件變數給多個執行緒提供了一個會和的場所。
條件變數是用變數的形式來描述臨界資源的是否在被訪問,如果資源不能被訪問,則申請互斥鎖的執行流就會掛起到某個條件變數上,同時等待喚醒。
條件變數本身是被互斥量保護的,條件變數不能單獨使用,必須配合互斥鎖一起實現對資源的互斥訪問。
基本操作
1.初始化,銷燬條件變數
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
//定義條件變數(全域性變數)
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
//初始化條件變數
//第一個引數是定義的條件變數的指標
//第二個引數是指向屬性物件的指標,該屬性物件定義要初始化的條件變數的特性,一般設定為NULL,設定為預設屬性。
int pthread_cond_destroy(pthread_cond_t *cond);
//銷燬條件變數
2.通知等待條件變數的執行緒
#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cond);
//用於喚醒等待出現與條件變數關聯的條件的所有執行緒
int pthread_cond_signal(pthread_cond_t *cond);
//用於喚醒等待出現與條件變數關聯的條件的第一個執行緒
3.等待條件變數
#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
//在指定的時間範圍內等待條件變數
//第一個引數是要等條件變數的指標
//第二個引數是指向與條件變數cond關聯的互斥鎖的指標
//第三個引數是等待過期時的絕對時間,如果在此時間範圍內取到該條件變數函式返回。該時間為從1970-1-1:0:0:0以來的秒數,即一個絕對時間。
struct timespec
{
long ts_sec;
long ts_nsec;
};
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
//用來阻塞等待某個條件變數
//第一個引數是要等條件變數的指標
//第二個引數是指向與條件變數cond關聯的互斥鎖的指標
這兩個函式都包含一個互斥鎖,如果某執行緒因等待變數進入等待狀態,將隱含釋放其申請的互斥鎖,同樣再返回時,首先要申請到互斥鎖物件。
條件變數和互斥鎖協同工作
1. 鎖定互斥鎖
2.測試條件是否滿足
3.如果滿足,執行操作,完成後釋放互斥鎖
4.2如果不滿足,使用條件變數等待,當另一個執行緒使此條件滿足時,執行步驟3。
生產者與消費者模型
生產者與消費者模型,是一個解決多執行緒同步問題的。生產者和消費者共享一塊固定的緩衝區,生產者向這塊緩衝區生產資料,消費者從這塊緩衝區拿走資料。由於生產者執行緒和消費者執行緒共享同一個緩衝區,為了讀寫資料正確,使用時緩衝區佇列時要保證兩個執行緒互斥。生產者執行緒和消費者執行緒必須滿足:生產者寫入緩衝區的資料不能超過緩衝區的容量,消費者讀取的數目不能超過生產者寫入的數目。
當緩衝區為滿時,生產者執行緒不往緩衝區中寫資料
當緩衝區為空時,消費者執行緒不從緩衝區裡讀資料
對生產者與消費者模型的“三 二 一”原則
三種關係
生產者與生產者互斥關係
消費者與消費者互斥關係
生產者與消費者同步與互斥關係
二種角色
生產者 消費者
一種交易場所
共享的緩衝區
基於單鏈表的單生產者與單消費者模型的實現
//定義全域性變數的單鏈表,實現頭插頭刪等操作
//實現執行緒的同步與互斥定義了全域性的互斥鎖和條件變數
#include<stdio.h>
#include<pthreaod.h>
#include<stdlib.h>
typedef struct ListNode
{
int data;
struct ListNode* next;
}node_t,*node_p,**node_pp;
node_p head=NULL;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
node_p AllocNode(int x,node_p node)//申請結點
{
node_p temp=(node_p)malloc(sizeof(node_t));
if(temp==NULL)
{
perror("malloc");
exit(1);
}
temp->data=x;
temp->next=node;
return temp;
}
void InitList(node_pp node)//初始化
{
*node=AllocNode(0,NULL);
}
int IsEmpty(node_p node)//判空
{
return node->next==NULL?1:0;
}
void FreeNode(node_p node)//釋放結點
{
if(node!=NULL)
{
free(node);
node=NULL;
}
}
void PushFront(int d,node_p node)//頭插
{
node_p temp=AllocNode(d,NULL);
temp->next=node->next;
node->next=temp;
}
void PopFront(node_p node,int* out)//頭刪
{
if(!IsEmpty(node))
{
node_p p=node->next;
node->next=p->next;
*out=p->data;
FreeNode(p);
}
}
void Destory(node_p node)
{
int out=0;
while(!IsEmpty(node))
{
PopFront(node,&out);
}
free(node);
}
void ShowList(node_p node)
{
node_p cur=node->next;
while(!IsEmpty(cur))
{
printf("%d ",cur->data);
cur=cur->next;
}
printf("\n");
}
//作為生產者,生產資料,拿到鎖進行操作,往快取區裡放資料,在生產一個數據後,釋放鎖並喚醒在條件變數等到的第一個執行緒
void* Productor(void* arg)
{
int data=0;
while(1)
{
pthread_mutex_lock(&lock);
data=rand()%1234;
PushFront(data,head);
printf("productor done..%d\n",data);
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);
sleep(1);
}
}
//消費者緩衝區裡拿資料,上鎖,當緩衝區裡沒有資料時,就用條件變數阻塞等待。如果有,就消費資料。釋放鎖
void* Consumer(void* arg)
{
while(1)
{
pthread_mutex_lock(&lock);
int data=0;
while(IsEmpty(head))
{
pthread_cond_wait(&cond,&lock);
printf("consumer wait...\n");
}
PopFront(head,&data);
printf("consume done..%d\n",data);
pthread_mutex_unlock(&lock);
}
}
int main()
{
InitList(&head);
pthread_t consumers,productor;
pthread_create(&consumers,NULL, Consumer,NULL);
pthread_create(&productor,NULL,Productor,NULL);
pthread_join(consumers,NULL);
pthread_join(productor,NULL);
Destory(head);
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
return 0;
}
實現生產者一個數據消費者消費一個數據。
訊號量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
//初始化一個semaphor變數,
//第一個引數是定義的semphor變數,
//第二個引數pshared引數為0表示訊號量用於程序間同步
//第三個引數表示可用資源的數量
int sem_wait(sem_t *sem);
//可以獲得資源(P操作),使訊號量的值-1;如果semaphor變數為0時,則掛起等待。(阻塞)
int sem_trywait(sem_t *sem);
//避免阻塞的,不掛起等待,訊號量為0時,不會阻塞,返回-1並且將error置為EAGAIT
int sem_post(sem_t *sem);
//使訊號量+1,相當於解鎖過程,可以釋放資源(V操作)。使semaphor變數,
//+1,同時喚醒掛起等待的執行緒。
int sem_destroy(sem_t *sem);
//當訊號量使用完後,丟棄。
基於環形佇列的生產者與消費者模型的實現
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<semaphore.h>
int buf[64];/利用陣列實現環形佇列
sem_t sem_blank;//訊號量,表示佇列中的空格量
sem_t sem_data;//訊號量,表示佇列中的資料量
void* product(void* arg)
{
int step=0;
int data=0;
while(1)
{
int data=rand()%1234;//產生一個隨機數
sem_wait(&sem_blank);//表示的空格的訊號量-1,當為0時,掛起等待
buf[step]=data;//放資料
sem_post(&sem_data);//表示資料的訊號量+1,喚醒等待的執行緒。
printf("productor done..%d\n",data);
step++;
step=step%64;
sleep(1);
}
}
//生產者生產資料,當沒有空格時,表示空格的訊號量為0,sem_wait該執行緒就掛起等待,sem_post喚醒等待的執行緒並使對應的訊號量+1
void* consume(void* arg)
{
int step=0;
int data=0;
while(1)
{
sem_wait(&sem_data);//表示資料的訊號量-1
data=buf[step];//放資料
sem_post(&sem_blank);//表示空格的訊號量+1
printf("consumer done..%d\n",data);
step++;
step=step%64;
}
}
//佇列中沒有空格時,喚醒了消費者執行緒消費,一樣的,取資料當沒有資料時,sem_wait使消費者執行緒掛起等待,sem_post喚醒等待的生產者執行緒,並將對應的訊號量+1
int main()
{
pthread_t p,c;
pthread_create(&p,NULL,product,NULL);
pthread_create(&c,NULL,consume,NULL);
sem_init(&sem_blank,0,64);
sem_init(&sem_data,0,0);
pthread_join(p,NULL);
pthread_join(c,NULL);
sem_destroy(&sem_blank);
sem_destroy(&sem_data);
return 0;
}
實現了環形佇列的單生產者與單消費者模型,那麼可以在此基礎上實現多執行緒的。
基於環形佇列多執行緒的生產者與消費者模型的實現
在上面實現的基礎上實現多執行緒,需要加互斥鎖實現消費者與消費者,生產者與生產者之間的互斥。
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#include<semaphore.h>
int buf[64];
sem_t sem_blank;
sem_t sem_data;
pthread_mutex_t lock1=PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lock2=PTHREAD_MUTEX_INITIALIZER;
void* product(void* arg)
{
static int step=0;
int data=0;
while(1)
{
pthread_mutex_lock(&lock1);
int data=rand()%1234;
sem_wait(&sem_blank);
buf[step]=data;
sem_post(&sem_data);
pthread_t id=pthread_self();
printf("%dproductor done..%d\n",id,data);
step++;
step=step%64;
pthread_mutex_unlock(&lock1);
sleep(1);
}
}
void* consume(void* arg)
{
static int step=0;
int data=0;
while(1)
{
pthread_mutex_lock(&lock2);
sem_wait(&sem_data);
data=buf[step];
sem_post(&sem_blank);
pthread_t id=pthread_self();
printf("%dconsumer done..%d\n",id,data);
step++;
step=step%64;
pthread_mutex_unlock(&lock2);
sleep(1);
}
}
int main()
{
pthread_t p1,p2,c1,c2;
pthread_create(&p1,NULL,product,NULL);
pthread_create(&c1,NULL,product,NULL);
pthread_create(&p2,NULL,consume,NULL);
pthread_create(&c2,NULL,consume,NULL);
sem_init(&sem_blank,0,64);
sem_init(&sem_data,0,0);
pthread_join(p1,NULL);
pthread_join(p2,NULL);
pthread_join(c1,NULL);
pthread_join(c2,NULL);
sem_destroy(&sem_blank);
sem_destroy(&sem_data);
pthread_mutex_destroy(&lock1);
pthread_mutex_destroy(&lock2);
return 0;
}