1. 程式人生 > >筆記:程序間通訊——同步(互斥鎖、讀寫鎖、條件變數、訊號量)以及Linux中的RCU

筆記:程序間通訊——同步(互斥鎖、讀寫鎖、條件變數、訊號量)以及Linux中的RCU

1.互斥鎖

多個執行緒的IPC,需要同步,同步有隱式的和顯示的:

比如unix提供的管道和FIFO,由核心負責同步,比如read發生在write之前,那麼read就會被核心阻塞,這中同步是由核心負責的,使用者不會感知。

但如果使用共享區作為生產者和消費者之間的IPC,那麼程式設計師就需要負責同步,這種稱為顯示同步。

2.讀寫鎖

互斥鎖把試圖進入臨界區的所有其他執行緒都阻塞住。該臨界區通常涉及對由這些執行緒共享的一個或多個數據的訪問或更新。

但我們有時候可以在read某個資料和write某個資料之間做區分,即讀寫鎖(read-write lock),規則如下:

(1)只要沒有執行緒持有讀寫鎖進行寫,那麼任意數目的執行緒可以持有該讀寫鎖進行度

(2)僅當沒有執行緒持有讀寫鎖進行讀或者寫時,才能分配該讀寫鎖用於寫

對於那些讀操作比寫操作頻繁的應用,讀寫鎖相比互斥鎖具有更好的效能。因為它允許多個讀者讀資料,具有更高的併發性,同時又能保證寫者不受干擾。

讀寫鎖

pthread_rwlock_t

#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwptr);//獲取一個讀出鎖,rwptr已經由寫著持有,那麼阻塞
int pthread_rwlock_wrlock(pthread_rwlock_t *rwptr);//獲取一個寫入鎖,如果已經由讀者或寫著持有,那麼阻塞


3.條件變數

互斥鎖,當消費者等待生產者的資料時,需要不斷的測試(即spinning),這種反覆的檢測和輪詢會浪費CPU的時間,為了避免這種spining,自旋轉lock,改進,使用條件變數。

互斥鎖用於上鎖,條件變數則用於等待。這兩種不同的同步都是需要的。

posix的條件變數為pthread_cond_t

#include <pthread.h>

int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);

int pthread_cond_signal(pthread_cond_t);這不是Unix中的SIG

條件變數是用來等待而不是用來上鎖的。條件變數用來阻塞一個執行緒,直到某特殊情況發生為止。通常條件變數和互斥鎖同時使用。條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個執行緒自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個執行緒改變了條件,它發訊號給關聯的條件變數,喚醒一個或多個等待它的執行緒,重新獲得互斥鎖,重新評價條件。如果兩程序共享可讀寫的記憶體,條件變數可以被用來實現這兩程序間的執行緒同步

#include <pthread.h>
#include <unistd.h>

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

struct node {
int n_number;
struct node *n_next;
} *head = NULL;

/*[thread_func]*/
static void cleanup_handler(void *arg)
{
    printf("Cleanup handler of second thread./n");
    free(arg);
    (void)pthread_mutex_unlock(&mtx);
}
static void *thread_func(void *arg)
{
    struct node *p = NULL;

    pthread_cleanup_push(cleanup_handler, p);
    while (1) {
    pthread_mutex_lock(&mtx);           //這個mutex主要是用來保證pthread_cond_wait的併發性
    while (head == NULL)   {  //這個while要特別說明一下,單個pthread_cond_wait功能很完善,為何這裡要有一個while (head == NULL)呢?因為pthread_cond_wait裡的執行緒可能會被意外喚醒,如果這個時候head != NULL,則不是我們想要的情況。這個時候,應該讓執行緒繼續進入pthread_cond_wait
        pthread_cond_wait(&cond, &mtx);         // pthread_cond_wait會先解除之前的pthread_mutex_lock鎖定的mtx,然後阻塞在等待對列裡休眠,直到再次被喚醒(大多數情況下是等待的條件成立而被喚醒,喚醒後,該程序會先鎖定先pthread_mutex_lock(&mtx);,再讀取資源
                 //用這個流程是比較清楚的/*block-->unlock-->wait() return-->lock*/
    }
        p = head;
        head = head->n_next;
        printf("Got %d from front of queue/n", p->n_number);
        free(p);
        pthread_mutex_unlock(&mtx);             //臨界區資料操作完畢,釋放互斥鎖
    }
    pthread_cleanup_pop(0);
    return 0;
}

int main(void)
{
    pthread_t tid;
    int i;
    struct node *p;
    pthread_create(&tid, NULL, thread_func, NULL);   //子執行緒會一直等待資源,類似生產者和消費者,但是這裡的消費者可以是多個消費者,而不僅僅支援普通的單個消費者,這個模型雖然簡單,但是很強大
    /*[tx6-main]*/
    for (i = 0; i < 10; i++) {
        p = malloc(sizeof(struct node));
        p->n_number = i;
        pthread_mutex_lock(&mtx);             //需要操作head這個臨界資源,先加鎖,
        p->n_next = head;
        head = p;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mtx);           //解鎖
        sleep(1);
    }
    printf("thread 1 wanna end the line.So cancel thread 2./n");
    pthread_cancel(tid);             //關於pthread_cancel,有一點額外的說明,它是從外部終止子執行緒,子執行緒會在最近的取消點,退出執行緒,而在我們的程式碼裡,最近的取消點肯定就是pthread_cond_wait()了。關於取消點的資訊,有興趣可以google,這裡不多說了
    pthread_join(tid, NULL);
    printf("All done -- exiting/n");
    return 0;
}


4.posix訊號量

一個訊號上的三種操作

1.建立一個訊號量,並賦予初始值

2.等待(wait)一個訊號量,即P操作,或者上鎖(lock),等待(wait)

3.掛出(post)一個訊號量,即V操作,或者解鎖(unlock),傳送訊號(singal)

計算訊號量(counting semaphore),計算訊號量通常初始化為某個值N,指示可用資源數,

二值訊號量可用於互斥目的,就像互斥鎖一樣

 初始化訊號量為1; 
sem_wait(&sem);
臨界區;
sem_post(&sem);

初始化互斥鎖;
pthread_mutex_lock(&mutex); 
臨界區;
pthread_mutex_unlock(&mutex);

sem_wait呼叫中等待訊號量值變為1,即有了可用資源,然後再減1,

sem_post呼叫將訊號量的值加1,然後喚醒阻塞在sem_wait呼叫中等待該訊號量的任何執行緒

兩者的區別:互斥鎖必須總是由鎖住它的執行緒解鎖,訊號量的掛出(sem_post)卻不必由執行過它的等待操作的同一執行緒執行

訊號量,互斥鎖,條件變數之間的區別

1.互斥鎖必須總是由鎖住它的執行緒解鎖,訊號量的掛出卻不必由執行過它的等待操作的同一執行緒執行。

2.互斥鎖要麼被鎖住,要麼被解開(二值狀態,二值訊號量)

3.訊號量有一個與之關聯的狀態(它的計數值)

Posix提供的訊號量分兩種:有名(named)訊號量和基於記憶體的(memory-based)的訊號量(也稱為無名unnamed)訊號量。

1.有名訊號量

#include <semaphore.h>
sem_t *sem_open(const char *name, int oflag, ...)
成功則返回指向訊號量的指標,若出現錯誤則為SEM_FAILED

sem_open返回值指向sem_t資料型別的指標。
#include <semaphore.h>
int sem_close(sem_t *sem);
關閉訊號量。
但關閉一個訊號量並沒有從系統中刪除:即使當前沒有程序開啟著某個訊號量,它的值仍然保持

#include <semaphore.h>
int sem_unlink(const char *name);   
有名訊號量使用sem_unlink從系統中刪除

每個訊號量都有一個引用計數器,記錄當前開啟次數:其訊號量的析構卻要等到最後一個sem_close發生為止

sem_wait函式測試所制定訊號量的值,如果該值大於0,那就將它減1並立即返回。如果該值等於0,呼叫執行緒就被投入睡眠中,直到該值為大於0,

#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);

sem_wait和sem_trywait的差別是:當所指定訊號量的值已經是0時,後者並不將呼叫執行緒投入睡眠,而是返回一個EAGAIN錯誤。
#include <semaphore.h>
int sem_post(sem_t *sem);
int sem_getvalue(sem_t *sem, int *valp);

sem_post和sem_getvalue

當一個執行緒使用玩某個訊號量時,它應該呼叫sem_post,本函式把所指定訊號的值加1,然後喚醒正在等待該訊號量值變為正數的任意執行緒

sem_getvalue在由valp指向的整數中返回指定訊號量的當前值,可以為負數,其絕對值就是等待訊號量解鎖的執行緒數。

2.無名訊號量,基於記憶體的訊號量

它們由應用程式分配訊號量的記憶體空間(即分配一個sem_t資料型別的記憶體空間),然後由系統初始化它們。

#include <semaphore.h>
int sem_init(sem_t *sem, int shared, unsigned int value);
int sem_destory(sem_t *sem);
sem_init中sem引數必須指向應用程式分配的sem_t變數,如果shared為0,那麼待初始化的訊號量是在同一個程序的各個執行緒間共享的,否則該訊號量是在程序間共享的。如果是共享的,那麼sem必須放在各個程序的共享區。

5.Linux核心中的RCU機制

RCU(Read-Copy Update)是資料同步的一種方式,在當前的Linux核心中發揮著重要的作用。RCU主要針對的資料物件是連結串列,目的是提高遍歷讀取資料的效率,為了達到目的使用RCU機制讀取資料的時候不對連結串列進行耗時的加鎖操作。這樣在同一時間可以有多個執行緒同時讀取該連結串列,並且允許一個執行緒對連結串列進行修改(修改的時候,需要加鎖)。(是不是有點類似與讀寫鎖呢

RCU適用於需要頻繁的讀取資料,而相應修改資料並不多的情景,例如在檔案系統中,經常需要查詢定位目錄,而對目錄的修改相對來說並不多,這就是RCU發揮作用的最佳場景。

通過允許在更新的同時讀資料,RCU 提高了同步機制的可伸縮性(scalability)。相對於傳統的在併發執行緒間不區分是讀者還是寫者的簡單互斥性鎖機制,或者是哪些允許併發讀但同時不 允許寫的讀寫鎖,RCU 支援同時一個更新執行緒和多個讀執行緒的併發。RCU 通過儲存物件的多個副本來保障讀操作的連續性,並保證在預定的讀方臨界區沒有完成之前不會釋放這個物件。RCU定義並使用高效、可伸縮的機制來發布並讀取 物件的新版本,並延長舊版本們的壽命。這些機制將工作分發到了讀和更新路徑上,以保證讀路徑可以極快地執行。在某些場合(非搶佔核心),RCU 的讀方沒有任何效能負擔。

RCU的實現?

 在RCU的實現過程中,我們主要解決以下問題:

       1,在讀取過程中,另外一個執行緒刪除了一個節點。刪除執行緒可以把這個節點從連結串列中移除,但它不能直接銷燬這個節點,必須等到所有的讀取執行緒讀取完成以後,才進行銷燬操作。RCU中把這個過程稱為寬限期(Grace period)。

       2,在讀取過程中,另外一個執行緒插入了一個新節點,而讀執行緒讀到了這個節點,那麼需要保證讀到的這個節點是完整的。這裡涉及到了釋出-訂閱機制(Publish-Subscribe Mechanism)。

       3, 保證讀取連結串列的完整性。新增或者刪除一個節點,不至於導致遍歷一個連結串列從中間斷開。但是RCU並不保證一定能讀到新增的節點或者不讀到要被刪除的節點。

 RCU機制是Linux2.6之後提供的一種資料一致性訪問的機制,從RCU(read-copy-update)的名稱上看,我們就能對他的實現機制有一個大概的瞭解,在修改資料的時候,首先需要讀取資料,然後生成一個副本,對副本進行修改,修改完成之後再將老資料update成新的資料,此所謂RCU。

       在作業系統中,資料一致性訪問是一個非常重要的部分,通常我們可以採用鎖機制實現資料的一致性訪問。例如,semaphore、spinlock機制,在訪問共享資料時,首先訪問鎖資源,在獲取鎖資源的前提下才能實現資料的訪問。這種原理很簡單,根本的思想就是在訪問臨界資源時,首先訪問一個全域性的變數(鎖),通過全域性變數的狀態來控制執行緒對臨界資源的訪問。但是,這種思想是需要硬體支援的,硬體需要配合實現全域性變數(鎖)的讀-修改-寫,現代CPU都會提供這樣的原子化指令。採用鎖機制實現資料訪問的一致性存在如下兩個問題:

1、  效率問題。鎖機制的實現需要對記憶體的原子化訪問,這種訪問操作會破壞流水線操作,降低了流水線效率。這是影響效能的一個因素。另外,在採用讀寫鎖機制的情況下,寫鎖是排他鎖,無法實現寫鎖與讀鎖的併發操作,在某些應用下回降低效能。

2、  擴充套件性問題(scalability)。當系統中CPU數量增多的時候,採用鎖機制實現資料的同步訪問效率偏低。並且隨著CPU數量的增多,效率降低,由此可見鎖機制實現的資料一致性訪問擴充套件性差。

為了解決上述問題,Linux中引進了RCU機制。該機制在多CPU的平臺上比較適用,對於讀多寫少的應用尤其適用。RCU的思路實際上很簡單,下面對其進行描述:

1.對於讀操作,可以直接對共享資源進行訪問但是前提是需要CPU支援訪存操作的原子化,現代CPU對這一點都做了保證。但是RCU的讀操作上下文是不可搶佔的(這一點在下面解釋),所以讀訪問共享資源時可以採用read_rcu_lock(),該函式的工作是停止搶佔。

2.對於寫操作,其需要將原來的老資料作一次備份(copy),然後對備份資料進行修改,修改完畢之後再用新資料更新老資料,更新老資料時採用了rcu_assign_pointer()巨集,在該函式中首先屏障一下memory,然後修改老資料。這個操作完成之後,需要進行老資料資源的回收。操作執行緒向系統註冊回收方法,等待回收。採用資料備份的方法可以實現讀者與寫者之間的併發操作,但是不能解決多個寫者之間的同步,所以當存在多個寫者時,需要通過鎖機制對其進行互斥,也就是在同一時刻只能存在一個寫者。

3.RCU機制中存在一個垃圾回收的daemon,當共享資源被update之後,可以採用該daemon實現老資料資源的回收。回收時間點就是在update之前的所有的讀者全部退出。由此可見寫者在update之後是需要睡眠等待的,需要等待讀者完成操作,如果在這個時刻讀者被搶佔或者睡眠,那麼很可能會導致系統死鎖。因為此時寫者在等待讀者,讀者被搶佔或者睡眠,如果正在執行的執行緒需要訪問讀者和寫者已經佔用的資源,那麼死鎖的條件就很有可能形成了。

從上述分析來看,RCU思想是比較簡單的,其核心內容緊緊圍繞“寫時拷貝”,採用RCU機制,能夠保證在讀寫操作共享資源時,基本不需要取鎖操作,能夠在一定程度上提升效能。但是該機制的應用是有條件的,對於讀多寫少的應用,機制的開銷比較小,效能會大幅度提升,但是如果寫操作較多時,開銷將會增大,效能不一定會有所提升。總體來說,RCU機制是對rw_lock的一種優化。