1. 程式人生 > >LinuxC高階程式設計——執行緒間同步

LinuxC高階程式設計——執行緒間同步

 LinuxC高階程式設計——執行緒間同步

宗旨:技術的學習是有限的,分享的精神是無限的。

1、 互斥鎖mutex

        多個執行緒同時訪問共享資料時可能會衝突。對於多執行緒的程式,訪問衝突的問題是很普遍的,解決的辦法是引入互斥鎖(Mutex, Mutual Exclusive Lock),獲得鎖的執行緒可以完成“讀-修改-寫”的操作,然後釋放鎖給其它執行緒,沒有獲得 鎖的執行緒只能等待而不能訪問共享資料,這樣“讀-修改-寫”三步操作組成一個原子操作,要麼都執行,要麼都不執行,不會執行到中間被打斷,也不會在其它處理器上並行做這個操作。Mutex用pthread_mutex_t型別的變量表示,可以這樣初始化和銷燬:

#include<pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_init(pthread_mutex_t *restrict mutex,constpthread_mutexattr_t *restrict attr);
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

        返回值:成功返回0,失敗返回錯誤號。

pthread_mutex_init函式對Mutex做初始化,引數attr設定Mutex的屬性,如果attr為NULL則表示預設屬性,用pthread_mutex_init函 數初始化的Mutex可以用pthread_mutex_destroy銷燬。如果Mutex變數是靜態分配的(全域性變數 或static變數),也可以用巨集定義PTHREAD_MUTEX_INITIALIZER來初始化,相當於用pthread_mutex_init初始化並且attr引數為NULL。 Mutex的加鎖和解鎖操作可以用下列函式:

#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

        返回值:成功返回0,失敗返回錯誤號。

        一個執行緒可以呼叫pthread_mutex_lock獲得Mutex,如果這時另一個執行緒已經呼叫pthread_mutex_lock獲得了該Mutex,則當前執行緒需要掛起等待,直到另一個執行緒呼叫pthread_mutex_unlock釋放Mutex,當前執行緒被喚醒,才能獲得該Mutex並繼續執行。 如果一個執行緒既想獲得鎖,又不想掛起等待,可以呼叫pthread_mutex_trylock,如果Mutex已經被另一個執行緒獲得,這個函式會失敗返回EBUSY,而不會使執行緒掛起等待。 

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define NLOOP 5000
int counter; /* incremented by threads */

pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *doit(void *);

int main(int argc, char **argv)
{
  pthread_t tidA, tidB;
  pthread_create(&tidA, NULL, doit, NULL);
  pthread_create(&tidB, NULL, doit, NULL);
  /* wait for both threads to terminate */
  pthread_join(tidA, NULL);
  pthread_join(tidB, NULL);
  return 0;
}
void *doit(void *vptr)
{
  int i, val;

  for (i = 0; i < NLOOP; i++)
  {
    pthread_mutex_lock(&counter_mutex);
    val = counter;
    printf("%x: %d\n", (unsigned int)pthread_self(), val   + 1);
    counter = val + 1;
    pthread_mutex_unlock(&counter_mutex);
  }
  return NULL;
}

2、 條件變數

        執行緒A需要等某個條件成立才能繼續往下執行,現在這個條件不成立,執行緒A就阻塞等待,而執行緒B在執行過程中使這個條件成立了,就喚醒執行緒A繼續執行。 在pthread庫中通過條件變數(Condition Variable) 來阻塞等待一個條件,或者喚醒等待這個條件的執行緒。 Condition Variable用pthread_cond_t型別的變量表示,可以這樣初始化和銷燬:

#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t*restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

        返回值:成功返回0,失敗返回錯誤號。

        和Mutex的初始化和銷燬類似,pthread_cond_init函式初始化一個Condition Variable, attr引數 為NULL則表示預設屬性, pthread_cond_destroy函式銷燬一個Condition Variable。如果Condition Variable是靜態分配的,也可以用巨集定義PTHEAD_COND_INITIALIZER初始化,相當於 用pthread_cond_init函式初始化並且attr引數為NULL。 Condition Variable的操作可以用下列函式:

#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t *restrict cond,pthread_mutex_t*restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrictmutex);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

        返回值:成功返回0,失敗返回錯誤號。 可見,一個ConditionVariable總是和一個Mutex搭配使用的。一個執行緒可以調 用pthread_cond_wait在一個Condition Variable上阻塞等待,這個函式做以下三步操作:

1.  釋放Mutex   2. 阻塞等待   3. 當被喚醒時,重新獲得Mutex並返回.

        pthread_cond_timedwait函式還有一個額外的引數可以設定等待超時,如果到達了abstime所指定的 時刻仍然沒有別的執行緒來喚醒當前執行緒,就返回ETIMEDOUT。一個執行緒可以調 用pthread_cond_signal喚醒在某個Condition Variable上等待的另一個執行緒,也可以呼叫pthread_cond_broadcast喚醒在這個Condition Variable上等待的所有執行緒。 

#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>

struct msg
{
  struct msg *next;
  int num;
};

struct msg *head;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

void *consumer(void *p)
{
  struct msg *mp;
  for (;;)
  {
    pthread_mutex_lock(&lock);
    while (head == NULL)
    {
      pthread_cond_wait(&has_product, &lock);
    }
    mp = head;
    head = mp->next;
    pthread_mutex_unlock(&lock);
    printf("Consume %d\n", mp->num);
    free(mp);
    sleep(rand() % 5);
  }
}
void *producer(void *p)
{
  struct msg *mp;
  for (;;)
  {
    mp = malloc(sizeof(struct msg));
    mp->num = rand() % 1000 + 1;
    printf("Produce %d\n", mp->num);
    pthread_mutex_lock(&lock);
    mp->next = head;
    head = mp;
    pthread_mutex_unlock(&lock);
    pthread_cond_signal(&has_product);
    sleep(rand() % 5);
  }
}

int main(int argc, char *argv[])
{
  pthread_t pid, cid;
  srand(time(NULL));
  pthread_create(&pid, NULL, producer, NULL);
  pthread_create(&cid, NULL, consumer, NULL);
  pthread_join(pid, NULL);
  pthread_join(cid, NULL);

  return 0;
}

3、 訊號量

        Mutex變數是非0即1的,可看作一種資源的可用數量,初始化時Mutex是1,表示有一個可用資源,加鎖時獲得該資源,將Mutex減到0,表示不再有可用資源,解鎖時釋放該資源,將Mutex重新加到1,表示又有了一個可用資源。訊號量(Semaphore)和Mutex類似,表示可用資源的數量,和Mutex不同的是這個數量可以大於1.

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t * sem);
int sem_destroy(sem_t * sem);

        semaphore變數的型別為sem_t, sem_init()初始化一個semaphore變數, value引數表示可用資源 的數量, pshared引數為0表示訊號量用於同一程序的執行緒間同步,本節只介紹這種情況。在用 完semaphore變數之後應該呼叫sem_destroy()釋放與semaphore相關的資源。 呼叫sem_wait()可以獲得資源,使semaphore的值減1,如果呼叫sem_wait()時semaphore的值已 經是0,則掛起等待。如果不希望掛起等待,可以呼叫sem_trywait()。呼叫sem_post()可以釋放資 源,使semaphore的值加1,同時喚醒掛起等待的執行緒。

#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

#define NUM 5
int queue[NUM];
sem_t blank_number, product_number;

void *producer(void *arg)
{
  int p = 0;
  while (1)
  {
    sem_wait(&blank_number);
    queue[p] = rand() % 1000 + 1;
    printf("Produce %d\n", queue[p]);
    sem_post(&product_number);
    p = (p + 1) % NUM;
    sleep(rand() % 5);
  }
}

void *consumer(void *arg)
{
  int c = 0;
  while (1)
  {
    sem_wait(&product_number);
    printf("Consume %d\n", queue[c]);
    queue[c] = 0;
    sem_post(&blank_number);
    c = (c + 1) % NUM;
    sleep(rand() % 5);
  }
}

int main(int argc, char *argv[])
{
  pthread_t pid, cid;
  sem_init(&blank_number, 0, NUM);
  sem_init(&product_number, 0, 0);
  pthread_create(&pid, NULL, producer, NULL);
  pthread_create(&cid, NULL, consumer, NULL);
  pthread_join(pid, NULL);
  pthread_join(cid, NULL);
  sem_destroy(&blank_number);
  sem_destroy(&product_number);
  return 0;
}