c語言資料結構應用-陣列佇列(無鎖佇列)在多執行緒中的使用
一、背景
上篇文章《c語言資料結構實現-陣列佇列/環形佇列》講述了陣列佇列的原理與實現,本文編寫一個雙執行緒進行速度測試
二、相關知識
多執行緒程式設計介面:
1) 建立執行緒 pthread_create 函式
2) 設定執行緒屬性 pthread_attr_setdetachstateSYNOPSIS #include <pthread.h> int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); Compile and link with -pthread. DESCRIPTION: The pthread_create() function starts a new thread in the calling process. The new thread starts execution by invoking start_routine(); arg is passed as the sole argument of start_routine(). RETURN VALUE: On success, pthread_create() returns 0; on error, it returns an error number, and the contents of *thread are undefined.
3) 執行緒鎖 pthread_mutex_init、pthread_mutex_lock、pthread_mutex_unlock、pthread_mutex_destorySYNOPSIS #include <pthread.h> int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate); Compile and link with -pthread. DESCRIPTION The pthread_attr_setdetachstate() function sets the detach state attribute of the thread attributes object referred to by attr to the value specified in detachstate. The detach state attribute determines whether a thread created using the thread attributes object attr will be created in a joinable or a detached state. RETURN VALUE On success, these functions return 0; on error, they return a nonzero error number.
執行緒鎖是用於多執行緒直接的同步,用來保護臨界資源(數值佇列),同樣類似的功能有 Posix 的 sem訊號量。
4) 條件變數 pthread_cond_init、pthread_cond_signal、pthread_cond_wait、pthread_cond_destroy
條件變數是同步執行緒的一種機制,它允許執行緒掛起,讓出處理器等待其他執行緒向它傳送訊號,該執行緒收到該訊號後被喚醒繼續執行程式。
三、實現
資料結構 test_t 為使用者資料,instance_t 為程式上下文
typedef struct item_t
{
int value;
} test_t;
typedef struct instance
{
sem_t sem_mutex;
pthread_mutex_t thd_mutex;
pthread_cond_t thd_cond;
bufqueue_t queue;
} instance_t;
主執行緒建立子執行緒,並生產資料進行入列操作,順序地生產 times 個 pitem 資料進行入列,生產完成後等待子執行緒消費
執行緒的工作方式分別有三種:執行緒完全非同步的機制、使用sem保證實時執行緒同步、使用條件變數保持同步
int test_mutilthread(int times)
{
int ret = FAILURE;
int ix = 0;
pthread_t tid = 0;
pthread_attr_t attr;
instance_t inst = {.queue = {0}};
test_t *pitem = NULL;
#ifdef TEST_SEM
printf("Sem sem_mutex mode\n");
assert(SUCCESS == ( ret = sem_init(&inst.sem_mutex, 0, 0) ));
#elif defined(TEST_COND)
printf("Pthread Cond mode\n");
assert(SUCCESS == ( ret = pthread_mutex_init(&inst.thd_mutex, NULL) ));
assert(SUCCESS == ( ret = pthread_cond_init(&inst.thd_cond, NULL) ));
#endif
assert(SUCCESS == ( ret = bufqueue_create(&inst.queue, times, sizeof(test_t)) ));
assert(SUCCESS == ( ret = pthread_attr_init(&attr) ));
assert(SUCCESS == ( ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ));
assert(SUCCESS == ( ret = pthread_create(&tid, NULL, __do_customer, &inst) ));
/* Producer */
for ( ix = 1; ix < times; ix++ ) {
assert(NULL != ( pitem = (test_t*)bufqueue_tail(&inst.queue) ));
/* Set custom item */
pitem->value = ix;
assert(SUCCESS == ( ret = bufqueue_push(&inst.queue) ));
printf("Enqueue %u/%u\n", pitem->value, ix);
#ifdef TEST_SEM
sem_post(&inst.sem_mutex);
#elif defined(TEST_COND)
pthread_cond_signal(&inst.thd_cond);
#endif
}
while ( !bufqueue_isempty(&inst.queue) ) {
printf("Wait dequeue...\n");
usleep(300);
}
ret = SUCCESS;
_E1:
pthread_attr_destroy(&attr);
bufqueue_destroy(&inst.queue, free);
#ifdef TEST_SEM
sem_destroy(&inst.sem_mutex);
#elif defined(TEST_COND)
pthread_cond_destroy(&inst.thd_cond);
pthread_mutex_destroy(&inst.thd_mutex);
#endif
return ret;
}
子執行緒負責取佇列中的資料,取成功 times 次後就退出
執行緒的工作方式分別也有三種:執行緒完全非同步的機制、使用sem保證實時執行緒同步、使用條件變數保持同步
static void *__do_customer(void *args)
{
instance_t *pinst = (instance_t *)args;
test_t *pitem = NULL;
u32 cnt = 1;
u32 max = 0;
ASSERT_FAIL(NULL, args);
for ( max = pinst->queue.size; cnt < max; ) {
#ifdef TEST_SEM
sem_wait(&pinst->sem_mutex);
#elif defined(TEST_COND)
if ( bufqueue_isempty(&pinst->queue) ) {
pthread_mutex_lock(&pinst->thd_mutex);
pthread_cond_wait(&pinst->thd_cond, &pinst->thd_mutex);
pthread_mutex_unlock(&pinst->thd_mutex);
printf("Wait enqueue...\n");
continue;
}
#else
if ( bufqueue_isempty(&pinst->queue) ) {
usleep(300);
printf("Wait enqueue...\n");
continue;
}
#endif
assert(NULL != (pitem = (test_t*)bufqueue_pop(&pinst->queue) ));
assert(cnt++ == pitem->value);
printf("Dequeue %u/%u\n", pitem->value, cnt);
}
printf("Dequeue Done\n");
_E1:
return NULL;
}
main函式
int main(int argc, char *argv[])
{
int ret = FAILURE;
u32 times = 1024;
if ( argc > 1 ) {
times = atoi(argv[1]);
}
ret = test_mutilthread(times)
printf("Test result:\t\t\t\t[%s]\n", ret ? "FAILURE" : "SUCCESS");
exit(ret ? 1 : 0);
}
注意以上寫法為原型斷言的寫法,保證斷言失敗時程式立刻退出,僅為測試使用,不符合實際的程式碼規範
四、總結分析
測試硬體為Intel(R) Core(TM) i7-4500U CPU @ 1.80GHz
樣本大小:100'000個
主執行緒得知佇列空的情況後,加入一個時間統計的功能,分別得出一秒鐘操作的速度:
- 執行緒完全非同步:20000'000 次/每秒
- 使用sem保證實時執行緒同步:1960'000 次/每秒
- 使用條件變數保持同步:9000'000 次/每秒
參考文章:
[1] Linux C++的多執行緒程式設計, http://www.cnblogs.com/youtherhome/archive/2013/03/17/2964195.html
[2] 多執行緒程式設計-互斥鎖 http://blog.chinaunix.net/uid-21411227-id-1826888.html