C++ 併發程式設計之互斥鎖和條件變數的效能比較
介紹
本文以最簡單生產者消費者模型,通過執行程式,觀察該程序的cpu使用率,來對比使用互斥鎖 和 互斥鎖+條件變數的效能比較。
本例子的生產者消費者模型,1個生產者,5個消費者。
生產者執行緒往佇列裡放入資料,5個消費者執行緒從佇列取資料,取資料前需要判斷一下佇列中是否有資料,這個佇列是全域性佇列,是執行緒間共享的資料,所以需要使用互斥鎖進行保護。即生產者在往佇列裡放入資料時,其餘消費者不能取,反之亦然。
互斥鎖實現的程式碼
#include <iostream> // std::cout #include <deque> // std::deque #include <thread> // std::thread #include <chrono> // std::chrono #include <mutex> // std::mutex // 全域性佇列 std::deque<int> g_deque; // 全域性鎖 std::mutex g_mutex; // 生產者執行標記 bool producer_is_running = true; // 生產者執行緒函式 void Producer() { // 庫存個數 int count = 8; do { // 智慧鎖,初始化後即加鎖,保護的範圍是程式碼花括號內,花括號退出即會自動解鎖 // 可以手動解鎖,從而控制互斥鎖的細粒度 std::unique_lock<std::mutex> locker( g_mutex ); // 入隊一個數據 g_deque.push_front( count ); // 提前解鎖,縮小互斥鎖的細粒度,只針對共享的佇列資料進行同步保護 locker.unlock(); std::cout << "生產者 :我現在庫存有 :" << count << std::endl; // 放慢生產者生產速度,睡1秒 std::this_thread::sleep_for( std::chrono::seconds( 1 ) ); // 庫存自減少 count--; } while( count > 0 ); // 標記生產者打樣了 producer_is_running = false; std::cout << "生產者 : 我的庫存沒有了,我要打樣了!" << std::endl; } // 消費者執行緒函式 void Consumer(int id) { int data = 0; do { std::unique_lock<std::mutex> locker( g_mutex ); if( !g_deque.empty() ) { data = g_deque.back(); g_deque.pop_back(); locker.unlock(); std::cout << "消費者[" << id << "] : 我搶到貨的編號是 :" << data << std::endl; } else { locker.unlock(); } } while( producer_is_running ); std::cout << "消費者[" << id << "] :賣家沒有貨打樣了,真可惜,下次再來搶!" << std::endl; } int main(void) { std::cout << "1 producer start ..." << std::endl; std::thread producer( Producer ); std::cout << "5 consumer start ..." << std::endl; std::thread consumer[ 5 ]; for(int i = 0; i < 5; i++) { consumer[i] = std::thread(Consumer, i + 1); } producer.join(); for(int i = 0; i < 5; i++) { consumer[i].join(); } std::cout << "All threads joined." << std::endl; return 0; }
互斥鎖實現執行結果:
結果輸出
[root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o main [root@lincoding condition]# ./main 1 producer start ... 5 consumer start ... 生產者 :我現在庫存有 :8 消費者[1] : 我搶到貨的編號是 :8 消費者[1] : 我搶到貨的編號是 :7 生產者 :我現在庫存有 :7 生產者 :我現在庫存有 :6 消費者[3] : 我搶到貨的編號是 :6 生產者 :我現在庫存有 :5 消費者[1] : 我搶到貨的編號是 :5 生產者 :我現在庫存有 :4 消費者[2] : 我搶到貨的編號是 :4 生產者 :我現在庫存有 :3 消費者[5] : 我搶到貨的編號是 :3 生產者 :我現在庫存有 :2 消費者[2] : 我搶到貨的編號是 :2 生產者 :我現在庫存有 :1 消費者[1] : 我搶到貨的編號是 :1 生產者 : 我的庫存沒有了,我要打樣了!消費者[ 5] :賣家沒有貨打樣了,真可惜,下次再來搶! 消費者[2] :賣家沒有貨打樣了,真可惜,下次再來搶! 消費者[3] :賣家沒有貨打樣了,真可惜,下次再來搶! 消費者[4] :賣家沒有貨打樣了,真可惜,下次再來搶! 消費者[1] :賣家沒有貨打樣了,真可惜,下次再來搶! All threads joined.
可以看到,互斥鎖其實可以完成這個任務,但是卻存在著效能問題。
Producer
是生產者執行緒,在生產者資料過程中,會休息1秒
,所以這個生產過程是很慢的;Consumer
是消費者執行緒,存在著一個while
迴圈,只有判斷到生產者不運行了,才會退出while
迴圈,那麼每次在迴圈體內,都是會先加鎖,判斷佇列不空,然後從列隊取出一個數據,最後解鎖。所以說,在生產者休息1秒
的時候,消費者執行緒實際上會做很多無用功,導致CPU使用率非常高!
執行的環境是4核cpu
[root@lincoding ~]# grep 'model name' /proc/cpuinfo | wc -l
4
top命令檢視cpu使用情況,可見使用純互斥鎖cpu的開銷是很大的,main
357.5%CPU
,系統開銷的cpu為54.5%sy
,使用者開銷的cpu為18.2%us
[root@lincoding ~]# top
top - 19:13:41 up 36 min, 3 users, load average: 0.06, 0.05, 0.01
Tasks: 179 total, 1 running, 178 sleeping, 0 stopped, 0 zombie
Cpu(s): 18.2%us, 54.5%sy, 0.0%ni, 27.3%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 1004412k total, 313492k used, 690920k free, 41424k buffers
Swap: 2031608k total, 0k used, 2031608k free, 79968k cached
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
35346 root 20 0 137m 3288 1024 S 357.5 0.3 0:05.92 main
1 root 20 0 19232 1492 1224 S 0.0 0.1 0:02.16 init
2 root 20 0 0 0 0 S 0.0 0.0 0:00.01 kthreadd
3 root RT 0 0 0 0 S 0.0 0.0 0:00.68 migration/0
解決的辦法之一就是給消費者也加一個小延時,當消費者沒取到資料時,就休息一下500毫秒
,這樣可以減少互斥鎖給cpu帶來的開銷。
// 消費者執行緒函式
void Consumer(int id)
{
int data = 0;
do
{
std::unique_lock<std::mutex> locker( g_mutex );
if( !g_deque.empty() )
{
data = g_deque.back();
g_deque.pop_back();
locker.unlock();
std::cout << "消費者[" << id << "] : 我搶到貨的編號是 :" << data << std::endl;
}
else
{
locker.unlock();
// 當消費者沒取到資料時,就休息一下500毫秒
std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
}
} while( producer_is_running );
std::cout << "消費者[" << id << "] :賣家沒有貨打樣了,真可惜,下次再來搶!" << std::endl;
}
從執行結果可知,cpu使用率大大降低了
[root@lincoding ~]# ps aux | grep -v grep |grep main
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 61296 0.0 0.1 141068 1244 pts/1 Sl+ 19:40 0:00 ./main
條件變數+互斥鎖實現的程式碼
那麼問題來了,如何確定消費者延時(休息)多久呢?
- 如果生產者生產的非常快,消費者卻延時了
500毫秒
,也不是很好 - 如果生產者生產的更慢,那麼消費延時
500毫秒
,也會有無用功,佔用了CPU
這就需要引入條件變數std::condition_variable
,應用於消費者生產模型中,就是生產者生產完一個數據後,通過notify_one()
喚醒正在wait()
消費者執行緒,使得消費者從佇列取出一個數據。
#include <iostream> // std::cout
#include <deque> // std::deque
#include <thread> // std::thread
#include <chrono> // std::chrono
#include <mutex> // std::mutex
#include <condition_variable> // std::condition_variable
// 全域性佇列
std::deque<int> g_deque;
// 全域性鎖
std::mutex g_mutex;
// 全域性條件變數
std::condition_variable g_cond;
// 生產者執行標記
bool producer_is_running = true;
// 生產者執行緒函式
void Producer()
{
// 庫存個數
int count = 8;
do
{
// 智慧鎖,初始化後即加鎖,保護的範圍是程式碼花括號內,花括號退出即會自動解鎖
// 可以手動解鎖,從而控制互斥鎖的細粒度
std::unique_lock<std::mutex> locker( g_mutex );
// 入隊一個數據
g_deque.push_front( count );
// 提前解鎖,縮小互斥鎖的細粒度,只針對共享的佇列資料進行同步保護
locker.unlock();
std::cout << "生產者 :我現在庫存有 :" << count << std::endl;
// 喚醒一個執行緒
g_cond.notify_one();
// 睡1秒
std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
// 庫存自減少
count--;
} while( count > 0 );
// 標記生產者打樣了
producer_is_running = false;
// 喚醒所有消費執行緒
g_cond.notify_all();
std::cout << "生產者 : 我的庫存沒有了,我要打樣了!" << std::endl;
}
// 消費者執行緒函式
void Consumer(int id)
{
// 購買的貨品編號
int data = 0;
do
{
// 智慧鎖,初始化後即加鎖,保護的範圍是程式碼花括號內,花括號退出即會自動解鎖
// 可以手動解鎖,從而控制互斥鎖的細粒度
std::unique_lock<std::mutex> locker( g_mutex );
// wait()函式會先呼叫互斥鎖的unlock()函式,然後再將自己睡眠,在被喚醒後,又會繼續持有鎖,保護後面的佇列操作
// 必須使用unique_lock,不能使用lock_guard,因為lock_guard沒有lock和unlock介面,而unique_lock則都提供了
g_cond.wait(locker);
// 佇列不為空
if( !g_deque.empty() )
{
// 取出佇列裡最後一個數據
data = g_deque.back();
// 刪除佇列裡最後一個數據
g_deque.pop_back();
// 提前解鎖,縮小互斥鎖的細粒度,只針對共享的佇列資料進行同步保護
locker.unlock();
std::cout << "消費者[" << id << "] : 我搶到貨的編號是 :" << data << std::endl;
}
// 佇列為空
else
{
locker.unlock();
}
} while( producer_is_running );
std::cout << "消費者[" << id << "] :賣家沒有貨打樣了,真可惜,下次再來搶!" << std::endl;
}
int main(void)
{
std::cout << "1 producer start ..." << std::endl;
std::thread producer( Producer );
std::cout << "5 consumer start ..." << std::endl;
std::thread consumer[ 5 ];
for(int i = 0; i < 5; i++)
{
consumer[i] = std::thread(Consumer, i + 1);
}
producer.join();
for(int i = 0; i < 5; i++)
{
consumer[i].join();
}
std::cout << "All threads joined." << std::endl;
return 0;
}
條件變數+互斥鎖執行結果
[root@lincoding condition]# g++ -std=c++0x -pthread -D_GLIBCXX_USE_NANOSLEEP main.cpp -o main
[root@lincoding condition]#
[root@lincoding condition]# ./main
1 producer start ...
5 consumer start ...
生產者 :我現在庫存有 :8
消費者[4] : 我搶到貨的編號是 :8
生產者 :我現在庫存有 :7
消費者[2] : 我搶到貨的編號是 :7
生產者 :我現在庫存有 :6
消費者[3] : 我搶到貨的編號是 :6
生產者 :我現在庫存有 :5
消費者[5] : 我搶到貨的編號是 :5
生產者 :我現在庫存有 :4
消費者[1] : 我搶到貨的編號是 :4
生產者 :我現在庫存有 :3
消費者[4] : 我搶到貨的編號是 :3
生產者 :我現在庫存有 :2
消費者[2] : 我搶到貨的編號是 :2
生產者 :我現在庫存有 :1
消費者[3] : 我搶到貨的編號是 :1
生產者 : 我的庫存沒有了,我要打樣了!
消費者[5] :賣家沒有貨打樣了,真可惜,下次再來搶!
消費者[1] :賣家沒有貨打樣了,真可惜,下次再來搶!
消費者[4] :賣家沒有貨打樣了,真可惜,下次再來搶!
消費者[2] :賣家沒有貨打樣了,真可惜,下次再來搶!
消費者[3] :賣家沒有貨打樣了,真可惜,下次再來搶!
All threads joined.
CPU開銷非常的小
[root@lincoding ~]# ps aux | grep -v grep |grep main
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 73838 0.0 0.1 141068 1256 pts/1 Sl+ 19:54 0:00 ./main
總結
在不確定生產者的生產速度是快還是慢的場景裡,不能只使用互斥鎖保護共享的資料,這樣會對CPU的效能開銷非常大,可以使用互斥鎖+條件變數的方式,當生產者執行緒生產了一個數據,就喚醒消費者執行緒進行消費,避免一些無用功的效能開銷