libevent原始碼分析--鎖和多執行緒
寫在前面:
這個原始碼是分析libevent-2.0.20-stable, 並非最新版本的libevent,作者並沒有全看原始碼,在這裡會推薦以下參考的一些網站,也歡迎大家在不足的地方提出來進行討論。
鎖
libevent的內部實現不需要多執行緒,為此我們在libevent的原始碼中也看不到有關執行緒的介面。但是我們很有可能在某個多執行緒程式中使用libevent,那麼就需要保證執行緒共享的結構體是執行緒安全的,為此libevent提供了鎖和條件變數來確保執行緒的同步。
libevent的結構體在多執行緒下通常有三種方式:
-
某些結構體只能使用在單執行緒:同時在多個執行緒中使用它們總是不安全的。
-
某些結構具有可選擇的鎖: 可以告知libevent是否需要在多個執行緒中使用每個物件。
-
某些結構體總是鎖定的:如果libevent在支援鎖的配置下執行,在多執行緒中使用它們總是安全的。
如果想要開啟多執行緒模式,需要呼叫eventhread_use_pthreads()函式
int evthread_use_pthreads(void) { struct evthread_lock_callbacks cbs = { EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, evthread_posix_lock_alloc, evthread_posix_lock_free, evthread_posix_lock, evthread_posix_unlock }; struct evthread_condition_callbacks cond_cbs = { EVTHREAD_CONDITION_API_VERSION, evthread_posix_cond_alloc, evthread_posix_cond_free, evthread_posix_cond_signal, evthread_posix_cond_wait }; /* Set ourselves up to get recursive locks. */ if (pthread_mutexattr_init(&attr_recursive)) return -1; if (pthread_mutexattr_settype(&attr_recursive, PTHREAD_MUTEX_RECURSIVE)) return -1; evthread_set_lock_callbacks(&cbs); evthread_set_condition_callbacks(&cond_cbs); evthread_set_id_callback(evthread_posix_get_id); return 0; }
接下來我們就針對這個函式,自頂向下看看原始碼的實現,首先我們來看看鎖回撥結構體的實現
struct evthread_lock_callbacks { int lock_api_version; //版本號,預設設定為巨集EVTHREAD_LOCK_API_VERSION unsigned supported_locktypes; //支援的鎖型別,有普通鎖,遞迴鎖,讀寫鎖三種 //分配一個鎖變數(指標型別),因為不同的平臺鎖變數是不同的型別 //所以用這個通用的void*型別 void *(*alloc)(unsigned locktype); void (*free)(void *lock, unsigned locktype); int (*lock)(unsigned mode, void *lock); int (*unlock)(unsigned mode, void *lock); };
第一個元素是api_version,但是似乎在libevent中,都只有一個版本被巨集定義成了1
#define EVTHREAD_LOCK_API_VERSION 1 #define EVTHREAD_CONDITION_API_VERSION 1
但是在我們實現的時候,我們可以自定義自己的鎖版本和實現。
目前Libevent支援的locktype鎖型別有三種:
-
普通鎖, 值為0
-
遞迴鎖, 值為EVTHREAD_LOCKTYPE_RECURSIVE
-
讀寫鎖, 值為EVTHREAD_LOCKTYPE_READWRITE
引數mode(鎖模式)則取下面的值:
-
EVTHREAD_READ:僅用於讀寫鎖:為讀操作請求或者釋放鎖
-
EVTHREAD_WRITE:僅用於讀寫鎖:為寫操作請求或者釋放鎖
-
EVTHREAD_TRY:僅用於鎖定:僅在可以立刻鎖定的時候才請求鎖定
剩餘的元素,我們可以自己定製屬於自己的鎖函式或者呼叫系統預設的版本,接下來我們先看看系統預設版本的實現。
系統預設的實現封裝在evthread_pthread.c檔案中(ps: 本文只討論跨平臺鎖的實現,但是不同時分析兩個平臺)
static void * evthread_posix_lock_alloc(unsigned locktype) { pthread_mutexattr_t *attr = NULL; pthread_mutex_t *lock = mm_malloc(sizeof(pthread_mutex_t)); //動態分配 if (!lock) return NULL; if (locktype & EVTHREAD_LOCKTYPE_RECURSIVE) //是否設定遞迴屬性 attr = &attr_recursive; if (pthread_mutex_init(lock, attr)) { mm_free(lock); return NULL; } return lock; } static void evthread_posix_lock_free(void *_lock, unsigned locktype) { pthread_mutex_t *lock = _lock; pthread_mutex_destroy(lock); mm_free(lock); } static int evthread_posix_lock(unsigned mode, void *_lock) { pthread_mutex_t *lock = _lock; if (mode & EVTHREAD_TRY) return pthread_mutex_trylock(lock); //僅在可以立刻鎖定的時候才請求鎖定 else return pthread_mutex_lock(lock); } static int evthread_posix_unlock(unsigned mode, void *_lock) { pthread_mutex_t *lock = _lock; return pthread_mutex_unlock(lock); }
條件變數的實現也是了類似,簡單的對於condiiton進行封裝就不在贅述了。
我們就接著分析如何實現自定義鎖的實現。
首先類似於前面的實現,libevent在實現的時候,定義了兩個全域性靜態變數
GLOBAL struct evthread_lock_callbacks _evthread_lock_fns = { 0, 0, NULL, NULL, NULL, NULL }; GLOBAL struct evthread_condition_callbacks _evthread_cond_fns = { 0, NULL, NULL, NULL, NULL };
然後當我們呼叫回撥設定函式進行函式,下面舉例lock的回撥實現
int evthread_set_lock_callbacks(const struct evthread_lock_callbacks *cbs) { // for debug struct evthread_lock_callbacks *target = _evthread_lock_debugging_enabled ? &_original_lock_fns : &_evthread_lock_fns; if (!cbs) { if (target->alloc) event_warnx("Trying to disable lock functions after " "they have been set up will probaby not work."); memset(target, 0, sizeof(_evthread_lock_fns)); return 0; } if (target->alloc) //如果存在的話,有可能分配了鎖,如果更改會出現dump core { /* Uh oh; we already had locking callbacks set up.*/ if (target->lock_api_version == cbs->lock_api_version && target->supported_locktypes == cbs->supported_locktypes && target->alloc == cbs->alloc && target->free == cbs->free && target->lock == cbs->lock && target->unlock == cbs->unlock) { /* no change -- allow this. */ return 0; } event_warnx("Can't change lock callbacks once they have been " "initialized."); return -1; } if (cbs->alloc && cbs->free && cbs->lock && cbs->unlock) { memcpy(target, cbs, sizeof(_evthread_lock_fns)); return event_global_setup_locks_(1); } else { return -1; } }
一旦使用者呼叫evthread_use_windows_threads()或者evthread_use_pthreads()函式,那麼使用者就為libevent定製了自己的執行緒鎖操作。Libevent的其他程式碼中,如果需要用到鎖,就會去呼叫這些執行緒鎖操作。在實現上,當呼叫evthread_use_windows_threads()或者evthread_use_pthreads()函式時,兩個函式的內部都會呼叫evthread_set_lock_callbacks函式。而這個設定函式會把前面兩個evthread_use_xxx函式中定義的cbs變數值複製到一個evthread_lock_callbacks型別的evthread_lock_fns全域性變數儲存起來。以後,Libevent需要用到多執行緒鎖操作,直接訪問這個evthread_lock_fn變數即可。對於條件變數,也是用這樣方式實現的。
但是如果使用者想要自定義自己的鎖,不可以呼叫evthread_use_windows_threads()或者evthread_use_pthreads()函式,可以仿照evthread_use_pthreads()實現,呼叫三個執行緒回撥函式,去初始話三個全域性變數即可。
執行緒id封裝
在多執行緒程式中,使用getpid()函式是不正確的,因為在多執行緒程式中,同一程序的不同的執行緒getpid()函式呼叫的結果是一致的,這個時候我們並不能區分出執行緒,我們可以通過封裝一個執行緒getpid()來解決。
執行緒的封裝,初始化,實現和上述鎖的實現技巧沒有任何區別,我們在這裡重點講講,如何封裝的問題。
static unsigned long evthread_posix_get_id(void) { union { pthread_t thr; #if _EVENT_SIZEOF_PTHREAD_T > _EVENT_SIZEOF_LONG ev_uint64_t id; #else unsigned long id; #endif } r; #if _EVENT_SIZEOF_PTHREAD_T < _EVENT_SIZEOF_LONG memset(&r, 0, sizeof(r)); #endif r.thr = pthread_self(); return (unsigned long)r.id; }
pthread_self是POSIX實現的,它返回一個由pthread_t資料型別表示的執行緒ID,在Linux系統中用無符號整形來表示pthread_t資料型別。
POSIX執行緒ID的分配和維護是由執行緒實現決定的,它表示的是同一程序中各個執行緒的獨立標識,所以由pthread_self返回的執行緒ID僅在其所屬的程序上下文中才有意義。
在實現上,其實pthread_self 即是獲取執行緒控制塊tcb首地址 相對於程序資料的段的偏移, 注:pthread_create也是返回該值。只能用來描述統一程序中的不同執行緒,不能適用於不同程序間的執行緒的區分。
Debug鎖使用及實現
Libevent還支援對鎖操作的一些檢測,進而捕抓一些典型的鎖錯誤。Libevent檢查:
-
解鎖自己(執行緒)沒有持有的鎖
-
在未解鎖前,自己(執行緒)再次鎖定一個非遞迴鎖。
Libevent通過一些變數記錄鎖的使用情況,當檢查到這些鎖的錯誤使用時,就呼叫abort,退出執行。
使用者只需在呼叫evthread_use_pthreads或者evthread_use_windows_threads之後,呼叫evthread_enable_lock_debuging()函式即可開啟除錯鎖的功能。該函式有一個拼寫錯誤。在2.1.2-alpha版本中會改正為evthread_enable_lock_debugging,為了後向相容,兩者都會支援的。
現在看一下Libevent是鎖除錯功能。
void evthread_enable_lock_debuging(void) { struct evthread_lock_callbacks cbs = { EVTHREAD_LOCK_API_VERSION, EVTHREAD_LOCKTYPE_RECURSIVE, debug_lock_alloc, debug_lock_free, debug_lock_lock, debug_lock_unlock }; if (_evthread_lock_debugging_enabled) return; //把當前使用者定製的鎖操作複製到_original_lock_fns結構體變數中。 memcpy(&_original_lock_fns, &_evthread_lock_fns, sizeof(struct evthread_lock_callbacks)); //將當前的鎖操作設定成除錯鎖操作。但除錯鎖操作函式內部 //還是使用_original_lock_fns的鎖操作函式 memcpy(&_evthread_lock_fns, &cbs, sizeof(struct evthread_lock_callbacks)); memcpy(&_original_cond_fns, &_evthread_cond_fns, sizeof(struct evthread_condition_callbacks)); _evthread_cond_fns.wait_condition = debug_cond_wait; _evthread_lock_debugging_enabled = 1; /* XXX return value should get checked. */ event_global_setup_locks_(0); }
現在看看Libevent是怎麼除錯(更準確來說,應該是檢測)鎖的。鎖的檢測,需要用到debug_lock 結構體,它對鎖的一些使用狀態進行了記錄。
struct debug_lock { unsigned locktype; //鎖的型別 unsigned long held_by; //這個鎖是被哪個執行緒所擁有 /* XXXX if we ever use read-write locks, we will need a separate * lock to protect count. */ int count; //這個鎖的加鎖次數 void *lock; //鎖型別,在pthreads下為pthread_mutex_t*型別 };
當然也對其的加鎖函式進行了封裝,主要是對上述的成員進行操作
static void * debug_lock_alloc(unsigned locktype) { struct debug_lock *result = mm_malloc(sizeof(struct debug_lock)); if (!result) return NULL; if (_original_lock_fns.alloc) { if (!(result->lock = _original_lock_fns.alloc( locktype|EVTHREAD_LOCKTYPE_RECURSIVE))) { mm_free(result); return NULL; } } else { result->lock = NULL; } result->locktype = locktype; result->count = 0; //記錄加鎖次數 result->held_by = 0; return result; } static void debug_lock_free(void *lock_, unsigned locktype) { struct debug_lock *lock = lock_; EVUTIL_ASSERT(lock->count == 0); //如果不等於0,還有地方持有鎖 EVUTIL_ASSERT(locktype == lock->locktype); if (_original_lock_fns.free) { _original_lock_fns.free(lock->lock, lock->locktype|EVTHREAD_LOCKTYPE_RECURSIVE); } lock->lock = NULL; lock->count = -100; mm_free(lock); } static int debug_lock_lock(unsigned mode, void *lock_) { struct debug_lock *lock = lock_; int res = 0; if (lock->locktype & EVTHREAD_LOCKTYPE_READWRITE) EVUTIL_ASSERT(mode & (EVTHREAD_READ|EVTHREAD_WRITE)); else EVUTIL_ASSERT((mode & (EVTHREAD_READ|EVTHREAD_WRITE)) == 0); if (_original_lock_fns.lock) res = _original_lock_fns.lock(mode, lock->lock); if (!res) { evthread_debug_lock_mark_locked(mode, lock); } return res; } static void evthread_debug_lock_mark_locked(unsigned mode, struct debug_lock *lock) { ++lock->count; //增加鎖定的次數 if (!(lock->locktype & EVTHREAD_LOCKTYPE_RECURSIVE)) //不是遞迴鎖就報錯 EVUTIL_ASSERT(lock->count == 1); if (_evthread_id_fn) { //存在gettid函式 unsigned long me; me = _evthread_id_fn(); if (lock->count > 1) EVUTIL_ASSERT(lock->held_by == me); //遞迴鎖必須保證在同一執行緒 lock->held_by = me; } }
加鎖解鎖的方式相反的,所以就不在贅述。
另外,在libevent中,並不是呼叫上述函式,而是用巨集進一步包裝,向下面一樣。
#define EVLOCK_LOCK(lockvar,mode) \ do { \ if (lockvar) \ _evthread_lock_fns.lock(mode, lockvar); \ } while (0) #define EVLOCK_UNLOCK(lockvar,mode) \ do { \ if (lockvar) \ _evthread_lock_fns.unlock(mode, lockvar); \ } while (0)