1. 程式人生 > >生產者/消費者模型改進版 ——佇列

生產者/消費者模型改進版 ——佇列

上述消費者/生產者模型比較簡單,緩衝區中只能容納一條訊息。生產者每提交一條訊息到緩衝區中,就會通知消費者,等消費者取走訊息之後才能提交下一條訊息。同樣,消費者也必須等待生產者提交一條訊息後才能進行處理。這種設計的效率是比較低下的。

    如果將緩衝區設計為一個先進先出的佇列,可以同時容納多條訊息,那麼只要緩衝區不滿,生產者就可以提交訊息;同時,只要緩衝區不空,消費者就可以取出訊息進行處理。這將大大提高整個程式的效率。
  實現時,可以利用訊號量計數的特性,用訊號量的值表示緩衝區中訊息的個數及空閒空間的個數。但這時由於生產者和消費者可能同時訪問緩衝區,故需要再用一個互斥量來進行保護。
    綜上,對一個緩衝區需要定義以下三個同步變數:
sem_t full; /* 表示緩衝區中訊息的個數 */
sem_t empty;/* 表示緩衝區中的空閒空間(還能容納的訊息個數) */
pthread_mutex_t lock; /* 同步對緩衝區的訪問 */
這些同步變數的初始化如下:
sem_init(&full, 0, 0); /* 緩衝區中訊息數為0 */
sem_init(&empty, 0, N); /* 緩衝區中的空閒空間數為N,即緩衝區的容量 */
pthread_mutex_init(&lock, NULL); /* 初始化互斥量 */

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <time.h>
#include <string.h>
#include "SqQueue.h"

// 訊號量和緩衝區
struct data
{
	sem_t empty;  // 用來控制生產者,只有緩衝區為空,生產者才可以生產訊息
	sem_t full;   // 用來控制消費者,只有緩衝區有資料,才可以消費
	Queue q;      // 緩衝區佇列
};

struct data msg;
// 互斥鎖
pthread_mutex_t mutex;

int num = 0;

// 生產者執行緒工作函式
void *Produce(void *v)
{	
	while (1)
	{
		int time = rand() % 100 + 1;
		usleep(time*10000);

		// 只要佇列不滿 就能生產訊息, empty代表當前佇列剩餘的空間
		sem_wait(&msg.empty);
		pthread_mutex_lock(&mutex);    // 搶鎖
		
		num++;   // 生產一個訊息
  		
		// 將訊息放入到佇列裡面
		EnQueue (&(msg.q), num);
		printf ("生產一條訊息\n");
		
		pthread_mutex_unlock(&mutex);  // 解鎖
		// 生產完了,通知消費者進行消費
		sem_post(&msg.full);
	}
}

// 消費者執行緒工作函式
void *Consum(void *v)
{
	char  buf[32];
	while (1)
	{
		int time = rand() % 100 + 1;
		usleep(time*10000);
		
		// 只有緩衝區有資料,就能消費訊息, full當前佇列訊息的個數
		sem_wait(&msg.full);
		pthread_mutex_lock(&mutex);    // 搶鎖
		
		int num;
		DeQueue(&(msg.q), &num);   // 去佇列裡取出一條訊息
		printf("消費了一條訊息: %d\n", num);
		
		pthread_mutex_unlock(&mutex);  // 解鎖
		// 消費完了,通知生產則會進行生產
		sem_post(&msg.empty);
	}
}

int main()
{
	srand ((unsigned int)time(NULL));
	
	// 初始化訊號量
	sem_init(&msg.empty, 0, 10); // 生產者,一開始要生產 10 條訊息
	sem_init(&msg.full, 0, 0);   // 消費者,一開始要不能消費訊息
	
	// 初始化互斥鎖
	pthread_mutex_init(&mutex, NULL);
	
	// 初始化佇列
	InitQueue(&(msg.q));

	pthread_t produceId;
	pthread_t consumId;
	
	int i = 0;
	for (i = 0; i < 5; i++)
	{
		// 建立生產者執行緒
		pthread_create(&produceId, NULL, Produce, NULL);
		pthread_detach(produceId);
	}
	
	// 建立消費者執行緒
	pthread_create(&consumId, NULL, Consum, NULL);
	
	// 等待執行緒結束
	pthread_join(consumId, NULL);
	
	// 銷燬訊號量
	sem_destroy(&msg.empty);
	sem_destroy(&msg.full);
	
	// 銷燬互斥鎖
	pthread_mutex_destroy(&mutex);
	return 0;
}