linux下互斥鎖實現的簡單的生產者消費者問題
阿新 • • 發佈:2018-11-04
這個程式實現的功能很簡單,也算是入門linux下的多執行緒程式設計了吧~
其創造了兩個生產者和一個消費者,兩個生產者通過互斥鎖實現同步,往緩衝區裡放入資料,資料的值和其下標值一樣,方便消費者的檢驗
消費者等到生產者放完資料後,從緩衝區中取出資料,並進行檢驗,看是否有出現差錯,沒有的話即實現了同步操作
/* include main */ #include <stdio.h> #include <stdlib.h> #include <pthread.h> #define MAXNITEMS 10 //緩衝區大小為10 /* 兩個生產者,實現它們之間的同步 緩衝區內第一個位置放1,第二個位置放2這種 一個消費者,檢驗其是否出錯 */ struct { pthread_mutex_t lock; int key; int value; int buff[MAXNITEMS]; }shared = { PTHREAD_MUTEX_INITIALIZER }; //這樣對結構體初始化 void * create_producer1() { for(;;) { pthread_mutex_lock(&shared.lock); if(shared.key >= MAXNITEMS) { pthread_mutex_unlock(&shared.lock); printf("producer1: my function is over~\n"); pthread_exit(NULL); } shared.buff[shared.key] = shared.value; printf("producer1 have put the data: %d\n",shared.value); shared.key ++; shared.value ++; pthread_mutex_unlock(&shared.lock); } } void * create_producer2() { for(;;) { pthread_mutex_lock(&shared.lock); if(shared.key >= MAXNITEMS) { pthread_mutex_unlock(&shared.lock); printf("producer2: my function is over~\n"); pthread_exit(NULL); } shared.buff[shared.key] = shared.value; printf("producer2 have put the data: %d\n",shared.value); shared.key ++; shared.value ++; pthread_mutex_unlock(&shared.lock); } } void * create_consumer() { for(int i = 0;i < 10;i++) { if(shared.buff[i] != i) { printf("buff[%d] = %d was wrong!\n", i , shared.buff[i]); } } printf("All have scanned!\n"); return (NULL); } int main(int argc, char const *argv[]) { pthread_t tid1,tid2; pthread_t consumer_tid; printf("I will create two producers and one consumers\n"); if(pthread_create(&tid1,NULL,create_producer1,NULL) != 0) printf("create producer1 failed!\n"); else printf("producer1 has been create!\n"); if(pthread_create(&tid2,NULL,create_producer2,NULL) != 0) printf("create producer2 failed!\n"); else printf("producer2 has been create!\n"); //等待生產者執行緒結束 pthread_join(tid1,NULL); pthread_join(tid2,NULL); if(pthread_create(&consumer_tid,NULL,create_consumer,NULL) != 0) printf("create consumer failed!\n"); else printf("consumer has been create and begin check out the buff...\n"); pthread_join(consumer_tid,NULL); return 0; }
執行截圖:
實現進階:
我們上一個實現中,規定了當生產者往緩衝區裡放完資料後,消費者才去檢驗,所以就不用考慮生產者和消費者之間的同步問題。這次我們讓生產者和消費者一起產生,然後讓消費者等待生產者放入其想要監測的資料,即加上這麼一段等待的程式碼
void consumer_wait(int i) { for(;;) { pthread_mutex_lock(&shared.lock); if (i < shared.key) { pthread_mutex_unlock(&shared.lock); return; } pthread_mutex_unlock(&shared.lock); } }
消費者的程式碼改為如下:
void * create_consumer()
{
for(int i = 0;i < 10;i++)
{
consumer_wait(i);
if(shared.buff[i] != i)
{
printf("buff[%d] = %d was wrong!\n", i , shared.buff[i]);
}
}
printf("All have scanned!\n");
return (NULL);
}
實現進階二:
由於上一個實現中,當消費者在等待生產者的時候,它是一遍一遍的輪詢,這樣很消耗計算機記憶體資源,我們為了更高效一點,引入訊號量。
linux中每一個訊號量都關聯一個互斥鎖,所以我們建立兩個結構體,一個存放訊號量和互斥鎖,還又訊號量判斷nready,nready=0則消費者阻塞,nready=1則消費者接觸堵塞。,一個結構體存放下一個要放入緩衝區下標,還有數值,還有防止衝突的互斥鎖。
兩個結構體:其自動初始化,也可以手動初始化
struct
{
pthread_mutex_t lock;
int key;
int value;
}put = {
PTHREAD_MUTEX_INITIALIZER
};
//通過訊號量來控制訊息的傳遞 每一個訊號量關聯著一個互斥鎖
struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int nready;
} nready = {
PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER
};
全部程式碼如下:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define MAXNITEMS 10 //緩衝區大小為10
/*
兩個生產者,實現它們之間的同步
緩衝區內第一個位置放1,第二個位置放2這種
一個消費者,檢驗其是否出錯
生產者消費者一起建立 還要保證生產者和消費者之間的同步
還要防止再期待的條件尚未達到的時候不斷輪詢造成資源浪費,所以要有訊號量
*/
int buff[MAXNITEMS];
struct
{
pthread_mutex_t lock;
int key;
int value;
}put = {
PTHREAD_MUTEX_INITIALIZER
};
//通過訊號量來控制訊息的傳遞 每一個訊號量關聯著一個互斥鎖
struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int nready;
} nready = {
PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER
};
//這樣對結構體初始化
//都要加上shared字首,不要忘了!!!
//還忘記寫迴圈了,沒有迴圈放元素
void * create_producer1()
{
for(;;)
{
pthread_mutex_lock(&put.lock);
if(put.key >= MAXNITEMS)
{
pthread_mutex_unlock(&put.lock);//這裡為shared.lock還要加&符號
printf("producer1: my function is over~\n");
pthread_exit(NULL);
}
buff[put.key] = put.value;
printf("producer1 have put the data: %d\n",put.value);
put.key ++;
put.value ++;
pthread_mutex_unlock(&put.lock);
pthread_mutex_lock(&nready.mutex);
if(nready.nready == 0)
pthread_cond_signal(&nready.cond);
nready.nready++;
pthread_mutex_unlock(&nready.mutex);
}
}
void * create_producer2()
{
for(;;)
{
pthread_mutex_lock(&put.lock);
if(put.key >= MAXNITEMS)
{
pthread_mutex_unlock(&put.lock);//這裡為shared.lock還要加&符號
printf("producer2: my function is over~\n");
pthread_exit(NULL);
}
buff[put.key] = put.value;
printf("producer2 have put the data: %d\n",put.value);
put.key ++;
put.value ++;
pthread_mutex_unlock(&put.lock);
pthread_mutex_lock(&nready.mutex);
if(nready.nready == 0)
pthread_cond_signal(&nready.cond);
nready.nready++;
pthread_mutex_unlock(&nready.mutex);
}
}
void * create_consumer()
{
for(int i = 0;i < 10;i++)
{
pthread_mutex_lock(&nready.mutex);
while(nready.nready == 0)
pthread_cond_wait(&nready.cond,&nready.mutex);
nready.nready -- ;
pthread_mutex_unlock(&nready.mutex);
if(buff[i] != i)
{
printf("buff[%d] = %d was wrong!\n", i ,buff[i]);
}
}
printf("All have scanned!\n");
return (NULL);
}
int main(int argc, char const *argv[])
{
pthread_t tid1,tid2;
pthread_t consumer_tid;
printf("I will create two producers and one consumers\n");
if(pthread_create(&tid1,NULL,create_producer1,NULL) != 0)
printf("create producer1 failed!\n");
else
printf("producer1 has been create!\n");
if(pthread_create(&tid2,NULL,create_producer2,NULL) != 0)
printf("create producer2 failed!\n");
else
printf("producer2 has been create!\n");
if(pthread_create(&consumer_tid,NULL,create_consumer,NULL) != 0)
printf("create consumer failed!\n");
else
printf("consumer has been create and begin check out the buff...\n");
//怎樣等待執行緒結束呢。好像就直接這樣接著寫結束
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
pthread_join(consumer_tid,NULL);
return 0;
}
執行截圖為: