生產者/消費者模型改進版 ——佇列
阿新 • • 發佈:2019-02-05
上述消費者/生產者模型比較簡單,緩衝區中只能容納一條訊息。生產者每提交一條訊息到緩衝區中,就會通知消費者,等消費者取走訊息之後才能提交下一條訊息。同樣,消費者也必須等待生產者提交一條訊息後才能進行處理。這種設計的效率是比較低下的。
如果將緩衝區設計為一個先進先出的佇列,可以同時容納多條訊息,那麼只要緩衝區不滿,生產者就可以提交訊息;同時,只要緩衝區不空,消費者就可以取出訊息進行處理。這將大大提高整個程式的效率。
實現時,可以利用訊號量計數的特性,用訊號量的值表示緩衝區中訊息的個數及空閒空間的個數。但這時由於生產者和消費者可能同時訪問緩衝區,故需要再用一個互斥量來進行保護。
綜上,對一個緩衝區需要定義以下三個同步變數:
sem_t full; /* 表示緩衝區中訊息的個數 */
sem_t empty;/* 表示緩衝區中的空閒空間(還能容納的訊息個數) */
pthread_mutex_t lock; /* 同步對緩衝區的訪問 */
這些同步變數的初始化如下:
sem_init(&full, 0, 0); /* 緩衝區中訊息數為0 */
sem_init(&empty, 0, N); /* 緩衝區中的空閒空間數為N,即緩衝區的容量 */
pthread_mutex_init(&lock, NULL); /* 初始化互斥量 */
#include <stdio.h> #include <pthread.h> #include <semaphore.h> #include <time.h> #include <string.h> #include "SqQueue.h" // 訊號量和緩衝區 struct data { sem_t empty; // 用來控制生產者,只有緩衝區為空,生產者才可以生產訊息 sem_t full; // 用來控制消費者,只有緩衝區有資料,才可以消費 Queue q; // 緩衝區佇列 }; struct data msg; // 互斥鎖 pthread_mutex_t mutex; int num = 0; // 生產者執行緒工作函式 void *Produce(void *v) { while (1) { int time = rand() % 100 + 1; usleep(time*10000); // 只要佇列不滿 就能生產訊息, empty代表當前佇列剩餘的空間 sem_wait(&msg.empty); pthread_mutex_lock(&mutex); // 搶鎖 num++; // 生產一個訊息 // 將訊息放入到佇列裡面 EnQueue (&(msg.q), num); printf ("生產一條訊息\n"); pthread_mutex_unlock(&mutex); // 解鎖 // 生產完了,通知消費者進行消費 sem_post(&msg.full); } } // 消費者執行緒工作函式 void *Consum(void *v) { char buf[32]; while (1) { int time = rand() % 100 + 1; usleep(time*10000); // 只有緩衝區有資料,就能消費訊息, full當前佇列訊息的個數 sem_wait(&msg.full); pthread_mutex_lock(&mutex); // 搶鎖 int num; DeQueue(&(msg.q), &num); // 去佇列裡取出一條訊息 printf("消費了一條訊息: %d\n", num); pthread_mutex_unlock(&mutex); // 解鎖 // 消費完了,通知生產則會進行生產 sem_post(&msg.empty); } } int main() { srand ((unsigned int)time(NULL)); // 初始化訊號量 sem_init(&msg.empty, 0, 10); // 生產者,一開始要生產 10 條訊息 sem_init(&msg.full, 0, 0); // 消費者,一開始要不能消費訊息 // 初始化互斥鎖 pthread_mutex_init(&mutex, NULL); // 初始化佇列 InitQueue(&(msg.q)); pthread_t produceId; pthread_t consumId; int i = 0; for (i = 0; i < 5; i++) { // 建立生產者執行緒 pthread_create(&produceId, NULL, Produce, NULL); pthread_detach(produceId); } // 建立消費者執行緒 pthread_create(&consumId, NULL, Consum, NULL); // 等待執行緒結束 pthread_join(consumId, NULL); // 銷燬訊號量 sem_destroy(&msg.empty); sem_destroy(&msg.full); // 銷燬互斥鎖 pthread_mutex_destroy(&mutex); return 0; }