Linux多執行緒學習(3)--POSIX訊號量及生產者消費者模型
阿新 • • 發佈:2019-01-13
Linux多執行緒學習總結
一.生產者-消費者模型
1. 什麼是生產者-消費者模型
線上程的同步和互斥
中,有一個生產者-消費者模型
有界緩衝區
。兩個程序共享一個公共的固定大小
的緩衝區。其中一個是生產者
,將資訊放入緩衝區
,另一個是消費者
,從緩衝區中取資訊
。既一個或多個生產者(執行緒或程序)
建立著一些資料
放入緩衝區,然後這些資料由一個或多個消費者(執行緒或程序)
處理,從緩衝區拿走。資料在生產者和消費者
之間可以通過某種IPC
傳遞。
2.生產者-消費者模型的三種關係
生產者-生產者
:互斥生產者-消費者
:同步和互斥消費者-消費者
:互斥
3.基於BlockQueue實現生產者-消費者模型
生產者-消費者模型
問題的關鍵在於緩衝區已滿
,而此時生產者
還想往其中放入一個新的資料
生產者睡眠
,待消費者從緩衝區
中取出一個或多個數據
時再喚醒
它。同樣的, 當消費者
試圖從緩衝區中取資料
而發現緩衝區空
時,消費者就睡眠
,直到生產者
向其中放一些資料
後再將其喚醒
。
//cp.hpp
#ifndef __CP_HPP__
#define __CP_HPP__
#include <iostream>
#include <queue>
#include <mutex>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <pthread.h>
using namespace std;
template<class T>
class BlockQueue{
private:
//給佇列上鎖
void LockQueue()
{
pthread_mutex_lock(&lock);
}
//解鎖佇列
void UnlockQueue()
{
pthread_mutex_unlock(&lock);
}
//判斷佇列是否滿
bool IsFull()
{
return q.size() == (unsigned int)cap ? true : false;
}
//判斷杜烈是否為空
bool IsEmpty()
{
return q.size() == 0 ? true : false;
}
//生產者在條件變數下阻塞等待
void ProductWait()
{
pthread_cond_wait(&p_cond, &lock);
}
//消費者在條件變數下阻塞等待
void ConsumeWait()
{
pthread_cond_wait(&c_cond, &lock);
}
//使生產者的條件得到滿足,喚醒該執行緒
void SignalProduct()
{
pthread_cond_signal(&p_cond);
}
//使消費者的條件得到滿足,喚醒該執行緒
void SignalConsume()
{
pthread_cond_signal(&c_cond);
}
//判斷是否達到高水位線(達到2/3佇列的長度既通知消費者來消費)
bool IsHighLine()
{
return q.size() > (unsigned int)high_line ? true : false;
}
//判斷是否達到低水位線(低於到1/3佇列的長度既通知生產者來消費)
bool IsLowLine()
{
return q.size() < (unsigned int)low_line ? true : false;
}
public:
//建構函式
BlockQueue(const int& _cap)
:cap(_cap)
,high_line((_cap*2)/3)
,low_line((_cap*1)/3)
{
pthread_mutex_init(&lock, NULL);
pthread_cond_init(&p_cond, NULL);
pthread_cond_init(&c_cond, NULL);
}
//生產者向緩衝區放資料(同步與互斥)
void PushData(int& data)
{
LockQueue();//互斥鎖實現互斥
while(IsFull())//使用while進行二次判斷,防止假喚醒
{
ProductWait();
}
//高於水位線則通知消費者來消費
if(IsHighLine())
{
SignalConsume();//條件變數實現同步
}
q.push(data);
UnlockQueue();
}
void PopData(int& data)//輸出型引數,通過引數帶回返回值
{
LockQueue();
//使用while進行二次判斷,防止假喚醒
while(IsEmpty())
{
ConsumeWait();
}
//低於水位線則通知生產者來生成
if(IsLowLine())
{
SignalProduct();
}
data = q.front();
q.pop();
UnlockQueue();
}
//解構函式,銷燬互斥量和條件變數
~BlockQueue()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&p_cond);
pthread_cond_destroy(&c_cond);
}
private:
queue<T> q;
int cap;//阻塞佇列的最大長度
int high_line;//高水位線
int low_line;//低水位線
pthread_mutex_t lock;
pthread_cond_t p_cond;//隊空,生產者生產
pthread_cond_t c_cond;//隊滿,消費者消費
};
#endif //__CP_HPP__
//cp.cc
#include "cp.hpp"
const int num = 10;//阻塞佇列的最大長度
void *consume_routine(void *arg)
{
BlockQueue<int>* pbq = (BlockQueue<int>*)arg;
int data;
for(;;)
{
pbq -> PopData(data);
cout << "consume done, data is " << data << endl;
sleep(1);
}
}
void *product_routine(void *arg)
{
BlockQueue<int>* pbq = (BlockQueue<int>*)arg;
srand((unsigned long)time(NULL));
for(;;)
{
int data = rand() % 100 + 1;
pbq -> PushData(data);
cout << "product done,data is " << data << endl;
sleep(1);
}
}
int main()
{
BlockQueue<int> *pbq = new BlockQueue<int>(num);
pthread_t p,c;//product(生產者)、consume(消費者)
pthread_create(&c, NULL, consume_routine, (void*)pbq);
pthread_create(&p, NULL, product_routine, (void*)pbq);
//等待執行緒終止
pthread_join(p, NULL);
pthread_join(c, NULL);
//釋放開闢在棧上的佇列
delete pbq;
return 0;
}
上邊的阻塞佇列
實現的生產者-消費者
模型只實現了生產者-消費者
之間的同步與互斥
關係,如果要實現生產者和生產者、消費者和消費者
之間的互斥
關係,還應該增加一些生產者和消費者
,並且在生產和消費的時候加上互斥鎖
,即可實現他們之間的互斥
關係。
注:上述問題的實現程式碼位於:https://github.com/hansionz/Linux_Code/tree/master/pthread/cp
二.POSIX訊號量
1.什麼是POSIX訊號量
在學習程序間通訊
的時候,接觸學習過SystemV版本
的訊號量,一個二元訊號量就相當於一把互斥鎖。而POSIX
訊號量和SystemV
訊號量作用是相同的,都是用來實現程序和執行緒間同步操作的,從而可以達到無衝突的共享資源
的目的。它們兩個的區別在於SystemV版本
的訊號量在核心中維護,而POSIX訊號量
存放在共享記憶體區。
2.初始化訊號量
訊號量是一個sem_t
型別的變數,要使用POSIX訊號量必須引入標頭檔案#include <semaphore.h>
,在使用之前一定要初始化。
int sem_init(sem_t *sem, int pshared, unsigned int value);
引數:
pshared:0表⽰執行緒間共享,非0表示程序間共享
value:訊號量初始值
3. 銷燬訊號量
int sem_destroy(sem_t *sem);
引數:
sem:代表要銷燬訊號的地址
4. 等待訊號量
等待訊號量相對SystemV版本
訊號量的P操作
。
功能:等待訊號量,會將訊號量的值減1
int sem_wait(sem_t *sem);
5.釋出訊號量
釋出訊號量相對於SystemV版本
訊號量的V操作
。
功能:釋出訊號量,表示資源使⽤用完畢,可以歸還資源了。將訊號量值加1。
int sem_post(sem_t *sem);
6.基於POSIX訊號量實現的環形佇列模擬的生產者-消費者模型
//cp.hpp
#ifndef __CP_HPP__
#define __CP_HPP__
#include <iostream>
#include <queue>
#include <mutex>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
template<class T>
class RingQueue{
private:
//等待訊號量,訊號量減1
void P(sem_t& sem)
{
sem_wait(&sem);
}
//釋出訊號量,訊號量加1
void V(sem_t& sem)
{
sem_post(&sem);
}
public:
//建構函式,在構造中必須先將vector初始化一定長度代表空格訊號量的個數
RingQueue(int cap)
:_cap(cap)
,ring(cap)
{
c_step = p_step = 0;
sem_init(&blank_sem, 0, _cap);
//資料訊號量初始為0
sem_init(&data_sem, 0, 0);
}
//向佇列中入一個數據,要保證同步與互斥
void PushData(const T& data)
{
P(blank_sem);
ring[p_step] = data;
p_step++;
p_step %= _cap;
V(data_sem);
}
//從迴圈隊列出一個數據
void PopData(int& data)
{
P(data_sem);
data = ring[c_step];
c_step++;
c_step %= _cap;
V(blank_sem);
}
~RingQueue()
{
sem_destroy(&blank_sem);
sem_destroy(&data_sem);
}
private:
vector<T> ring; //vector模擬實現環形佇列
int _cap; //佇列的長度
sem_t blank_sem;
sem_t data_sem;
int c_step;//消費者步伐
int p_step;//生產者步伐
};
#endif //__CP_HPP__
//cp.cc
#include "cp.hpp"
const int num = 10; //空格訊號量個數
void *consume_routine(void *prq)
{
RingQueue<int>* q = (RingQueue<int>*)prq;
int data;
for(;;)
{
q -> PopData(data);
cout << "consume done,data is:" << data << endl;
}
}
void *product_routine(void *prq)
{
RingQueue<int>* q = (RingQueue<int>*)prq;
srand((unsigned long)time(NULL));
for(;;)
{
int data = rand() % 100 + 1;
q -> PushData(data);
cout << "product done,data is:" << data << endl;
sleep(1);
}
}
int main()
{
RingQueue<int>* prq = new RingQueue<int>(num);
pthread_t p,c;
pthread_create(&p, NULL, product_routine, (void*)prq);
pthread_create(&c, NULL, consume_routine, (void*)prq);
pthread_join(p, NULL);
pthread_join(c, NULL);
delete prq;
prq = nullptr;
return 0;
}
注:程式碼於:https://github.com/hansionz/Linux_Code/tree/master/pthread/ring_cp