1. 程式人生 > >C++並行程式設計(二): 利用C++標準庫實現Semaphore訊號量

C++並行程式設計(二): 利用C++標準庫實現Semaphore訊號量

    在上一節中,我們使用C++11標準庫中的提供的條件變數以及互斥變數封裝實現了兩個仿Windows核心事件類:ManualResetEvent和AutoResetEvent。在這一節中,我們將繼續使用標準庫中提供的類來實現高仿訊號量Semaphore。而後文的程式碼中,有使用到上一節程式碼中定義的某些抽象基類,看不懂的讀者們可以查閱上一章《C++並行程式設計(一): 利用C++標準庫實現仿Windows核心事件物件》。

    Semaphore訊號量機制在Windows和Linux中都有提供,作為作業系統最重要的一種執行緒間同步機制之一。在我個人的程式設計生涯中,Semaphore最多的使用場景是用於實現生產者-消費者佇列

,這種多執行緒演算法在各種後臺伺服器應用程式中使用極廣,在C#中,它是BlockingCollection;在Java中,它是BlockingList(但願我沒有記錯名字);而在C++的標準庫中,它還沒有實現。言歸正題,我們這一節是要基於C++11標準庫中的mutex和condition_variable來實現Semaphore,這樣做最大的好處是你無需呼叫作業系統相關的API而能直接寫出跨平臺程式碼,但壞處是,它的效能肯定比不上你直接調系統API。

    下面的程式碼和上一節中的程式碼是同一個專案中的程式碼,並且引用到了上一節的程式碼,大家可以把它們合併到一起。廢話少說,上乾貨!(本文所有程式碼在VS2017下驗證通過)

Semaphore.h

和上一節的設計一樣,我們先定義一組無實現的抽象介面,抽象介面中提供靜態方法CreateSemaphore建立實體物件。

// Semaphore.h
#pragma once
#include "Event.h"

namespace Threads
{
	// 訊號量
	class Semaphore : WaitHandle
	{
	public:
		// 無上限
		static const int Unlimited = -1;

		// 建立訊號量
		static unique_ptr<Semaphore> CreateSemaphore(int initialCount, int maxCount = Unlimited);

	public:
		// 等待一個訊號量
		virtual void Wait() = 0;

		// 限時等待訊號量
		virtual bool Wait(const milliseconds& timeout) = 0;

		// 釋放訊號量
		virtual void Release() = 0;

		// 釋放訊號量指定次數
		virtual void Release(int releaseCount) = 0;

		// 獲取剩餘訊號量數量.
		virtual int GetCurrentCount() const = 0;

		// 釋放訊號量物件
		virtual void Close() = 0;
	};

}

SemaphoreImpl.h

接下來,我們定義實現類,原則是,使用者不應該直接使用這個類。

#pragma once
#include "Semaphore.h"
#include <mutex>
#include <atomic>
#include <condition_variable>

namespace Threads
{
	// 訊號量實現類
	class SemaphoreImpl : public Semaphore
	{
		mutex m_mutex;
		condition_variable m_cond;
		atomic<bool> m_bDisposed;
		volatile int m_currentCount;
		int m_maxCount;

	public:
		SemaphoreImpl(int initialCount, int maxCount);

		// 等待一個訊號量
		virtual void Wait();

		// 限時等待訊號量
		virtual bool Wait(const milliseconds& timeout);

		// 釋放訊號量
		virtual void Release();

		// 釋放訊號量指定次數
		virtual void Release(int releaseCount);

		// 獲取剩餘訊號量數量.
		virtual int GetCurrentCount() const;

		// 釋放訊號量物件
		virtual void Close();

	private:
		void CheckDisposed()const;
	};

}

Semaphore.cpp

這個抽象類中唯一需要實現是靜態方法CreateSemaphore,用以建立實體物件,並返回其唯一引用。

#include "Semaphore.h"
#include "SemaphoreImpl.h"
using namespace Threads;

unique_ptr<Semaphore> Semaphore::CreateSemaphore(int initialCount, int maxCount)
{
	if (initialCount < 0)
	{
		throw out_of_range("initialCount < 0");
	}
	if (maxCount != Unlimited && (maxCount < initialCount || maxCount < 1))
	{
		throw out_of_range("maxCount < initialCount || maxCount < 1");
	}
	return unique_ptr<Semaphore>(new SemaphoreImpl(initialCount, maxCount));
}

SemaphoreImpl.cpp

乾貨滿滿的時間到了! 

// SemaphoreImpl.cpp
#include "SemaphoreImpl.h"
using namespace Threads;

SemaphoreImpl::SemaphoreImpl(int initialCount, int maxCount)
	:m_currentCount(initialCount), m_maxCount(maxCount),
	 m_bDisposed(false)
{
}

void Threads::SemaphoreImpl::Wait()
{
	unique_lock<mutex> lock(m_mutex);
	m_cond.wait(lock, [this] {return GetCurrentCount() > 0; });
	--m_currentCount;
}

bool Threads::SemaphoreImpl::Wait(const milliseconds & timeout)
{
	unique_lock<mutex> lock(m_mutex);
	if (m_cond.wait_for(lock, timeout, [this] {return GetCurrentCount() > 0; }))
	{
		--m_currentCount;
		return true;
	}
	return false;
}

void Threads::SemaphoreImpl::Release()
{
	Release(1);
}

void Threads::SemaphoreImpl::Release(int releaseCount)
{
	if (releaseCount < 1)
	{
		throw invalid_argument("releaseCount < 1");
	}

	lock_guard<mutex> lock(m_mutex);
	CheckDisposed();
	if (m_maxCount != Semaphore::Unlimited && GetCurrentCount() + releaseCount > m_maxCount)
	{
		throw invalid_argument("GetCurrentCount() + releaseCount > m_maxCount");
	}
	m_currentCount += releaseCount;
	do {
		m_cond.notify_one();
	} while (--releaseCount);
}

int Threads::SemaphoreImpl::GetCurrentCount() const
{
	CheckDisposed();
	return m_currentCount;
}

void SemaphoreImpl::Close()
{
	auto expected = false;
	if (!m_bDisposed.compare_exchange_weak(expected, true))
	{
		return;
	}
	lock_guard<mutex> lock(m_mutex);
	m_cond.notify_all();
}

void Threads::SemaphoreImpl::CheckDisposed() const
{
	if (m_bDisposed.load() == true)
	{
		throw logic_error("SemaphoreImpl Disposed.");
	}
}

//==================== 華麗的分割線 ====================

至此,一個高仿的Semaphore就已經實現完畢了,讀者們看懂的嗎?接下來,就該上示例Demo了:

main.cpp

#include <cstdio>
#include <stdlib.h>
#include <iostream>
#include <memory>
#include <thread>
#include "Semaphore.h"
using namespace std;
using namespace Threads;
using namespace chrono;

int main()
{
	auto sem = Semaphore::CreateSemaphore(0);
	thread t1([&] {
		try
		{
			while (true)
			{
				// 等待訊號量
				sem->Wait();

				cout << "[Thread1] Wake up!" << endl;
			}
		}
		catch (const std::logic_error& e) // 訊號量被銷燬
		{
			cerr << "[Thread1] : " << e.what() << endl;
		}
	});

	thread t2([&] {
		try
		{
			while (true)
			{
				if (!sem->Wait(duration_cast<milliseconds>(seconds(5))))
				{
					cout << "[Thread2] Time out!" << endl;
					continue;
				}
				cout << "[Thread2] Wake up!" << endl;
			}
		}
		catch (const std::logic_error& e) // 訊號量被銷燬
		{
			cerr << "[Thread2] : " << e.what() << endl;
		}
	});

	thread t3([&] {
		try
		{
			while (true)
			{
				sem->Wait();
				cout << "[Thread3] Wake up!" << endl;
			}
		}
		catch (const std::logic_error& e) // 訊號量被銷燬
		{
			cerr << "[Thread3] : " << e.what() << endl;
		}
	});

	while (true)
	{
		int input = 0;
		cout << "Please input an integer(-1 to exit): " << endl;
		cin >> input;

		if (input < 0)
		{
			sem->Close();
			break;
		}
		
		sem->Release(max(input % 5, 1));
	}
	t1.join();
	t2.join();
	t3.join();
	return 0;
}

上述示例演示了訊號量的等待、限時等待以及釋放中斷等幾種情況,並且在Windows和Linux下測試通過。