1. 程式人生 > >多執行緒實驗之”生產者——消費者“問題

多執行緒實驗之”生產者——消費者“問題

一 例項

  參考書籍《從實踐中學linux應用程式開發》

/*producer-customer.c*/
#include <stdio.h>  
#include <pthread.h>  
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/ipc.h>

#define MYFIFO "myfifo" /*緩衝區有名管道的名字*/

#define BUFF_SIZE 3 /*緩衝區的單元數*/
#define UNIT_SIZE 6 /*每個單元的大小*/

#define DELAY_TIME_LEVELS 5.0 /*小任務之間的最大時間間隔*/
#define RUN_TIME 30 /*執行時間*/

sem_t mutex;  //mutex訊號量用於解決兩個執行緒之間的互斥問題
sem_t full;  //表示有界緩衝區中的非空單元數
sem_t avail; //表示有界緩衝區中的空單元數

int fd;
time_t end_time;

/*生產者執行緒*/
void *producer(void *arg)
{
   int delay_time=0;
   int real_write=0;
   int temp;
   
   while(time(NULL) < end_time)
   {
   	/*RAND_MAX 是 <stdlib.h> 中偽隨機數生成函式 rand 所能返回的最大數值*/
        delay_time=(int)(rand()* DELAY_TIME_LEVELS/(RAND_MAX))+1;
 
        sleep(delay_time);
              
        
        /*P操作訊號量avail和mutex*/
        /*P操作使sem減1*/
        temp=sem_wait(&avail);
        temp+=sem_wait(&mutex);
        printf("\nproducer:delay=%d\n",delay_time);  
        if(temp !=0 )
        {
            printf("sem_wait error\n");
            break;	
        }
        
        /*生產者寫入資料*/
        if( (real_write=write(fd,"hello",UNIT_SIZE)) == -1 )
        {
            /*寫入資料失敗*/  /*EAGAIN : Resource temporarily unavailable*/
            if(errno==EAGAIN)
            {
                printf("the fifo has not been read yet,please try later\n");		
            }
           
        }
        else
        {
            printf("write %d to the fifo\n",real_write);	
        }	
        
        /*V操作訊號量full和mutex*/
        /*V操作相當於加1*/
        sem_post(&full);
        sem_post(&mutex);  	
   } 
   pthread_exit(NULL);		
}


/*消費者程序*/
void *customer(void *arg)
{
    int delay_time=0;
    unsigned char read_buffer[UNIT_SIZE];
    int real_read;
    while(time(NULL)< end_time)
    {
    	/*RAND_MAX 是 <stdlib.h> 中偽隨機數生成函式 rand 所能返回的最大數值*/
        delay_time=(int)(rand()* DELAY_TIME_LEVELS/(RAND_MAX)/2.0)+1;
        sleep(delay_time);
    	      
    	 /*P操作訊號量full和mutex*/
        /*P操作相當於加1*/
    	sem_wait(&full);
        sem_wait(&mutex);
        printf("\ncustomer:delay=%d\n",delay_time);  
        memset(read_buffer,0,UNIT_SIZE);
        
        /*消費者讀取資料*/
        if((real_read=read(fd,read_buffer,UNIT_SIZE)) == -1)
        {
            /*讀取資料失敗*/
            if(errno == EAGAIN)
            {
                printf("no data yet\n");	
            }
                             	
        }
        
        printf("read %s from fifo\n",read_buffer);
        
        /*V操作對avail和mutex*/
        sem_post(&avail);
        sem_post(&mutex); 	
    }
    
    pthread_exit(NULL);		
}


int main()
{
    pthread_t thrd_prd_id,thrd_cst_id;
    pthread_t mon_th_id;
    int ret;
    
    srand(time(NULL));
    end_time=time(NULL)+RUN_TIME;
    
    /*建立有名管道*/
    /*****************
    函式原型:int mkfifo(const char * pathname,
                         mode_t mode);
    函式引數:pathname 要建立的管道
              mode O_RDONLY 讀管道
                   O_WRONLY 寫管道
                   O_RDWR   讀寫管道
                   O_CREAT  如果管道不存在,那麼建立一個新的檔案
                            並用第3個引數為其設定許可權
                   O_EXCL   如果使用O_CREAT時檔案存在,那麼返回錯誤資訊。
                            這個引數可以測試檔案是否存在
    *******************/
    if( ((mkfifo(MYFIFO,O_EXCL|O_CREAT)) < 0) && (errno != EEXIST))
    {
        printf("cannot creat fifo\n");
        return errno;	
    }
    else
    {
        printf("creat fifo success\n");	
    }
    
    /*開啟管道*/
    fd=open(MYFIFO,O_RDWR);
    if(fd==-1)
    {
        printf("open fifo error,fd=%d\n",fd);
        return fd;	    
    }
    else
    {
        printf("open fifo success,fd=%d\n",fd);	
    }
    
    /*初始化互斥訊號量為1*/
    /************************
    函式原型:int sem_init(sem_t *sem,int pshared,unsigned int value); 
    函式引數:sem  訊號量指標
              pshared  決定訊號量能否在幾個程序間共享
                       !!!由於目前linux還沒有實現程序間共享訊號量,
                          所以這個值只能是0,表示這個訊號量是當前程序的區域性訊號量                       
              value 訊號量初始化的值
    ***************************/
    ret=sem_init(&mutex,0,1);
    ret+=sem_init(&full,0,0);
    ret+=sem_init(&avail,0,BUFF_SIZE);
    
    if(ret != 0)
    {
        printf("any semaphore initialization failed\n");
        return ret;	
    }
    else
    {
        printf("any semaphore initialization success\n");	
    }
    
    /*建立兩個執行緒*/
    ret=pthread_create(&thrd_prd_id,NULL,producer,NULL);
    if(ret!=0)
    {
        printf("creat producer thread error\n");
        return ret;	
    }
    else
    {
        printf("creat producer thread success\n");	
    }
    
    ret=pthread_create(&thrd_cst_id,NULL,customer,NULL);
    if(ret!=0)
    {
        printf("creat customer thread error\n");
        return ret;	
    }
    else
    {
        printf("creat customer thread success\n");	
    }
    
    /*等待執行緒結束*/
    pthread_join(thrd_prd_id,NULL);
    pthread_join(thrd_cst_id,NULL);
    /*關閉檔案*/
    close(fd);
    
    unlink(MYFIFO);
    
    return 0;	
}

執行結果