1. 程式人生 > >Linux多執行緒學習(3)--POSIX訊號量及生產者消費者模型

Linux多執行緒學習(3)--POSIX訊號量及生產者消費者模型

Linux多執行緒學習總結

一.生產者-消費者模型

1. 什麼是生產者-消費者模型

線上程的同步和互斥中,有一個生產者-消費者模型

的經典問題,它也稱為有界緩衝區。兩個程序共享一個公共的固定大小的緩衝區。其中一個是生產者,將資訊放入緩衝區,另一個是消費者,從緩衝區中取資訊。既一個或多個生產者(執行緒或程序)建立著一些資料放入緩衝區,然後這些資料由一個或多個消費者(執行緒或程序)處理,從緩衝區拿走。資料在生產者和消費者之間可以通過某種IPC傳遞。

2.生產者-消費者模型的三種關係

在這裡插入圖片描述

  • 生產者-生產者:互斥
  • 生產者-消費者:同步和互斥
  • 消費者-消費者:互斥

3.基於BlockQueue實現生產者-消費者模型

生產者-消費者模型問題的關鍵在於緩衝區已滿,而此時生產者還想往其中放入一個新的資料

的情況。其解決辦法是讓生產者睡眠,待消費者從緩衝區中取出一個或多個數據時再喚醒它。同樣的, 當消費者試圖從緩衝區中取資料而發現緩衝區空時,消費者就睡眠,直到生產者向其中放一些資料後再將其喚醒

//cp.hpp
#ifndef __CP_HPP__
#define __CP_HPP__ 

#include <iostream>
#include <queue>
#include <mutex>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include
<pthread.h>
using namespace std; template<class T> class BlockQueue{ private: //給佇列上鎖 void LockQueue() { pthread_mutex_lock(&lock); } //解鎖佇列 void UnlockQueue() { pthread_mutex_unlock(&lock); } //判斷佇列是否滿 bool IsFull() { return q.size() == (unsigned int)cap ? true : false; } //判斷杜烈是否為空 bool IsEmpty() { return q.size() == 0 ? true : false; } //生產者在條件變數下阻塞等待 void ProductWait() { pthread_cond_wait(&p_cond, &lock); } //消費者在條件變數下阻塞等待 void ConsumeWait() { pthread_cond_wait(&c_cond, &lock); } //使生產者的條件得到滿足,喚醒該執行緒 void SignalProduct() { pthread_cond_signal(&p_cond); } //使消費者的條件得到滿足,喚醒該執行緒 void SignalConsume() { pthread_cond_signal(&c_cond); } //判斷是否達到高水位線(達到2/3佇列的長度既通知消費者來消費) bool IsHighLine() { return q.size() > (unsigned int)high_line ? true : false; } //判斷是否達到低水位線(低於到1/3佇列的長度既通知生產者來消費) bool IsLowLine() { return q.size() < (unsigned int)low_line ? true : false; } public: //建構函式 BlockQueue(const int& _cap) :cap(_cap) ,high_line((_cap*2)/3) ,low_line((_cap*1)/3) { pthread_mutex_init(&lock, NULL); pthread_cond_init(&p_cond, NULL); pthread_cond_init(&c_cond, NULL); } //生產者向緩衝區放資料(同步與互斥) void PushData(int& data) { LockQueue();//互斥鎖實現互斥 while(IsFull())//使用while進行二次判斷,防止假喚醒 { ProductWait(); } //高於水位線則通知消費者來消費 if(IsHighLine()) { SignalConsume();//條件變數實現同步 } q.push(data); UnlockQueue(); } void PopData(int& data)//輸出型引數,通過引數帶回返回值 { LockQueue(); //使用while進行二次判斷,防止假喚醒 while(IsEmpty()) { ConsumeWait(); } //低於水位線則通知生產者來生成 if(IsLowLine()) { SignalProduct(); } data = q.front(); q.pop(); UnlockQueue(); } //解構函式,銷燬互斥量和條件變數 ~BlockQueue() { pthread_mutex_destroy(&lock); pthread_cond_destroy(&p_cond); pthread_cond_destroy(&c_cond); } private: queue<T> q; int cap;//阻塞佇列的最大長度 int high_line;//高水位線 int low_line;//低水位線 pthread_mutex_t lock; pthread_cond_t p_cond;//隊空,生產者生產 pthread_cond_t c_cond;//隊滿,消費者消費 }; #endif //__CP_HPP__ //cp.cc #include "cp.hpp" const int num = 10;//阻塞佇列的最大長度 void *consume_routine(void *arg) { BlockQueue<int>* pbq = (BlockQueue<int>*)arg; int data; for(;;) { pbq -> PopData(data); cout << "consume done, data is " << data << endl; sleep(1); } } void *product_routine(void *arg) { BlockQueue<int>* pbq = (BlockQueue<int>*)arg; srand((unsigned long)time(NULL)); for(;;) { int data = rand() % 100 + 1; pbq -> PushData(data); cout << "product done,data is " << data << endl; sleep(1); } } int main() { BlockQueue<int> *pbq = new BlockQueue<int>(num); pthread_t p,c;//product(生產者)、consume(消費者) pthread_create(&c, NULL, consume_routine, (void*)pbq); pthread_create(&p, NULL, product_routine, (void*)pbq); //等待執行緒終止 pthread_join(p, NULL); pthread_join(c, NULL); //釋放開闢在棧上的佇列 delete pbq; return 0; }

上邊的阻塞佇列實現的生產者-消費者模型只實現了生產者-消費者之間的同步與互斥關係,如果要實現生產者和生產者、消費者和消費者之間的互斥關係,還應該增加一些生產者和消費者,並且在生產和消費的時候加上互斥鎖,即可實現他們之間的互斥關係。
注:上述問題的實現程式碼位於:https://github.com/hansionz/Linux_Code/tree/master/pthread/cp

二.POSIX訊號量

1.什麼是POSIX訊號量

在學習程序間通訊的時候,接觸學習過SystemV版本的訊號量,一個二元訊號量就相當於一把互斥鎖。而POSIX訊號量和SystemV訊號量作用是相同的,都是用來實現程序和執行緒間同步操作的,從而可以達到無衝突的共享資源的目的。它們兩個的區別在於SystemV版本的訊號量在核心中維護,而POSIX訊號量存放在共享記憶體區。

2.初始化訊號量

訊號量是一個sem_t型別的變數,要使用POSIX訊號量必須引入標頭檔案#include <semaphore.h>,在使用之前一定要初始化。

int sem_init(sem_t *sem, int pshared, unsigned int value); 
引數:    
	pshared:0表⽰執行緒間共享,非0表示程序間共享    
	value:訊號量初始值

3. 銷燬訊號量

int sem_destroy(sem_t *sem);
引數: 
	sem:代表要銷燬訊號的地址

4. 等待訊號量

等待訊號量相對SystemV版本訊號量的P操作

功能:等待訊號量,會將訊號量的值減1 
int sem_wait(sem_t *sem);

5.釋出訊號量

釋出訊號量相對於SystemV版本訊號量的V操作

功能:釋出訊號量,表示資源使⽤用完畢,可以歸還資源了。將訊號量值加1int sem_post(sem_t *sem);

6.基於POSIX訊號量實現的環形佇列模擬的生產者-消費者模型

在這裡插入圖片描述

//cp.hpp
#ifndef __CP_HPP__
#define __CP_HPP__ 

#include <iostream>
#include <queue>
#include <mutex>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

using namespace std;

template<class T>
class RingQueue{
private:
 //等待訊號量,訊號量減1
 void P(sem_t& sem)
 {
   sem_wait(&sem);
 }
 //釋出訊號量,訊號量加1
 void V(sem_t& sem)
 {
   sem_post(&sem);
 }
public:
  //建構函式,在構造中必須先將vector初始化一定長度代表空格訊號量的個數
  RingQueue(int cap)
    :_cap(cap)
    ,ring(cap)
  {
    c_step = p_step = 0;
    sem_init(&blank_sem, 0, _cap);
    //資料訊號量初始為0
    sem_init(&data_sem, 0, 0);
  }
  //向佇列中入一個數據,要保證同步與互斥
  void PushData(const T& data)
  {
    P(blank_sem);

    ring[p_step] = data;
    p_step++;
    p_step %= _cap;

    V(data_sem);
  }
  //從迴圈隊列出一個數據
  void PopData(int& data)
  {
    P(data_sem);

    data = ring[c_step];
    c_step++;
    c_step %= _cap;

    V(blank_sem);
  }
  ~RingQueue()
  {
    sem_destroy(&blank_sem);
    sem_destroy(&data_sem);
  }
private:
  vector<T> ring; //vector模擬實現環形佇列
  int _cap; //佇列的長度 

  sem_t blank_sem;
  sem_t data_sem;

  int c_step;//消費者步伐
  int p_step;//生產者步伐
};

#endif //__CP_HPP__
//cp.cc
#include "cp.hpp"
const int num = 10; //空格訊號量個數

void *consume_routine(void *prq)
{
  RingQueue<int>* q = (RingQueue<int>*)prq;
  int data;
  for(;;)
  {
    q -> PopData(data);
    cout << "consume done,data is:" << data << endl;
  }
}
void *product_routine(void *prq)
{
  RingQueue<int>* q = (RingQueue<int>*)prq;
  srand((unsigned long)time(NULL));
  for(;;)
  {
    int data = rand() % 100 + 1;
    q -> PushData(data);
    cout << "product done,data is:" << data << endl;
    sleep(1);
  }
}
int main()
{
  RingQueue<int>* prq = new RingQueue<int>(num);
  pthread_t p,c;

  pthread_create(&p, NULL, product_routine, (void*)prq);
  pthread_create(&c, NULL, consume_routine, (void*)prq);

  pthread_join(p, NULL);
  pthread_join(c, NULL);

  delete prq;
  prq = nullptr;

  return 0;
}

注:程式碼於:https://github.com/hansionz/Linux_Code/tree/master/pthread/ring_cp