Linux實現生產者消費者模型
生產者消費者模型
簡單來說就是“321原則(並非某一規則,而是為了理解生產者消費者模型)”
“3”代表的是三種關係
生產者與消費者的互斥與同步關係
生產者與生產者的互斥(或競爭)關係
消費者與消費者的互斥(或競爭)關係
“2”代表兩種角色
生產者:往交易場所放東西(在計算機中一般都是資料)的人
消費者:從交易場所取東西的人
“1”代表一個交易場所
所謂交易場所就是記憶體中具有儲存資料的一段有界緩衝區
綜上,給出生產者消費者模型的描述:兩個程序共享一個緩衝區,一個程序稱為生產者向緩衝區中放資料,另一個稱為消費者從緩衝取中取資料,當緩衝區中被放時,生產者程序就必須可進入掛起狀態,直到消費者從緩衝中取走資料時,生產者才能繼續向緩衝區中存放資料,同樣當緩衝取中沒有資料時,消費者程序就必須進入掛起休眠狀態,直到生產者向緩衝區中放入資料時,消費者才能被喚醒繼續從緩衝區中取走資料。
圖示:
基於連結串列的生產者消費者模型
分析:當使用連結串列來模擬生產者消費者模型時,我們可以藉助連結串列的插入來扮演生產者的角色,用連結串列的刪除來充當消費者的角色,為了便於實現,我們直接採用連結串列的頭插和連結串列的頭刪操作來模擬放資料和取資料這兩個過程。
(1)條件變數介面說明
int pthread_cond_wait(pthread_cond_t* redtrist cond , pthread_mutex_t* redtrist )//掛起等待
1、不滿足條件時必須進行休眠(釋放Mutex)
2、不能抱著鎖資源休眠(進行阻塞式等待)
3、能被喚醒(被喚醒時能夠重新獲得Mutex資源並等待)int pthread_cond_signal(pthread_cond_t* cond ) //喚醒
1、生產者生產好資料之後通知消費者來消費資料
2、消費者消費完資料後通知生產者前來生產
(2)鎖相關介面說明
int pthread_mutex_lock(pthread_mutex_t *mutex);
阻塞式加鎖
int pthread_mutex_trylock(pthread_mutex_t *mutex);
非阻塞式加鎖
int pthread_mutex_unlock(pthread_mutex_t *mutex);
解鎖,無論是阻塞式的加鎖還是非阻塞式的加鎖都需要使用此函式進行解鎖
基於單鏈表的生產者消費者模型實現
#include<stdio.h>
#include<pthread.h>
#include<stdlib.h>
typedef struct list
{
int data;
struct list* next;
}list,*plist,**pplist;
plist head;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
plist alloc_node(int d,plist l)
{
plist tmp=(plist)malloc(sizeof(list));
if(!tmp){
perror("malloc");
exit(1);
}
tmp->data=d;
tmp->next=l;
return tmp;
}
void initlist(pplist l){
*l=alloc_node(0,NULL);
}
int isempty(plist l){
return l->next==NULL?1:0;
}
void free_node(plist l){
if(l!=NULL){
free(l);
l=NULL;
}
}
void push_front(plist l,int d){
plist tmp=alloc_node(d,NULL);
tmp->next=l->next;
l->next=tmp;
}
void pop_front(plist l,int* out)
{
if(!isempty(l)){
plist tmp=l->next;
l->next=tmp->next;
*out=tmp->data;
free_node(tmp);
}
}
void showlist(plist l)
{
plist start=l->next;
while(start){
printf("%d ",start->data);
start=start->next;
}
printf("\n");
}
void destroy(plist l){
int data;
while(!isempty(l)){
pop_front(l,&data);
}
free_node(l);
}
void* consume(void* arg){
pthread_mutex_t* lockp=(pthread_mutex_t*)arg;
int data=0;
while(1){
pthread_mutex_lock(lockp);
if(isempty(head)){
pthread_cond_wait(&cond,&lockp);
}
pop_front(head,&data);
printf("consum done: %d\n",data);
pthread_mutex_unlock(lockp);
pthread_cond_signal(&cond);
}
}
void* product(void* arg){
pthread_mutex_t* lockp=(pthread_mutex_t*)arg;
int data=0;
while(1){
pthread_mutex_lock(lockp);
data=rand()%1234;
push_front(head,data);
printf("product done: %d\n",data);
pthread_mutex_unlock(lockp);
pthread_cond_signal(&cond);
}
}
int main()
{
pthread_mutex_t lock;
pthread_mutex_init(&lock,NULL);
initlist(&head);
pthread_t consumer,producter;
pthread_create(&consumer,NULL,consume,(pthread_mutex_t *)&lock);
pthread_create(&producter,NULL,product,(pthread_mutex_t *)&lock);
pthread_join(consumer,NULL);
pthread_join(producter,NULL);
destroy(head);
pthread_mutex_destroy(&lock);
return 0;
}
基於多元訊號量的生產者消費者
背景知識
涉及到訊號量的知識,首先我們需要搞清楚的一個東西就是PV操作的含義,PV操作是由P操作原語和V操作原語組成的(原語很好理解,就是任務要麼全部做完,要麼都不做),對於訊號量的具體操作如下所示:
P(S):
(1)將訊號量S的值進行減1操作;
(2)如果S>=0則該程序繼續執行,否則該程序就會被掛起等待,直到有程序釋放了,才可以被喚醒;
V(S):
(1)與P操作正好相反,V操作是將訊號量的值加1;
(2)如果S>0,則該程序繼續執行,否則釋放佇列中第一個等待訊號量的程序。
PV操作的意義:用訊號量及PV操作,實現程序的同步與互斥,PV操作屬於程序的低階通訊。
在使用型號量來模擬生產者和消費者模型時,我們是環形佇列來實現生產者和消費者這兩種角色之間的對於資源的存取,首先我們知道佇列的底層儲存資料的方式其實就是一個數組,只要控制好對於隊頭和隊尾的相關計算,我們就可以實現迴圈佇列,而生產者依舊是在有空間的時候進行存放資料,沒有空間時進入掛起等待狀態,消費者則是在有資料時進行取資料,沒有資料時進行掛起等待操作,這樣我們便可以實現生產者和消費模型
不過先不要高興的太早了,我們還需要制定這個規則才能實現:
生產者優先:其實就算消費者優先,由於剛開始沒有生產出資料,消費者也會被掛起;
消費者永遠不能追上消費者:試想一下如果消費者追上生產者或者超過消費者的時候,此時消費者消費的並不是生產者實際所生產出的資料,而屬於垃圾資料;
生產者不能將消費者包一圈:這個也很好理解,如果生產者允許將消費者包一圈的話,那就相當於生產者可以無限的生產,並不停的覆蓋掉原來所產生的資料,那麼如果原來生產出的資料中如果有的是消費者需要獲取的資料,那麼除了生產者在次生產出該資料外,消費者將再也不能得到所想要的資料;
圖示:
程式碼實現:
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
int ring[64];
sem_t semBlank;
sem_t semData;
void* consume(void* arg){
int step=0;
while(1){
sem_wait(&semData);
int data=ring[step];
step++;
step%=64;
printf("consume done: %d\n",data);
sem_post(&semBlank);
}
}
void* product(void* arg){
int step=0;
while(1){
sem_wait(&semBlank);
int data=rand()%1234;
ring[step]=data;
step++;
step%=64;
printf("produce done: %d\n",data);
sem_post(&semData);
}
}
int main()
{
sem_init(&semBlank,0,64);
sem_init(&semData,0,0);
pthread_t consumer,producter;
pthread_create(&consumer,NULL,consume,NULL);
pthread_create(&producter,NULL,product,NULL);
pthread_join(consumer,NULL);
pthread_join(producter,NULL);
sem_destroy(&semBlank);
sem_destroy(&semData);
return 0;
}
執行結果: