1. 程式人生 > >生產者與消費者模式(執行緒的同步與互斥)

生產者與消費者模式(執行緒的同步與互斥)

條件變數

條件變數的提出首先要涉及一個概念,就是生產者消費者模型:


生產者消費者,是在多執行緒同步的一個問題,兩個固定大小緩衝區的執行緒,在實際執行是會發生問題,生產者是生成資料放入緩衝區,重複過程,消費者在緩衝區取走資料。

生產者消費者的模型提出了三種關係,兩種角色,一個場所

三種關係: 
- 生產者之間的互斥關係 
- 消費者之間的競互斥關係 
- 生產者和消費者之間互斥和同步關係(同一時刻只能有一個,要麼在生產,要麼在消費,這就是互斥關係,只能在生產者生產完了之後才能消費,這就是同步關係)

兩個角色:一般是用程序或執行緒來承擔生產者或消費者

一個場所:有效的記憶體區域。(如單鏈表,陣列)
我們就可以把這個想象成生活中的超市供貨商,超市,顧客的關係,超市供貨商供貨,超市是擺放貨物的場所,然後使用者就是消費的。
條件變數屬於執行緒的一種同步的機制,條件變數與互斥鎖一起使用,可以使得執行緒進行等待特定條件的發生。條件本身是由互斥量保護的,執行緒在改變條件狀態之前首先會鎖住互斥量。其他執行緒在獲得互斥量之前不會察覺這種改變,因此互斥量鎖定後才能計算條件。

和互斥鎖一樣,使用條件變數,同樣首先進行初始化:


int pthread_cond_init(pthread_cond_t *restrict cond,
              const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

和互斥鎖的初始化一樣,它也可以採用init或者是直接利用巨集進行初始化。

條件變數本身就是依賴互斥鎖的,條件本身是由互斥量保護的,執行緒在改變條件狀態錢先要鎖住互斥量,它是利用執行緒間共享的全域性變數進行同步的一種機制。

我們使用pthread_cond_wait進行等待條件變數變為真,如果在規定的時間不能滿足,就會生成一個返回錯誤碼的變數。


 int pthread_cond_wait(pthread_cond_t *restrict cond,
              pthread_mutex_t *restrict mutex);
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

把鎖傳遞給wait函式,函式自動把等待條件的執行緒掛起,放入消費者等待佇列,然後解鎖掛起執行緒所佔有的互斥鎖,這個時候就可以去跑其他執行緒,然後當等待條件滿足的時候,這個時候從等待佇列中出來執行,獲得剛才自己所佔有的鎖。

一個執行緒可以呼叫pthread_cond_wait在一個Condition Variable上阻塞等待,這個函式做以下三步操作:
1. 釋放Mutex
2. 阻塞等待
3. 當被喚醒時,重新獲得Mutex並返回


滿足條件的時候可以使用函式pthread_cond_signal進行喚醒

      int pthread_cond_broadcast(pthread_cond_t *cond);
       int pthread_cond_signal(pthread_cond_t *cond);
  • 1
  • 2
  • 3
  • 1
  • 2
  • 3

這兩個函式都是用來進行喚醒執行緒操作的,signal一次從消費者佇列中至少喚醒一個執行緒,broad_cast能喚醒等待該條件的所有執行緒。

當然和mutex類似,條件變數也需要清除。

int pthread_cond_destroy(pthread_cond_t *cond);
生產者消費者示例:
#include<stdio.h>
#include<assert.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

typedef struct Node
{
	struct Node *next;
	int val;
}Node;

void list_init(Node **phead)
{
	assert(phead);

	Node *temp = (Node*)malloc(sizeof(Node));
	temp->next = NULL;
	temp->val = 0;
	*phead = temp;
}

void list_push(Node *phead,int _data)  //頭插
{
	assert(phead);

	Node *pM = (Node*)malloc(sizeof(Node));
	if (pM)
	{
		pM->val = _data;
		pM->next = phead->next;
		phead->next = pM;
	}
}

int empty(Node *phead)
{
	return (phead->next == NULL ? 1: 0);
}

void list_print(Node *phead)
{
	assert(phead);

	if (empty(phead))
		return ;

	Node *pCur = phead->next;
	while (pCur)
	{
		printf("%d->",pCur->val);
		pCur = pCur->next;
	}
	printf("%s\n","NULL");
}

void list_pop(Node *phead,int *data)  //頭刪
{
	assert(phead);
	
	if (empty(phead))
		return ;

	Node *ptemp = phead->next;

	phead->next = ptemp->next;

	*data = ptemp->val;
	free(ptemp);

	ptemp = NULL;
}

void list_destroy(Node *phead)
{
	assert(phead);

	while (!empty(phead))
	{
		int data;
		list_pop(phead,&data);
	}
}
//生產者執行緒
void* producer(void *arg)
{
	Node *phead = (Node *)arg;
	while (1)
	{
		pthread_mutex_lock(&mutex);  //申請互斥鎖
		int data = rand()%100;
		list_push(phead,data);
		pthread_mutex_unlock(&mutex);//釋放互斥鎖
		printf("prodecer sucess %d\n",data);
		pthread_cond_signal(&cond); //以單播的方式通知擁有條件變數的另外一個執行緒,告訴消費者,生產者生產好了,可以消費了。
		sleep(1);
	}

	return NULL;
}

//消費者執行緒
void* consumer(void *arg)
{
	Node *phead = (Node *)arg;
	while (1)
	{
		int data;

		pthread_mutex_lock(&mutex);
		
		if (empty(phead)) //如果沒有資源可以消費了,則等待
		{
			pthread_cond_wait(&cond,&mutex); //這個函式呼叫一定是在擁有互斥鎖的前提下.這個函式做三件事,第一:釋放互斥鎖,二,阻塞等待,三,喚醒的時候重新獲得互斥鎖。
		}
		list_pop(phead,&data);//有資源就消費
		pthread_mutex_unlock(&mutex);
		printf("consumer sucess %d\n",data);
	}
	return NULL;
}

int main()
{
	Node *phead;
	list_init(&phead);

	pthread_t id1;
	pthread_t id2;


	pthread_create(&id1,NULL,producer,(void*)phead);
	pthread_create(&id2,NULL,consumer,(void*)phead);


	pthread_join(id1,NULL);
	pthread_join(id2,NULL);


	pthread_mutex_destroy(&mutex);
	pthread_cond_destroy(&cond);
	
	return 0;
}

訊號量

訊號量相關函式:

訊號量(Semaphore)Mutex類似,表示可用資源的數量,Mutex不同的是這個數量可以大於1

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

sem_init() 初始化一個定位在 sem 的匿名訊號量。value 引數指定訊號量的初始值。 pshared 引數指明訊號量是由程序內執行緒共享,還是由程序之間共享。如果 pshared 的值為 0,那麼訊號量將被程序內的執行緒共享,並且應該放置在所有執行緒都可見的地址上(如全域性變數,或者堆上動態分配的變數)。如果 pshared 是非零值,那麼訊號量將在程序之間共享

int sem_destroy(sem_t *sem);
int sem_wait(sem_t *sem);//類似P操作
int sem_trywait(sem_t *sem);
int sem_post(sem_t *sem);//類似V操作


用陣列模擬環形佇列儲存資料程式碼:
#include<stdio.h>
#include<stdio.h>
#include<pthread.h>
#include<unistd.h>
#include<semaphore.h>

//二個規則,(1)消費者一定要等到生產才能消費,(2)生產者如果生產的快的話,不能套消費者一個圈

#define SIZE 64

sem_t blanks; //二個訊號量
sem_t datas;
int buf[SIZE]; //用陣列模擬一個環形佇列。


void *productor(void *arg)
{
	int i = 0; //i = 0表示生產的起始位置。
		while (1)
		{
			//	pthread_mutex_lock(&mutex_p);
			sem_wait (&blanks);//生產者在生產之前要有格子資源。
			int data = rand()%1234;
			buf[i] = data;
			printf("productor done...data:%d\n",data);
			i++;
			i%=SIZE;//i++總有超過陣列的長度的時候,為了模擬環形佇列,所以求模。
			sleep(1); //生產的慢點
			sem_post(&datas);//生產者生產完了,資料資源就多了一個。
			//	pthread_mutex_unlock(&mutex_p);
		}
		return NULL;
}
	
void *consummer(void *arg)
{
	int i = 0;
	while (1)
	{										
	sem_wait(&datas); //消費者在消費之前要有消費資源。
	int data = buf[i];
	printf("consummer done...data:%d\n",data);
	i++;
	i%= SIZE;
	sem_post(&blanks);//消費者消費完了格子資源就多了一個。
	}
	return NULL;
}

	
int main()
{
	sem_init(&blanks,0,SIZE);
	sem_init(&datas,0,0);   //訊號量的初始化要在多執行緒之前

	pthread_t id1,id2;
	pthread_create(&id1,NULL,productor,NULL);	
	pthread_create(&id2,NULL,consummer,NULL);

	pthread_join(id1,NULL);
	pthread_join(id2,NULL);


	sem_destroy(&blanks);
	sem_destroy(&datas);

	return 0;									
}