執行緒同步機制(互斥量,讀寫鎖,自旋鎖,條件變數,屏障)
阿新 • • 發佈:2019-02-07
先知:
(1)執行緒是由程序建立而來,是cpu排程的最小單位。
(2)每個程序都有自己獨立的地址空間,而程序中的多個執行緒共用程序的資源,他們只有自己獨立的棧資源。
執行緒同步:
當多個控制執行緒共享相同的記憶體時,需要確保每個程序看到一致的資料檢視。同一個資料如果被兩個及以上的執行緒進行同時訪問操作的時候,有可能就會造成資料不一致的現象。為了解決這個問題,執行緒不得不使用鎖。同一時間只允許一個執行緒訪問該變數。
比如增量操作:
(1)從記憶體單元讀入暫存器
(2)在暫存器中對變數做增量操作
(3)把新值寫回記憶體單元
如果兩個執行緒幾乎是同一時間對同一個變數做增量操作而不進行同步的話,結果就有可能出現數據不一致,變數有可能增加1了,也有可能比原來增加2了,具體是增加1了還是增加2了取決於第二個執行緒開始操作時候獲取的數值。
執行緒同步機制(1):互斥量
什麼是互斥量:
互斥量的本質就是一把鎖,在訪問共享資源前對互斥量進行設定(加鎖),在訪問完成後釋放(解鎖)互斥量。對互斥量加鎖以後,任何其他試圖再次對互斥量加鎖的執行緒都會被阻塞知道執行緒釋放該互斥量。
互斥量程式碼:
鎖開銷: 多執行緒的引入在訪問共享資源的時候就必須要加鎖,為了避免多次加鎖或者多次解鎖,都要多次判斷加鎖解鎖的條件。而影響伺服器的效能殺手之一就是鎖開銷。如果鎖的粒度太粗,就會出現很多執行緒阻塞等待相同的鎖,並不能改善併發性,如果鎖的粒度太細,那麼過多的鎖開銷會使系統性能受到影響,而且程式碼變得複雜。作為一個程式設計師,在需要滿足鎖的需求的情況下,在程式碼複雜性和效能之間找到確定的平衡。 執行緒的同步機制(2):讀寫鎖 什麼是讀寫鎖: 讀寫鎖和互斥量類似,不過讀寫鎖允許更高的並行性。互斥量有兩種狀態,要麼加鎖,要麼解鎖,而且兩種狀態一次只能有一個執行緒進行訪問。讀寫鎖有3中狀態,讀模式下加鎖,寫模式下加鎖,不加鎖裝填。一次只能有一個執行緒可以佔有寫模式下的讀寫鎖,但是有多個程序可以同時佔有寫模式下的讀寫鎖 適用場景:
讀寫鎖非常適用於對資料結構的讀次數遠大於寫次數的情況。
讀寫鎖程式碼:
執行緒同步機制(3):條件變數 什麼是條件變數: 條件變數給多個執行緒提供了一個會合的場所。條件變數和互斥鎖一起使用,允許執行緒以無競爭的方式等待特定條件的發生。條件本身是由互斥量保護的。執行緒在改變條件狀態之前必須首先鎖住互斥量。 結合使用條件變數和互斥量對執行緒之間進行同步:生產者消費者模型
執行緒同步機制(4):自旋鎖 什麼是自旋鎖: 自旋鎖和互斥量類似,但是它不是通過休眠使執行緒阻塞,而是在獲得鎖之前一直處於忙等(自旋)阻塞狀態。 適用場景: 鎖被持有的時間短,而且執行緒並不希望在重新排程上花費大多的成本。等執行緒自旋等待變為可用時,cpu不能做其他的事情,這就是自旋鎖只能被持有一小段時間的原因。 執行緒同步機制(5):屏障 屏障是什麼: 屏障是使用者協調多個執行緒並行工作的同步機制。屏障允許每個執行緒等待,知道所有合作的執行緒都到達某一個點時,然後從該點繼續執行。pthread_join函式就是一種屏障,允許一個執行緒等待,直到另一個執行緒退出。屏障物件的概念就更加廣泛了,他們允許任意數量的執行緒等待,直到所有的執行緒完成處理工作,而執行緒不用退出。所有的執行緒到達屏障以後可以接著工作。 屏障的應用之一: 比如說,現在有800萬個資料要進行排序。現在創建出9個執行緒,一個主執行緒和8個工作執行緒。每個工作執行緒分別對100萬個資料進行堆排序,主執行緒中設定屏障,等待八個執行緒完成資料的排序後,對八個執行緒排好序的八組資料在進行歸併排序。在8核的系統中,單執行緒程式對800萬個數進行排序需要12.14秒。同樣的系統,使用8個並行執行緒和一個合併執行緒處理相同的800萬個數排序僅僅只需要1.91秒,速度提高了6倍。
#include<stdio.h> #include<pthread.h> int count = 0; pthread_mutex_t mylock = PTHREAD_MUTEX_INITIALIZER; void* thread_run1(void* arg) { int i = 0; int val = 0; for(;i < 5000;i++) { //進臨界區時加鎖 pthread_mutex_lock(&mylock); //臨界區 val = count; printf("tid : %lu,count : %d\n",pthread_self(),count); count = val + 1; //出臨界區時解鎖 pthread_mutex_unlock(&mylock); } } void* thread_run2(void* arg) { int i = 0; int val = 0; for(;i < 5000;i++) { //進臨界區時加鎖 pthread_mutex_lock(&mylock); //臨界區 val = count; printf("tid : %lu,count : %d\n",pthread_self(),count); count = val + 1; //出臨界區時解鎖 pthread_mutex_unlock(&mylock); } } int main() { printf("Lock...\n"); //執行緒ID pthread_t tid1; pthread_t tid2; //建立執行緒 pthread_create(&tid1, NULL, thread_run1, NULL); pthread_create(&tid2, NULL, thread_run2, NULL); //等待新執行緒 pthread_join(tid1, NULL); pthread_join(tid2, NULL); printf("count = %d\n",count); //銷燬鎖 pthread_mutex_destroy(&mylock); return 0; }
鎖開銷: 多執行緒的引入在訪問共享資源的時候就必須要加鎖,為了避免多次加鎖或者多次解鎖,都要多次判斷加鎖解鎖的條件。而影響伺服器的效能殺手之一就是鎖開銷。如果鎖的粒度太粗,就會出現很多執行緒阻塞等待相同的鎖,並不能改善併發性,如果鎖的粒度太細,那麼過多的鎖開銷會使系統性能受到影響,而且程式碼變得複雜。作為一個程式設計師,在需要滿足鎖的需求的情況下,在程式碼複雜性和效能之間找到確定的平衡。 執行緒的同步機制(2):讀寫鎖 什麼是讀寫鎖: 讀寫鎖和互斥量類似,不過讀寫鎖允許更高的並行性。互斥量有兩種狀態,要麼加鎖,要麼解鎖,而且兩種狀態一次只能有一個執行緒進行訪問。讀寫鎖有3中狀態,讀模式下加鎖,寫模式下加鎖,不加鎖裝填。一次只能有一個執行緒可以佔有寫模式下的讀寫鎖,但是有多個程序可以同時佔有寫模式下的讀寫鎖 適用場景:
#include<stdio.h>
#include<pthread.h>
int val = 0;
pthread_rwlock_t myrw;
void* run_reader(void* arg)
{
while(1)
{
//sleep(1);
pthread_rwlock_rdlock(&myrw);//只讀模式去加鎖
printf("val = %d\n",val);
pthread_rwlock_unlock(&myrw);
}
}
void* run_writer(void* arg)
{
while(1)
{
sleep(1);
pthread_rwlock_wrlock(&myrw);//只寫模式去加鎖
val++;
pthread_rwlock_unlock(&myrw);
}
}
int main()
{
printf("rwlock...\n");
pthread_rwlock_init(&myrw, NULL);
pthread_t reader;
pthread_t writer;
pthread_create(&reader, NULL, run_reader, NULL);
pthread_create(&writer, NULL, run_writer, NULL);
pthread_join(reader, NULL);
pthread_join(writer, NULL);
pthread_rwlock_destroy(&myrw);
return 0;
}
執行緒同步機制(3):條件變數 什麼是條件變數: 條件變數給多個執行緒提供了一個會合的場所。條件變數和互斥鎖一起使用,允許執行緒以無競爭的方式等待特定條件的發生。條件本身是由互斥量保護的。執行緒在改變條件狀態之前必須首先鎖住互斥量。 結合使用條件變數和互斥量對執行緒之間進行同步:生產者消費者模型
#include<stdio.h>
#include<stdlib.h>
#include<assert.h>
#include<pthread.h>
typedef struct _node
{
int data;
struct n_ode *next;
}node_t, *node_p, **node_pp;
pthread_mutex_t mylock = PTHREAD_MUTEX_INITIALIZER;//初始化鎖
pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;//初始化條件變數
node_p AllocNode(int data)
{
node_p node = (node_p)malloc(sizeof(node_t));
if(NULL == node)
{
perror("malloc");
return NULL;
}
node->data = data;
node->next = NULL;
return node;
}
void InitList(node_pp _h)
{
*_h = AllocNode(0);
}
void PushFrond(node_p list,int data)
{
assert(list);
node_p new_node = AllocNode(data);
new_node->next = list->next;
list->next = new_node;
}
int IsEmpty(node_p list)
{
if( NULL == list->next)
{
return 1;
}
else
{
return 0;
}
}
void DelNode(node_p node)
{
assert(node);
free(node);
}
void PopFrond(node_p list,int* data)
{
assert(list);
assert(data);
if( IsEmpty(list) )
{
printf("list is empty...\n");
return ;
}
node_p delnode = list->next;
list->next = delnode->next;
*data = delnode->data;
DelNode(delnode);
}
void DestroyList(node_p list)
{
assert(list);
int data = 0;
while( !IsEmpty(list))
{
PopFrond(list,&data);
}
DelNode(list);
}
void ShowList(node_p list)
{
node_p node = list->next;
while(node)
{
printf("%d ",node->data);
node = node->next;
}
printf("\n");
}
void* thread_product(void* arg)
{
node_p head = (node_p)arg;
while(1)
{
usleep(123456);
pthread_mutex_lock(&mylock);
int data = rand()%10000;
PushFrond(head,data);
printf("product done,data is : %d\n",data);
pthread_mutex_unlock(&mylock);
//生產出一個產品以後,喚醒一個消費者,通知消費者來取產品
pthread_cond_signal(&mycond);//signal(喚醒一個),broadcast(喚醒多個)
}
}
void* thread_consumer(void* arg)
{
node_p node = (node_p)arg;
int data = 0;
while(1)
{
pthread_mutex_lock(&mylock);
if(IsEmpty(node))
{
//如果連結串列為空的話,那麼等待生產者至少生產一個產品的時候,才去取產品
pthread_cond_wait(&mycond, &mylock);
}
PopFrond(node,&data);
printf("consumer done,data is : %d\n",data);
pthread_mutex_unlock(&mylock);
}
}
int main()
{
//用連結串列充當中間場所
node_p head = NULL;
InitList(&head);
//執行緒ID
pthread_t tid1;
pthread_t tid2;
//建立執行緒
int ret1 = pthread_create(&tid1, NULL, thread_product, (void*)head);//建立生產者執行緒
int ret2 = pthread_create(&tid2, NULL, thread_consumer, (void*)head);//建立消費者執行緒
if(ret1 < 0 || ret2 < 0)
{
perror("pthread_create");
return -1;
}
//等待執行緒
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
DestroyList(head);
pthread_mutex_destroy(&mylock);
pthread_cond_destroy(&mycond);
//node_p head = NULL;
//InitList(&head);
//int i = 0;
//for( ; i < 10; ++i)
//{
// PushFrond(head, i);
// ShowList(head);
// sleep(1);
//}
//int data = 0;
//for( ; i >= 5; --i)
//{
// PopFrond(head, &data);
// ShowList(head);
// sleep(1);
//}
//DestroyList(head);
//ShowList(head);
return 0;
}
執行緒同步機制(4):自旋鎖 什麼是自旋鎖: 自旋鎖和互斥量類似,但是它不是通過休眠使執行緒阻塞,而是在獲得鎖之前一直處於忙等(自旋)阻塞狀態。 適用場景: 鎖被持有的時間短,而且執行緒並不希望在重新排程上花費大多的成本。等執行緒自旋等待變為可用時,cpu不能做其他的事情,這就是自旋鎖只能被持有一小段時間的原因。 執行緒同步機制(5):屏障 屏障是什麼: 屏障是使用者協調多個執行緒並行工作的同步機制。屏障允許每個執行緒等待,知道所有合作的執行緒都到達某一個點時,然後從該點繼續執行。pthread_join函式就是一種屏障,允許一個執行緒等待,直到另一個執行緒退出。屏障物件的概念就更加廣泛了,他們允許任意數量的執行緒等待,直到所有的執行緒完成處理工作,而執行緒不用退出。所有的執行緒到達屏障以後可以接著工作。 屏障的應用之一: 比如說,現在有800萬個資料要進行排序。現在創建出9個執行緒,一個主執行緒和8個工作執行緒。每個工作執行緒分別對100萬個資料進行堆排序,主執行緒中設定屏障,等待八個執行緒完成資料的排序後,對八個執行緒排好序的八組資料在進行歸併排序。在8核的系統中,單執行緒程式對800萬個數進行排序需要12.14秒。同樣的系統,使用8個並行執行緒和一個合併執行緒處理相同的800萬個數排序僅僅只需要1.91秒,速度提高了6倍。