1. 程式人生 > >C++11 線程並發

C++11 線程並發

[] som 三種 有用 getline start current iba 返回值

並發 頭文件<future> <thread>

高級接口

async()、future<>


future<int> result1; //int為func1返回值
result1 = async(func1); //啟動func1,但有可能被推遲,直到調用get或wait


future<int> result1(async(func1));


result1.get();//獲得返回值

async()的參數可以是函數、成員函數、函數對象或lambda

async([]{})

//絕不推遲
future<int> result1 = async(lunch::async, func1);

//強制延緩,直到調用f.get()或f.wait()
future<int> f(async(lunch::deferred, func1));

auto f1 = sync(lunch::deferred, task1);
auto f2 = sync(lunch::deferred, task2);

auto val = b?f1.get():f2.get();


auto f1 = async(task1);

try
{
f1.get();
}
catch (exception* e)
{
cerr << "EXCEPTION: " << e->what() << endl;
}


一個future<>只能被調用get()一次,之後future就處於無效狀態,
這種狀態可調用valid()來檢測,

只要對future調用wait(),就可以強制啟動該future象征的線程並等待這一後臺操作終止
future<...> f(asybc(func));
...
f.wait();

f.wait_for(seconds(10));
f.wait_until(system_clock::now()+chrono::minutes(1));

不論wait_for()或wait_until()都可以返回一下三種東西之一:
1.future_status::deferred
如果async()延緩了操作而程序中又完全沒有調用wait()或get()
2.future_status::timeout
如果某個操作被異步啟動但尚未結束,而waiting又已逾期
3.future_status::ready
操作已完成


future<...> f(async(task));
如果在單線程環境,有可能延緩而沒有啟動
所以先檢查

if (f.wait_for(chrono::seconds(0)) != future_status::deferred){
while (f.wait_for(chrono::seconds(0)) != future_status::ready)){
...
this_thread::yeild();
//或this_thread::sleep_for(milliseconds(100));
}
}
...
auto r = f.get();

例子:

#include <chrono>
#include <iostream>
#include <future>
#include <thread>
#include <random>
#include <exception>

using namespace std;
using namespace std::chrono;

int doSomething(char c)
{
default_random_engine dre(c);
uniform_int_distribution<int> id(10, 1000);

for (int i = 0; i < 10; ++i)
{
this_thread::sleep_for(milliseconds(id(dre)));
cout.put(c).flush();
}

return c;
}

int main()
{
auto f1 = async([]{doSomething(‘.‘); });
auto f2 = async([]{doSomething(‘+‘); });

if (f1.wait_for(seconds(0)) != future_status::deferred ||
f2.wait_for(seconds(0)) != future_status::deferred)
{
while (f1.wait_for(seconds(0)) != future_status::ready &&//有一個線程完成就跳出循環
f2.wait_for(seconds(0)) != future_status::ready)
{
this_thread::yield();//提示重新安排到下一個線程
}
}
cout.put(‘\n‘).flush();

try {
f1.get();
f2.get();
}
catch (const exception& e){
cout << "\nException: " << e.what() << endl;
}
cout << "\ndone" << endl;

}


傳遞實參

上例使用了lambda並讓它調用後臺函數
auto f1 = async([]{doSomething(‘.‘); });

也可以傳遞async語句之前就存在的實參
char c = ‘@‘;
auto f1 = async([=]{doSomething(c); });

[=]表示傳遞給lambda的是c的拷貝

char c = ‘@‘;
//傳遞c的拷貝
auto f1 = async([=]{doSomething(c); });
auto f1 = async(doSomething, c);

//傳遞c的引用
auto f1 = async([&]{doSomething(c); });
auto f1 = async(doSomething, ref(c));


如果使用async,就應該以值傳遞方式來傳遞所有用來處理目標函數的必要object,
使async只需使用局部拷貝


調用成員函數
class X
{
public:
void mem(int num){ cout << "memfunc: " << num << endl; };
};

int main()
{
X x;
auto f = async(&X::mem, x, 42); //x.mem(42);
f.get();
}
傳給async一個成員函數的pointer,之後的第一個實參必須是個reference或pointer,指向某個object


多次處理
shared_future 可多次調用get()

int queryNum()
{
cout << "read number: ";
int num;
cin >> num;

if (!cin)
{
throw runtime_error("no number read");
}
return num;
}

void doSomething1(char c, shared_future<int> f)
{
try{
int num = f.get();

for (int i = 0; i < num; ++i)
{
this_thread::sleep_for(chrono::milliseconds(100));
cout.put(c).flush();
}
}
catch (const exception& e)
{
cerr << "EXCEPTION in thread " << this_thread::get_id() << ": " << e.what() << endl;
}
}

int main()
{
try{
shared_future<int> f = async(queryNum);

auto f1 = async(launch::async, doSomething1, ‘.‘, f);
auto f2 = async(launch::async, doSomething1, ‘+‘, f);
auto f3 = async(launch::async, doSomething1, ‘*‘, f);

f1.get();
f2.get();
f3.get();
}
catch (const exception& e){
cout << "\nEXCEPTION: " << e.what() << endl;
}
cout << "\ndone" << endl;
}


確保f的壽命不短於被啟動的線程

低層接口
Thread 和 Promise


std::thread

void doSomething();

thread t(doSomething); //啟動
...
t.join();//等待結束

t.detach();//卸載

cout << thread::hardware_concurrency() << endl; //CPU線程個數

和async相比
1.沒有所謂發射策略,試著將目標函數啟動與一個新線程中,如果無法做到會拋出system_error,並帶有錯誤碼resource_unavailable_try_again
2.沒有接口處理線程結果,唯一可獲得的是一個獨一無二的線程ID
3.如果發生異常,但未捕捉於線程之內,程序會立刻中止並調用terminate()
4.你必須聲明是否想要等待線程結束(join)
或打算將它自母體卸載使它運行與後臺而不受任何控制(detach)
如果你在thread object壽命結束前不這麽做,或如果它發生了一次move assignment,
程序會中止並調用terminate()
4.如果你讓線程運行於後臺而main結束了,所有線程會被魯莽而硬性地終止


Detached Thread(卸離後的線程)

一般性規則:
Detached Thread應該寧可只訪問local copy
。。。


Thread ID

this_thread::get_id()

thread t(doSomething2, 5, ‘.‘);
t.get_id()

thread::id(); //默認構造,生成一個獨一無二的ID用來表現 no thread

Promise

void doSomething3(promise<string>& p)
{
try{
cout << "read char (‘x‘ for exception): ";
char c = cin.get();
if (c == ‘x‘){
throw runtime_error(string("char ") + c + " read");
}
string s = string("char ") + c + " processed";
p.set_value(move(s));//存放一個值
//p.set_value_at_thread_exit(move(s));//線程結束時存放
}
catch (...){
p.set_exception(current_exception());//存放一個異常
//p.set_exception_at_thread_exit(current_exception());
}
}

int main()
{
try {
promise<string> p;
thread t(doSomething3, ref(p));//傳入一個promise引用
t.detach();

future<string> f(p.get_future());//get_future取出值

cout << "result: " << f.get() << endl;//取得存儲結果//get會停滯,直到p.set_value
}
catch (const exception& e){
cerr << "EXCEPTION: " << e.what() << endl;
}
catch (...){
cerr << "EXCEPTION " << endl;
}
}


this_thread

this_thread::get_id()
this_thread::sleep_for(duration)
this_thread::sleep_until(timepoint)
this_thread::yeild() //建議釋放控制以便重新調度,讓下一個線程能夠執行


當心 Concurrency (並發)

多個線程並發處理相同數據而又不曾同步化,那麽唯一安全的情況就是:所有線程只讀取數據

除非另有說明,c++標準庫提供的函數通常不支持讀或寫動作與另一個寫動作(寫至同一筆數據)並發執行

Mutex 和 Lock


mutex

mutex valMutex;


valMutex.lock();
++val;
valMutex.unlock();


lock_guard:

{
//lock、析構函數自動釋放lock(unlock) (加一個大括號使其更快釋放)
lock_guard<mutex> lg(valMutex);
++val;
}


//recursive_mutex 允許同一線程多次鎖定,並在最近一次(last)相應的unlock時釋放lock
recursive_mutex dbMutex;
lock_guard<recursive_mutex> lg(dbMutex);

嘗試性的Lock

mutex m;
//試圖獲得一個lock,成功返回true,為了仍能夠使用lock_guard,要額外傳入實參 adopt_lock
//有可能假性失敗,lock為被他人拿走也可能失敗返回false
while (m.try_lock() == false)
{
//doSomeOtherStuff();
}

lock_guard<mutex> lg1(m, adopt_lock);


timed_mutex m1; //recursive_timed_mutex
//if (m1.try_lock_until(system_clock::now() + chrono::seconds(1)))
if (m1.try_lock_for(chrono::seconds(1))){
lock_guard<timed_mutex> lg2(m1, adopt_lock);
}
else
{
//couldNotGetTheLock();
}


處理多個Lock,易發生互鎖,死鎖
使用全局函數lock解決

mutex

mutex m1;
mutex m2;
{
//lock會阻塞,直到所有mutex被鎖定或出現異常
lock(m1, m2);
//成功鎖定後使用lock_guard,以adopt_lock作為第二實參,確保這些mutex在離開作用域時解鎖
lock_guard<mutex> lockM2(m1, adopt_lock);
lock_guard<mutex> lockM3(m2, adopt_lock);
//...
}//自動unlock

//或者使用try_lock,取得所有lock時返回-1,否則返回第一失敗的lock的索引且其它成功的會unlock
int idx = try_lock(m1, m2);
if (idx < 0)
{
lock_guard<mutex> lockM2(m1, adopt_lock);
lock_guard<mutex> lockM3(m2, adopt_lock);
//...
}//自動unlock
else
{
cerr << "could not lock mutex m" << idx + 1 << endl;
}

//使用lock、try_lock後使用adopt_lock過繼給lock_guard才能自動解鎖


只調用一次

once_flag oc;
call_once(oc, initialize);//保證initialize函數只調用一次

once_flag oc1;
call_once(oc1, []{staticData = initializeStaticData(); });

class X
{
private:
mutable once_flag initDataFlag;
void initData()const;
public:
int GetData()const{
call_once(initDataFlag, &X::initData, this);
//...
}
};


Condition Variable(條件變量) <condition_variable>

future從某線程傳遞數據到另一線程只能一次,且future的主要目的是處理線程的返回值或異常

條件變量可用來同步化線程之間的數據流邏輯依賴關系


使用ready flag(一個bool變量)讓某線程等待另一線程是一個粗淺辦法
while (!readyFlag){
//...
this_thread::yeild();
}
消耗寶貴的CPU時間重復檢查flag,
此外很難找出適當的sleep周期,
2次檢查間隔太短則仍舊浪費CPU時間於檢查動作上,太長則會發生延誤

一個較好的做法是使用條件變量,使一個線程可以喚醒一或多個其他等待中的線程

包含<mutex> <condition_variable>

mutex readyMutex;
condition_variable readyCondVar;

//激發的條件終於滿足的線程(或多線程之一)必須調用
readyCondVar.notify_one();
//或
readyCondVar.notify_all();
//等待條件滿足的線程必須調用
unique_lock<mutex> l(readyMutex);
readyCondVar.wait(l);


但是有可能出現假醒,即wait在condition varibale尚未被notified時便返回
假醒無法被預測,實質上是隨機的
在wakeup之後你仍需要代碼去驗證條件實際已達成,
例如必須檢查數據是否真正備妥,或仍需要諸如ready flag之類的東西


#include <condition_variable>
#include <mutex>
#include <future>
#include <iostream>
using namespace std;

bool readyFlag; //表示條件真的滿足了
mutex readyMutex;
condition_variable readyCondVar;

void thread1()
{
cout << "<return>" << endl;
cin.get();
{
lock_guard<mutex> lg(readyMutex);
readyFlag = true;
}
readyCondVar.notify_one();
}

void thread2()
{
{//這裏必須使用unque_lock,不可使用lock_guard,因為wait的內部會明確地對mutex進行解鎖和鎖定
unique_lock<mutex> ul(readyMutex);
//lambda當做第二實參,用來檢查條件是否真的滿足,直到返回true
readyCondVar.wait(ul, []{return readyFlag; });
//相當於
//while (!readyFlag){
// readyCondVar.wait(ul);
//}
}
cout << "done" << endl;
}

int main()
{
auto f1 = async(launch::async, thread1);
auto f2 = async(launch::async, thread2);

f1.wait();
f2.wait();
}


例子

std::queue<int> queue1;//並發使用,被一個mutex和一個condition variable保護著
mutex queueMutex;
condition_variable queueCondVar;

void provider(int val)
{
for (int i = 0; i < 6; ++i)
{
{
lock_guard<mutex> ul(queueMutex);
queue1.push(val + i);
}
queueCondVar.notify_one();
this_thread::sleep_for(chrono::milliseconds(val));
}
}

void consumer(int num)
{
while (true)
{
int val;
{
unique_lock<mutex> ul(queueMutex);
queueCondVar.wait(ul, []{return !queue1.empty(); });
val = queue1.front();
queue1.pop();
cout << "consumer" << num << ": " << val << endl;

//使用wait_for wait_until則要判斷返回值
//if (queueCondVar.wait_for(ul, chrono::seconds(1), []{return !queue1.empty(); }))
//if (queueCondVar.wait_for(ul, chrono::seconds(1)) == cv_status::no_timeout)//這個沒有判斷readyFlag
//{
// val = queue1.front();
// queue1.pop();
// cout << "consumer" << num << ": " << val << endl;
//}
}

}
}


int main()
{
auto p1 = async(launch::async, provider, 100);
auto p2 = async(launch::async, provider, 300);
auto p3 = async(launch::async, provider, 500);

auto c1 = async(launch::async, consumer, 1);
auto c2 = async(launch::async, consumer, 2);

//c1.wait();
getchar();
exit(0);
}

atomic

即使基本數據類型,讀寫也不是atomic(不可切割的),readyFlag可能讀到被寫一半的bool
編譯器生成的代碼有可能改變操作次序

借由mutex可解決上述2個問題,但從必要的資源和潛藏的獨占訪問來看,
mutex也許是個相對昂貴的操作,所以也許值得以atomic取代mutex和lock


atomic<bool> readyFlag(false);

void thread1()
{
//...
readyFlag.store(true);//賦予一個新值
}

void thread2()
{
while (!readyFlag.load())//取當前值
{
this_thread::sleep_for(chrono::milliseconds(100));
}
//...
}


void provider()
{
while (true)
{
cout << "<return>" << endl;
char arr[100];
cin.getline(arr, 100);

data = 42;
readyFlag.store(true);
}
}

void consumer()
{
while (true)
{
while (!readyFlag.load()){
cout.put(‘.‘).flush();
this_thread::sleep_for(chrono::milliseconds(50));
}
cout << "\nvalue : " << data << endl;
readyFlag = false;
}

}

int main()
{
auto p = async(launch::async, provider);
auto c = async(launch::async, consumer);

c.wait();
}

atomic<int> ai(0);
int x = ai;
ai = 10;
ai++;
ai -= 17;

atomic<int> a;
atomic_init(&a, 0);//沒有初始化則使用這個函數進行初始化

#include "stdafx.h"

//#include <memory>
#include <chrono>
#include <ctime>
#include <string>
#include <iostream>
#include <future>
#include <thread>
#include <random>
#include <exception>
#include <vector>


using namespace std;
using namespace std::chrono;

int doSomething(/*const char&*/char c)
{
	default_random_engine dre(c);
	uniform_int_distribution<int> id(10, 1000);

	for (int i = 0; i < 10; ++i)
	{
		this_thread::sleep_for(milliseconds(id(dre)));
		cout.put(c).flush();
	}

	return c;
}

int func1()
{
	return doSomething(‘.‘);
}

int func2()
{
	return doSomething(‘+‘);
}
#include <list>
void task1()
{
	list<int> v;
	while (true)
	{
		for (int i = 0; i < 1000000; ++i)
		{
			v.push_back(i);
		}
		cout.put(‘.‘).flush();
	}
}

#if 0
int quickComputation();		//快速直接 quick and dirty
int accurateComputation();	//精確但是慢

future<int> f;

int bestResultInTime()
{
	auto tp = chrono::system_clock::now() + chrono::minutes(1);

	f = async(launch::async, accurateComputation);
	int guess = quickComputation();

	future_status s = f.wait_until(tp);	//等待

	if (s == future_status::ready)	//完成
	{
		return f.get();
	} 
	else
	{
		return guess;
	}
}
#endif 

#if 0
int main()
{
	//傳遞實參
	//auto f1 = async([]{ doSomething(‘.‘); });
	char c = ‘@‘;
	//傳遞c的拷貝
	//auto f1 = async([=]{ doSomething(c); });	
	//auto f1 = async(doSomething, c);

	//傳遞c的引用
	//auto f1 = async([&]{ doSomething(c); });//這個需要把doSomething(const char& c) ???
	auto f1 = async( doSomething, ref(c));


	auto f2 = async([]{ doSomething(‘+‘); });

	c = ‘_‘;

	if (f1.wait_for(seconds(0)) != future_status::deferred ||
		f2.wait_for(seconds(0)) != future_status::deferred)
	{
		while (f1.wait_for(seconds(0)) != future_status::ready &&	//有一個線程完成就跳出循環
				f2.wait_for(seconds(0)) != future_status::ready)
		{
			this_thread::yield();//提示重新安排到下一個線程
		}
	}
	
	cout.put(‘\n‘).flush();

	try {
		f1.get(); //一個future只能調用一次get,之後future處於無效狀態
		f2.get();
	}
	catch (const exception& e){
		cout << "\nException: " << e.what() << endl;
	}
	cout << "\ndone" << endl;

	//cout << f1._Get_value() << endl; //相當於
	cout << f1._Is_ready() << endl;
	cout << f1.valid() << endl;
}
#endif // 0

#if 0
class X
{
public:
	void mem(int num){ cout << "memfunc: " <<  num << endl; };
};

int main()
{
	X x;
	auto f = async(&X::mem, x, 42); //x.mem(42);
	f.get();
}
#endif // 0


#if 0

int queryNum()
{
	cout << "read number: ";
	int num;
	cin >> num;

	if (!cin)
	{
		throw runtime_error("no number read");
	}
	return num;
}

void doSomething1(char c, shared_future<int> f)
{
	try{
		int num = f.get();

		for (int i = 0; i < num; ++i)
		{
			this_thread::sleep_for(chrono::milliseconds(100));
			cout.put(c).flush();
		}
	}
	catch (const exception& e)
	{
		cerr << "EXCEPTION in thread " << this_thread::get_id() << ": " << e.what() << endl;
	}
}

int main()
{
	try{
		shared_future<int> f = async(queryNum);

		auto f1 = async(launch::async, doSomething1, ‘.‘, f);
		auto f2 = async(launch::async, doSomething1, ‘+‘, f);
		auto f3 = async(launch::async, doSomething1, ‘*‘, f);

		f1.get();
		f2.get();
		f3.get();
	}
	catch (const exception& e){
		cout << "\nEXCEPTION: " << e.what() << endl;
	}
	cout << "\ndone" << endl;
}
#endif // 0

#include <thread>

#if 0
void doSomething2(int num, char c)
{
	try {
		default_random_engine dre(42 * c);
		uniform_int_distribution<int> id(10, 1000);
		for (int i = 0; i < num; ++i)
		{
			this_thread::sleep_for(milliseconds(id(dre)));
			cout.put(c).flush();
		}
	}
	catch (...){
		cerr << "THREAD-EXCEPTION (thread " << this_thread::get_id() << ")" << endl;
	}
}

int main()
{
	try {
		thread t1(doSomething2, 5, ‘.‘);
		cout << "- start fg thread " << t1.get_id() << endl;

		for (int i = 0; i < 5; ++i)
		{
			thread t(doSomething2, 10, ‘a‘ + i);
			cout << "-detach start bg thread " << t.get_id() << endl;
			t.detach();
		}
		cin.get();
		cout << "- join fg thread " << t1.get_id() << endl;
		t1.join();
	}
	catch (const exception& e){
		cerr << "EXCEPTION: " << e.what() << endl;
	}
}
#endif // 0


#if 0
void doSomething3(promise<string>& p)
{
	try{
		cout << "read char (‘x‘ for exception): ";
		char c = cin.get();
		if (c == ‘x‘){
			throw runtime_error(string("char ") + c + " read");
		}
		string s = string("char ") + c + " processed";
		p.set_value(move(s));//存放一個值
		//p.set_value_at_thread_exit(move(s));//線程結束時存放
	}
	catch (...){
		p.set_exception(current_exception());//存放一個異常
		//p.set_exception_at_thread_exit(current_exception());
	}
}

int main()
{
	try {
		promise<string> p;
		thread t(doSomething3, ref(p));//傳入一個promise引用
		t.detach();

		future<string> f(p.get_future());//get_future取出值

		cout << "result: " << f.get() << endl;//取得存儲結果//get會停滯,直到p.set_value
	}
	catch (const exception& e){
		cerr << "EXCEPTION: " << e.what() << endl;
	}
	catch (...){
		cerr << "EXCEPTION " << endl;
	}
}
#endif // 0

mutex printMutex;
void print(const string& s)
{
	lock_guard<mutex> l(printMutex);
	for (char c : s)
	{
		cout.put(c);
	}
	cout << endl;
}

int initialize()
{
	return 0;
}
vector<string> initializeStaticData()
{
	vector<string> staticData;
	return staticData;
}

vector<string> staticData;

#if 0

int main()
{
 	auto f1 = async(launch::async, print, "Hello from a first thread");
 	auto f2 = async(launch::async, print, "Hello from a second thread");
 	print("Hello from the main thread");
 	
 	try {//確保mutex銷毀之前,f1,f2 線程完成
 		f1.wait();
 		f2.wait();
 	}
 	catch (...)
 	{
 	}

	//遞歸的 Lock 造成死鎖,在第二次lock拋出異常system_error,錯誤碼resource_deadlock_would_occur
	//lock_guard<mutex> l(printMutex);
	//print("Hello from the main thread");

	recursive_mutex dbMutex;
	lock_guard<recursive_mutex> lg(dbMutex);
	//recursive_mutex 允許同一線程多次鎖定,並在最近一次(last)相應的unlock時釋放lock


	mutex m;
	//試圖獲得一個lock,成功返回true,為了仍能夠使用lock_guard,要額外傳入實參 adopt_lock
	//有可能假性失敗,lock為被他人拿走也可能失敗返回false
	while (m.try_lock() == false)
	{
		//doSomeOtherStuff();
	}

	lock_guard<mutex> lg1(m, adopt_lock);


	timed_mutex mm;	//recursive_timed_mutex
	//if (m1.try_lock_until(system_clock::now() + chrono::seconds(1)))
	if (mm.try_lock_for(chrono::seconds(1))){
		lock_guard<timed_mutex> lg2(mm, adopt_lock);
	}
	else
	{
		//couldNotGetTheLock();
	}


	mutex m1;
	mutex m2;
	{
		//lock會阻塞,直到所有mutex被鎖定或出現異常
		lock(m1, m2);
		//成功鎖定後使用lock_guard,以adopt_lock作為第二實參,確保這些mutex在離開作用域時解鎖
		lock_guard<mutex> lockM2(m1, adopt_lock);
		lock_guard<mutex> lockM3(m2, adopt_lock);
		//...
	}//自動unlock

	//或者使用try_lock,取得所有lock時返回-1,否則返回第一失敗的lock的索引且其它成功的會unlock
	int idx = try_lock(m1, m2);
	if (idx < 0)
	{
		lock_guard<mutex> lockM2(m1, adopt_lock);
		lock_guard<mutex> lockM3(m2, adopt_lock);
		//...
	}//自動unlock
	else
	{
		cerr << "could not lock mutex m" << idx + 1 << endl;
	}

	//使用lock、try_lock後使用adopt_lock過繼給lock_guard才能自動解鎖

	//只調用一次
	once_flag oc;
	call_once(oc, initialize);//保證initialize函數只調用一次

	once_flag oc1;
	call_once(oc1, []{staticData = initializeStaticData(); });


}

class X
{
private:
	mutable once_flag initDataFlag;
	void initData()const;
public:
	int GetData()const{
		call_once(initDataFlag, &X::initData, this);
		//...
	}
};
#endif // _DEBUG

#if 0
//條件變量
#include <condition_variable>
//mutex readyMutex;
//condition_variable readyCondVar;
////激發的條件終於滿足的線程(或多線程之一)必須調用
//readyCondVar.notify_one();	//一個
////或
//readyCondVar.notify_all();	//所有
////等待條件滿足的線程必須調用
//unique_lock<mutex> l(readyMutex);
//readyCondVar.wait(l);

bool readyFlag; //表示條件真的滿足了
mutex readyMutex;
condition_variable readyCondVar;

void thread1()
{
	cout << "<return>" << endl;
	cin.get();
	{
		lock_guard<mutex> lg(readyMutex);
		readyFlag = true;
	}
	readyCondVar.notify_one();
}

void thread2()
{
	{//這裏必須使用unque_lock,不可使用lock_guard,因為wait的內部會明確地對mutex進行解鎖和鎖定
		unique_lock<mutex> ul(readyMutex);
		//lambda當做第二實參,用來檢查條件是否真的滿足,直到返回true
		readyCondVar.wait(ul, []{return readyFlag; });
		//相當於
		//while (!readyFlag){
		//	readyCondVar.wait(ul);
		//}
	}
	cout << "done" << endl;
}

int main()
{
	auto f1 = async(launch::async, thread1);
	auto f2 = async(launch::async, thread2);

	f1.wait();
	f2.wait();
}
#endif // 0



#include <queue>


#if 0
std::queue<int> queue1;//並發使用,被一個mutex和一個condition variable保護著
mutex queueMutex;
condition_variable queueCondVar;

void provider(int val)
{
	for (int i = 0; i < 6; ++i)
	{
		{
			lock_guard<mutex> ul(queueMutex);
			queue1.push(val + i);
		}
		queueCondVar.notify_one();
		this_thread::sleep_for(chrono::milliseconds(val));
	}
}

void consumer(int num)
{
	while (true)
	{
		int val;
		{
			unique_lock<mutex> ul(queueMutex);
			queueCondVar.wait(ul, []{return !queue1.empty(); });
			val = queue1.front();
			queue1.pop();
			cout << "consumer" << num << ": " << val << endl;

			//使用wait_for wait_until則要判斷返回值
			//if (queueCondVar.wait_for(ul, chrono::seconds(1), []{return !queue1.empty(); }))
			//if (queueCondVar.wait_for(ul, chrono::seconds(1)) == cv_status::no_timeout)//這個沒有判斷readyFlag
			//{
			//	val = queue1.front();
			//	queue1.pop();
			//	cout << "consumer" << num << ": " << val << endl;
			//}
		}
		
	}
}


int main()
{
	auto p1 = async(launch::async, provider, 100);
	auto p2 = async(launch::async, provider, 300);
	auto p3 = async(launch::async, provider, 500);

	auto c1 = async(launch::async, consumer, 1);
	auto c2 = async(launch::async, consumer, 2);

	//c1.wait();
	getchar();
	exit(0);
}
#endif // 0


#include <atomic>

void foo(){
	atomic<int> ai(0);
	int x = ai;
	ai = 10;
	ai++;
	ai -= 17;

	atomic<int> a;
	atomic_init(&a, 0);//沒有初始化則使用這個進行初始化
}



long data;
atomic<bool> readyFlag(false);

//void thread1()
//{
//	//...
//	readyFlag.store(true);//賦予一個新值
//}
//
//void thread2()
//{
//	while (!readyFlag.load())//取當前值
//	{
//		this_thread::sleep_for(chrono::milliseconds(100));
//	}
//	//...
//}

void provider()
{
	while (true)
	{
		cout << "<return>" << endl;
		char arr[100];
		cin.getline(arr, 100);

		data = 42;
		readyFlag.store(true);
	}
}

void consumer()
{
	while (true)
	{
		while (!readyFlag.load()){
			cout.put(‘.‘).flush();
			this_thread::sleep_for(chrono::milliseconds(50));
		}
		cout << "\nvalue : " << data << endl;
		readyFlag = false;
	}
	
}

int main()
{
	auto p = async(launch::async, provider);
	auto c = async(launch::async, consumer);
	
	c.wait();
}

//與條件變量一起使用
//unique_lock<mutex> ul(readyMutex);
//readyCondVar.wait(ul, []{return !readyFlag.load(); });

  

C++11 線程並發