1. 程式人生 > >libevent原始碼分析--鎖和多執行緒

libevent原始碼分析--鎖和多執行緒

寫在前面:

​ 這個原始碼是分析libevent-2.0.20-stable, 並非最新版本的libevent,作者並沒有全看原始碼,在這裡會推薦以下參考的一些網站,也歡迎大家在不足的地方提出來進行討論。

​ libevent的內部實現不需要多執行緒,為此我們在libevent的原始碼中也看不到有關執行緒的介面。但是我們很有可能在某個多執行緒程式中使用libevent,那麼就需要保證執行緒共享的結構體是執行緒安全的,為此libevent提供了鎖和條件變數來確保執行緒的同步。

​ libevent的結構體在多執行緒下通常有三種方式:

  • 某些結構體只能使用在單執行緒:同時在多個執行緒中使用它們總是不安全的。

  • 某些結構具有可選擇的鎖: 可以告知libevent是否需要在多個執行緒中使用每個物件。

  • 某些結構體總是鎖定的:如果libevent在支援鎖的配置下執行,在多執行緒中使用它們總是安全的。

如果想要開啟多執行緒模式,需要呼叫eventhread_use_pthreads()函式

int
evthread_use_pthreads(void)
{
    struct evthread_lock_callbacks cbs = {
        EVTHREAD_LOCK_API_VERSION,
        EVTHREAD_LOCKTYPE_RECURSIVE,
        evthread_posix_lock_alloc,
        evthread_posix_lock_free,
        evthread_posix_lock,
        evthread_posix_unlock
    };
    struct evthread_condition_callbacks cond_cbs = {
        EVTHREAD_CONDITION_API_VERSION,
        evthread_posix_cond_alloc,
        evthread_posix_cond_free,
        evthread_posix_cond_signal,
        evthread_posix_cond_wait
    };
    /* Set ourselves up to get recursive locks. */
    if (pthread_mutexattr_init(&attr_recursive))
        return -1;
    if (pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE))
        return -1;
​
    evthread_set_lock_callbacks(&cbs);
    evthread_set_condition_callbacks(&cond_cbs);
    evthread_set_id_callback(evthread_posix_get_id);
    return 0;
}

​ 接下來我們就針對這個函式,自頂向下看看原始碼的實現,首先我們來看看鎖回撥結構體的實現

struct evthread_lock_callbacks {
    int lock_api_version;
    //版本號,預設設定為巨集EVTHREAD_LOCK_API_VERSION
    unsigned supported_locktypes;
    //支援的鎖型別,有普通鎖,遞迴鎖,讀寫鎖三種
    
    //分配一個鎖變數(指標型別),因為不同的平臺鎖變數是不同的型別
    //所以用這個通用的void*型別
    void *(*alloc)(unsigned locktype);  
    void (*free)(void *lock, unsigned locktype);  
    int (*lock)(unsigned mode, void *lock);     
    int (*unlock)(unsigned mode, void *lock);
};

​ 第一個元素是api_version,但是似乎在libevent中,都只有一個版本被巨集定義成了1

#define EVTHREAD_LOCK_API_VERSION 1
#define EVTHREAD_CONDITION_API_VERSION 1

​ 但是在我們實現的時候,我們可以自定義自己的鎖版本和實現。

​ 目前Libevent支援的locktype鎖型別有三種:

  • 普通鎖, 值為0

  • 遞迴鎖, 值為EVTHREAD_LOCKTYPE_RECURSIVE

  • 讀寫鎖, 值為EVTHREAD_LOCKTYPE_READWRITE

    引數mode(鎖模式)則取下面的值:

  • EVTHREAD_READ:僅用於讀寫鎖:為讀操作請求或者釋放鎖

  • EVTHREAD_WRITE:僅用於讀寫鎖:為寫操作請求或者釋放鎖

  • EVTHREAD_TRY:僅用於鎖定:僅在可以立刻鎖定的時候才請求鎖定

  剩餘的元素,我們可以自己定製屬於自己的鎖函式或者呼叫系統預設的版本,接下來我們先看看系統預設版本的實現。

​ 系統預設的實現封裝在evthread_pthread.c檔案中(ps: 本文只討論跨平臺鎖的實現,但是不同時分析兩個平臺)

static void *
evthread_posix_lock_alloc(unsigned locktype)
{
    pthread_mutexattr_t *attr = NULL;
    pthread_mutex_t *lock = mm_malloc(sizeof(pthread_mutex_t)); //動態分配
    if (!lock)
        return NULL;
    if (locktype & EVTHREAD_LOCKTYPE_RECURSIVE) //是否設定遞迴屬性
        attr = &attr_recursive;
    if (pthread_mutex_init(lock, attr)) {
        mm_free(lock);
        return NULL;
    }
    return lock;
}
​
static void
evthread_posix_lock_free(void *_lock, unsigned locktype)
{
    pthread_mutex_t *lock = _lock;
    pthread_mutex_destroy(lock);
    mm_free(lock);  
}
​
static int
evthread_posix_lock(unsigned mode, void *_lock)
{
    pthread_mutex_t *lock = _lock;
    if (mode & EVTHREAD_TRY)
        return pthread_mutex_trylock(lock); //僅在可以立刻鎖定的時候才請求鎖定
    else
        return pthread_mutex_lock(lock);
}
​
static int
evthread_posix_unlock(unsigned mode, void *_lock)
{
    pthread_mutex_t *lock = _lock;
    return pthread_mutex_unlock(lock);
}

​ 條件變數的實現也是了類似,簡單的對於condiiton進行封裝就不在贅述了。

​ 我們就接著分析如何實現自定義鎖的實現。

​ 首先類似於前面的實現,libevent在實現的時候,定義了兩個全域性靜態變數

GLOBAL struct evthread_lock_callbacks _evthread_lock_fns = {
    0, 0, NULL, NULL, NULL, NULL
};
GLOBAL struct evthread_condition_callbacks _evthread_cond_fns = {
    0, NULL, NULL, NULL, NULL
};

​ 然後當我們呼叫回撥設定函式進行函式,下面舉例lock的回撥實現

int
evthread_set_lock_callbacks(const struct evthread_lock_callbacks *cbs)
{
    // for debug
    struct evthread_lock_callbacks *target =
        _evthread_lock_debugging_enabled
        ? &_original_lock_fns : &_evthread_lock_fns;
​
    if (!cbs) {
        if (target->alloc)
            event_warnx("Trying to disable lock functions after "
                "they have been set up will probaby not work.");
        memset(target, 0, sizeof(_evthread_lock_fns));
        return 0;
    }
    if (target->alloc)    //如果存在的話,有可能分配了鎖,如果更改會出現dump core
    {
        /* Uh oh; we already had locking callbacks set up.*/
        if (target->lock_api_version == cbs->lock_api_version &&
            target->supported_locktypes == cbs->supported_locktypes &&
            target->alloc == cbs->alloc &&
            target->free == cbs->free &&
            target->lock == cbs->lock &&
            target->unlock == cbs->unlock) {
            /* no change -- allow this. */
            return 0;
        }
        event_warnx("Can't change lock callbacks once they have been "
            "initialized.");
        return -1;
    }
    if (cbs->alloc && cbs->free && cbs->lock && cbs->unlock) 
    {
        memcpy(target, cbs, sizeof(_evthread_lock_fns));
        return event_global_setup_locks_(1); 
    } 
    else 
    {
        return -1;
    }
}

​ 一旦使用者呼叫evthread_use_windows_threads()或者evthread_use_pthreads()函式,那麼使用者就為libevent定製了自己的執行緒鎖操作。Libevent的其他程式碼中,如果需要用到鎖,就會去呼叫這些執行緒鎖操作。在實現上,當呼叫evthread_use_windows_threads()或者evthread_use_pthreads()函式時,兩個函式的內部都會呼叫evthread_set_lock_callbacks函式。而這個設定函式會把前面兩個evthread_use_xxx函式中定義的cbs變數值複製到一個evthread_lock_callbacks型別的evthread_lock_fns全域性變數儲存起來。以後,Libevent需要用到多執行緒鎖操作,直接訪問這個evthread_lock_fn變數即可。對於條件變數,也是用這樣方式實現的。

​ 但是如果使用者想要自定義自己的鎖,不可以呼叫evthread_use_windows_threads()或者evthread_use_pthreads()函式,可以仿照evthread_use_pthreads()實現,呼叫三個執行緒回撥函式,去初始話三個全域性變數即可。

執行緒id封裝

​ 在多執行緒程式中,使用getpid()函式是不正確的,因為在多執行緒程式中,同一程序的不同的執行緒getpid()函式呼叫的結果是一致的,這個時候我們並不能區分出執行緒,我們可以通過封裝一個執行緒getpid()來解決。

​ 執行緒的封裝,初始化,實現和上述鎖的實現技巧沒有任何區別,我們在這裡重點講講,如何封裝的問題。

​
static unsigned long
evthread_posix_get_id(void)
{
    union {
        pthread_t thr;
#if _EVENT_SIZEOF_PTHREAD_T > _EVENT_SIZEOF_LONG
        ev_uint64_t id;
#else
        unsigned long id;
#endif
    } r;
#if _EVENT_SIZEOF_PTHREAD_T < _EVENT_SIZEOF_LONG
    memset(&r, 0, sizeof(r));
#endif
    r.thr = pthread_self();
    return (unsigned long)r.id;
}

​ pthread_self是POSIX實現的,它返回一個由pthread_t資料型別表示的執行緒ID,在Linux系統中用無符號整形來表示pthread_t資料型別。

​ POSIX執行緒ID的分配和維護是由執行緒實現決定的,它表示的是同一程序中各個執行緒的獨立標識,所以由pthread_self返回的執行緒ID僅在其所屬的程序上下文中才有意義。

​ 在實現上,其實pthread_self 即是獲取執行緒控制塊tcb首地址 相對於程序資料的段的偏移, 注:pthread_create也是返回該值。只能用來描述統一程序中的不同執行緒,不能適用於不同程序間的執行緒的區分。

Debug鎖使用及實現

​ Libevent還支援對鎖操作的一些檢測,進而捕抓一些典型的鎖錯誤。Libevent檢查:

  • 解鎖自己(執行緒)沒有持有的鎖

  • 在未解鎖前,自己(執行緒)再次鎖定一個非遞迴鎖。

​ Libevent通過一些變數記錄鎖的使用情況,當檢查到這些鎖的錯誤使用時,就呼叫abort,退出執行。

​ 使用者只需在呼叫evthread_use_pthreads或者evthread_use_windows_threads之後,呼叫evthread_enable_lock_debuging()函式即可開啟除錯鎖的功能。該函式有一個拼寫錯誤。在2.1.2-alpha版本中會改正為evthread_enable_lock_debugging,為了後向相容,兩者都會支援的。

​ 現在看一下Libevent是鎖除錯功能。

void
evthread_enable_lock_debuging(void)
{
    struct evthread_lock_callbacks cbs = {
        EVTHREAD_LOCK_API_VERSION,
        EVTHREAD_LOCKTYPE_RECURSIVE,
        debug_lock_alloc,
        debug_lock_free,
        debug_lock_lock,
        debug_lock_unlock
    };
    if (_evthread_lock_debugging_enabled)
        return;
 
    //把當前使用者定製的鎖操作複製到_original_lock_fns結構體變數中。
    memcpy(&_original_lock_fns, &_evthread_lock_fns,
        sizeof(struct evthread_lock_callbacks));
 
    //將當前的鎖操作設定成除錯鎖操作。但除錯鎖操作函式內部
    //還是使用_original_lock_fns的鎖操作函式
    memcpy(&_evthread_lock_fns, &cbs,
        sizeof(struct evthread_lock_callbacks));
 
    memcpy(&_original_cond_fns, &_evthread_cond_fns,
        sizeof(struct evthread_condition_callbacks));
    _evthread_cond_fns.wait_condition = debug_cond_wait;
    _evthread_lock_debugging_enabled = 1;
 
    /* XXX return value should get checked. */
    event_global_setup_locks_(0);
}

​ 現在看看Libevent是怎麼除錯(更準確來說,應該是檢測)鎖的。鎖的檢測,需要用到debug_lock 結構體,它對鎖的一些使用狀態進行了記錄。

struct debug_lock 
{
    unsigned locktype; //鎖的型別
    unsigned long held_by; //這個鎖是被哪個執行緒所擁有
    /* XXXX if we ever use read-write locks, we will need a separate
     * lock to protect count. */
    int count; //這個鎖的加鎖次數
    void *lock; //鎖型別,在pthreads下為pthread_mutex_t*型別
};

​ 當然也對其的加鎖函式進行了封裝,主要是對上述的成員進行操作

static void *
debug_lock_alloc(unsigned locktype)
{
    struct debug_lock *result = mm_malloc(sizeof(struct debug_lock));
    if (!result)
        return NULL;
    if (_original_lock_fns.alloc) {
        if (!(result->lock = _original_lock_fns.alloc(
                locktype|EVTHREAD_LOCKTYPE_RECURSIVE))) {
            mm_free(result);
            return NULL;
        }
    } else {
        result->lock = NULL;
    }
    result->locktype = locktype;
    result->count = 0;      //記錄加鎖次數
    result->held_by = 0;    
    return result;
}
​
​
static void
debug_lock_free(void *lock_, unsigned locktype)
{
    struct debug_lock *lock = lock_;
    EVUTIL_ASSERT(lock->count == 0); //如果不等於0,還有地方持有鎖
    EVUTIL_ASSERT(locktype == lock->locktype);
    if (_original_lock_fns.free) {
        _original_lock_fns.free(lock->lock,
            lock->locktype|EVTHREAD_LOCKTYPE_RECURSIVE);
    }
    lock->lock = NULL;
    lock->count = -100;
    mm_free(lock);
}
​
static int
debug_lock_lock(unsigned mode, void *lock_)
{
    struct debug_lock *lock = lock_;
    int res = 0;
    if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE)
        EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE));
    else
        EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0);
    if (_original_lock_fns.lock)
        res = _original_lock_fns.lock(mode, lock->lock);
    if (!res) {
        evthread_debug_lock_mark_locked(mode, lock);
    }
    return res;
}
​
static void
evthread_debug_lock_mark_locked(unsigned mode, struct debug_lock *lock)
{
    ++lock->count;  //增加鎖定的次數
    if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE)) //不是遞迴鎖就報錯
        EVUTIL_ASSERT(lock->count == 1);
    if (_evthread_id_fn) {    //存在gettid函式
        unsigned long me;
        me = _evthread_id_fn();  
        if (lock->count > 1) 
            EVUTIL_ASSERT(lock->held_by == me); //遞迴鎖必須保證在同一執行緒
        lock->held_by = me;
    }
}

​ 加鎖解鎖的方式相反的,所以就不在贅述。

​ 另外,在libevent中,並不是呼叫上述函式,而是用巨集進一步包裝,向下面一樣。

#define EVLOCK_LOCK(lockvar,mode)                   \
    do {                                \
        if (lockvar)                        \
            _evthread_lock_fns.lock(mode, lockvar);     \
    } while (0)
​
#define EVLOCK_UNLOCK(lockvar,mode)                 \
    do {                                \
        if (lockvar)                        \
            _evthread_lock_fns.unlock(mode, lockvar);   \
    } while (0)