1. 程式人生 > >c語言資料結構應用-陣列佇列(無鎖佇列)在多執行緒中的使用

c語言資料結構應用-陣列佇列(無鎖佇列)在多執行緒中的使用

一、背景

上篇文章《c語言資料結構實現-陣列佇列/環形佇列》講述了陣列佇列的原理與實現,本文編寫一個雙執行緒進行速度測試

二、相關知識

多執行緒程式設計介面:

1) 建立執行緒 pthread_create 函式

SYNOPSIS
       #include <pthread.h>

       int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                          void *(*start_routine) (void *), void *arg);

       Compile and link with -pthread.
DESCRIPTION:
       The pthread_create() function starts a new thread in the calling process.  The new thread starts execution by invoking start_routine(); arg is passed as the sole argument of start_routine().
RETURN VALUE:
       On success, pthread_create() returns 0; on error, it returns an error number, and the contents of *thread are undefined.
2) 設定執行緒屬性 pthread_attr_setdetachstate
SYNOPSIS
       #include <pthread.h>
       int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
       Compile and link with -pthread.

DESCRIPTION
       The  pthread_attr_setdetachstate() function sets the detach state attribute of the thread attributes object referred to by attr to the value specified in detachstate.  The detach state attribute determines whether a thread created using the thread attributes object attr will be created in a joinable or a detached state.

RETURN VALUE
       On success, these functions return 0; on error, they return a nonzero error number.
3) 執行緒鎖 pthread_mutex_init、pthread_mutex_lock、pthread_mutex_unlock、pthread_mutex_destory

執行緒鎖是用於多執行緒直接的同步,用來保護臨界資源(數值佇列),同樣類似的功能有 Posix 的 sem訊號量。

4) 條件變數 pthread_cond_init、pthread_cond_signal、pthread_cond_wait、pthread_cond_destroy

條件變數是同步執行緒的一種機制,它允許執行緒掛起,讓出處理器等待其他執行緒向它傳送訊號,該執行緒收到該訊號後被喚醒繼續執行程式。

、實現

資料結構 test_t 為使用者資料,instance_t 為程式上下文

typedef struct item_t
{
    int value;
} test_t;

typedef struct instance 
{
    sem_t sem_mutex;
    pthread_mutex_t thd_mutex;
    pthread_cond_t thd_cond;
    bufqueue_t queue;
} instance_t;

主執行緒建立子執行緒,並生產資料進行入列操作,順序地生產 times 個 pitem 資料進行入列,生產完成後等待子執行緒消費

執行緒的工作方式分別有三種:執行緒完全非同步的機制、使用sem保證實時執行緒同步、使用條件變數保持同步

int test_mutilthread(int times)
{
    int ret = FAILURE;
    int ix = 0;

    pthread_t tid = 0;
    pthread_attr_t attr;

    instance_t inst = {.queue = {0}};
    test_t *pitem = NULL;

#ifdef TEST_SEM
    printf("Sem sem_mutex mode\n");
    assert(SUCCESS == ( ret = sem_init(&inst.sem_mutex, 0, 0) ));
#elif defined(TEST_COND)
    printf("Pthread Cond mode\n");
    assert(SUCCESS == ( ret = pthread_mutex_init(&inst.thd_mutex, NULL) ));
    assert(SUCCESS == ( ret = pthread_cond_init(&inst.thd_cond, NULL) ));
#endif
    assert(SUCCESS == ( ret = bufqueue_create(&inst.queue, times, sizeof(test_t)) ));
    assert(SUCCESS == ( ret = pthread_attr_init(&attr) ));
    assert(SUCCESS == ( ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ));
    assert(SUCCESS == ( ret = pthread_create(&tid, NULL, __do_customer, &inst) ));

    /* Producer */
    for ( ix = 1; ix < times; ix++ ) {
        assert(NULL != ( pitem = (test_t*)bufqueue_tail(&inst.queue) ));
        /* Set custom item */
        pitem->value = ix;
        assert(SUCCESS == ( ret = bufqueue_push(&inst.queue) ));
        printf("Enqueue        %u/%u\n", pitem->value, ix);
#ifdef TEST_SEM
        sem_post(&inst.sem_mutex);
#elif defined(TEST_COND)
        pthread_cond_signal(&inst.thd_cond);
#endif
    }

    while ( !bufqueue_isempty(&inst.queue) ) {
        printf("Wait dequeue...\n");
        usleep(300);
    }

    ret = SUCCESS;
_E1:
    pthread_attr_destroy(&attr);
    bufqueue_destroy(&inst.queue, free);
#ifdef TEST_SEM
    sem_destroy(&inst.sem_mutex);
#elif defined(TEST_COND)
    pthread_cond_destroy(&inst.thd_cond);
    pthread_mutex_destroy(&inst.thd_mutex);
#endif
    return ret;
}
子執行緒負責取佇列中的資料,取成功 times 次後就退出

執行緒的工作方式分別也有三種:執行緒完全非同步的機制、使用sem保證實時執行緒同步、使用條件變數保持同步

static void *__do_customer(void *args)
{
    instance_t *pinst = (instance_t *)args;
    test_t *pitem = NULL;

    u32 cnt = 1;
    u32 max = 0;

    ASSERT_FAIL(NULL, args);

    for ( max = pinst->queue.size; cnt < max; ) {
#ifdef TEST_SEM
        sem_wait(&pinst->sem_mutex);
#elif defined(TEST_COND)
        if ( bufqueue_isempty(&pinst->queue) ) {
            pthread_mutex_lock(&pinst->thd_mutex);
            pthread_cond_wait(&pinst->thd_cond, &pinst->thd_mutex);
            pthread_mutex_unlock(&pinst->thd_mutex);
            printf("Wait enqueue...\n");
            continue;
        }
#else
        if ( bufqueue_isempty(&pinst->queue) ) {
            usleep(300);
            printf("Wait enqueue...\n");
            continue;
        }
#endif
        assert(NULL != (pitem = (test_t*)bufqueue_pop(&pinst->queue) ));
        assert(cnt++ == pitem->value);
        printf("Dequeue        %u/%u\n", pitem->value, cnt);
    }

    printf("Dequeue Done\n");
_E1:
    return NULL;
}

main函式
int main(int argc, char *argv[])
{
    int ret = FAILURE;
    u32 times = 1024;

    if ( argc > 1 ) {
        times = atoi(argv[1]);
    }

    ret = test_mutilthread(times)
    printf("Test result:\t\t\t\t[%s]\n", ret ? "FAILURE" : "SUCCESS");
    exit(ret ? 1 : 0);
} 

注意以上寫法為原型斷言的寫法,保證斷言失敗時程式立刻退出,僅為測試使用,不符合實際的程式碼規範

四、總結分析

測試硬體為Intel(R) Core(TM) i7-4500U CPU @ 1.80GHz

樣本大小:100'000個

主執行緒得知佇列空的情況後,加入一個時間統計的功能,分別得出一秒鐘操作的速度:

  1. 執行緒完全非同步:20000'000 次/每秒
  2. 使用sem保證實時執行緒同步:1960'000 次/每秒
  3. 使用條件變數保持同步:9000'000 次/每秒
陣列佇列在三種工作方式下均能成功執行,速度上為無鎖的情況為最快,CPU佔用也是最大(usleep輪詢) 綜合結果考慮來說使用條件變數正好可以避免 子執行緒過多地佔用CPU進行輪詢操作,速度也僅為無鎖的一半

參考文章:

[1] Linux C++的多執行緒程式設計, http://www.cnblogs.com/youtherhome/archive/2013/03/17/2964195.html

[2] 多執行緒程式設計-互斥鎖 http://blog.chinaunix.net/uid-21411227-id-1826888.html