深入理解GCD之dispatch_semaphore
原文連結 深入理解GCD之dispatch_semaphore
再研究完 dispatch_queue
之後,本來是打算進入到 dispath_group
的原始碼,但是 dispath_group
基本是圍繞著 dispatch_semaphore
即訊號量實現的,所以我們先進入到 dispatch_semaphore
的原始碼學習。在GCD中使用 dispatch_semaphore
用來保證資源使用的安全性(佇列的同步執行就是依賴訊號量實現)。可想而知, dispatch_semaphore
的效能應該是不差的。
dispatch_semaphore_t
dispatch_semaphore_s
是訊號量的結構體。程式碼如下:
struct dispatch_semaphore_s { DISPATCH_STRUCT_HEADER(dispatch_semaphore_s, dispatch_semaphore_vtable_s); long dsema_value;//當前訊號量 long dsema_orig;//初始化訊號量 size_t dsema_sent_ksignals; #if USE_MACH_SEM && USE_POSIX_SEM #error "Too many supported semaphore types" #elif USE_MACH_SEM semaphore_t dsema_port; semaphore_t dsema_waiter_port; #elif USE_POSIX_SEM sem_t dsema_sem; #else #error "No supported semaphore type" #endif size_t dsema_group_waiters; struct dispatch_sema_notify_s *dsema_notify_head; //notify連結串列頭部 struct dispatch_sema_notify_s *dsema_notify_tail; //notify連結串列尾部 }; typedef mach_port_tsemaphore_t; struct dispatch_sema_notify_s { struct dispatch_sema_notify_s *volatile dsn_next; //下一個訊號節點 dispatch_queue_t dsn_queue;//操作的佇列 void *dsn_ctxt;//上下文 void (*dsn_func)(void *);//執行函式 }; 複製程式碼
雖然上面還有一些屬性不知道是做什麼作用的,但我們繼續往下走。
dispatch_semaphore_create
dispatch_semaphore_create
用於訊號量的建立。
dispatch_semaphore_t dispatch_semaphore_create(long value) { dispatch_semaphore_t dsema; // If the internal value is negative, then the absolute of the value is // equal to the number of waiting threads. Therefore it is bogus to // initialize the semaphore with a negative value. if (value < 0) {//value必須大於等於0 return NULL; } //申請dispatch_semaphore_s的記憶體 dsema = calloc(1, sizeof(struct dispatch_semaphore_s)); if (fastpath(dsema)) { //設定dispatch_semaphore_s 的操作函式 dsema->do_vtable = &_dispatch_semaphore_vtable; //設定連結串列尾部 dsema->do_next = DISPATCH_OBJECT_LISTLESS; //引用計數 dsema->do_ref_cnt = 1; dsema->do_xref_cnt = 1; //目標佇列的設定 dsema->do_targetq = dispatch_get_global_queue( DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); //當前訊號量和初始化訊號的賦值 dsema->dsema_value = value; dsema->dsema_orig = value; #if USE_POSIX_SEM int ret = sem_init(&dsema->dsema_sem, 0, 0); DISPATCH_SEMAPHORE_VERIFY_RET(ret); #endif } return dsema; } 複製程式碼
上面的原始碼中 dsema->do_vtable = &_dispatch_semaphore_vtable;
_dispatch_semaphore_vtable
定義如下:
const struct dispatch_semaphore_vtable_s _dispatch_semaphore_vtable = { .do_type = DISPATCH_SEMAPHORE_TYPE, .do_kind = "semaphore", .do_dispose = _dispatch_semaphore_dispose, .do_debug = _dispatch_semaphore_debug, }; 複製程式碼
這裡有個 _dispatch_semaphore_dispose
函式就是訊號量的銷燬函式。程式碼如下:
static void _dispatch_semaphore_dispose(dispatch_semaphore_t dsema) { //訊號量的當前值小於初始化,會發生閃退。因為訊號量已經被釋放了 if (dsema->dsema_value < dsema->dsema_orig) { DISPATCH_CLIENT_CRASH( "Semaphore/group object deallocated while in use"); } #if USE_MACH_SEM kern_return_t kr; //釋放訊號,這個訊號是dispatch_semaphore使用的訊號 if (dsema->dsema_port) { kr = semaphore_destroy(mach_task_self(), dsema->dsema_port); DISPATCH_SEMAPHORE_VERIFY_KR(kr); } //釋放訊號,這個訊號是dispatch_group使用的訊號 if (dsema->dsema_waiter_port) { kr = semaphore_destroy(mach_task_self(), dsema->dsema_waiter_port); DISPATCH_SEMAPHORE_VERIFY_KR(kr); } #elif USE_POSIX_SEM int ret = sem_destroy(&dsema->dsema_sem); DISPATCH_SEMAPHORE_VERIFY_RET(ret); #endif _dispatch_dispose(dsema); } 複製程式碼
dispatch_semaphore_wait
建立好一個訊號量後就會開始進入等待訊號發訊息。
long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout) { //原子性減1,這裡說明dsema_value是當前訊號值,並將新值賦給value long value = dispatch_atomic_dec2o(dsema, dsema_value); dispatch_atomic_acquire_barrier(); if (fastpath(value >= 0)) { //說明有資源可用,直接返回0,表示等到訊號量的資訊了 return 0; } //等待訊號量喚醒或者timeout超時 return _dispatch_semaphore_wait_slow(dsema, timeout); } 複製程式碼
_dispatch_semaphore_wait_slow
在 dispatch_semaphore_wait
中,如果 value
小於 0
,就會執行 _dispatch_semaphore_wait_slow
等待訊號量喚醒或者timeout超時。 _dispatch_semaphore_wait_slow
的程式碼如下:
static long _dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema, dispatch_time_t timeout) { long orig; again: // Mach semaphores appear to sometimes spuriously wake up. Therefore, // we keep a parallel count of the number of times a Mach semaphore is // signaled (6880961). //第一部分: //只要dsema->dsema_sent_ksignals不為零就會進入迴圈 //dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig,orig - 1)的意思是 //dsema->dsema_sent_ksignals如果等於orig,則將orig - 1賦值給dsema_sent_ksignals, //並且返回true,否則返回false。 //如果返回true,說明又獲取了資源 while ((orig = dsema->dsema_sent_ksignals)) { if (dispatch_atomic_cmpxchg2o(dsema, dsema_sent_ksignals, orig, orig - 1)) { return 0; } } #if USE_MACH_SEM mach_timespec_t _timeout; kern_return_t kr; //第二部分:dispatch_semaphore_s中的dsema_port賦值,以懶載入的形式 _dispatch_semaphore_create_port(&dsema->dsema_port); // From xnu/osfmk/kern/sync_sema.c: // wait_semaphore->count = -1; /* we don't keep an actual count */ // // The code above does not match the documentation, and that fact is // not surprising. The documented semantics are clumsy to use in any // practical way. The above hack effectively tricks the rest of the // Mach semaphore logic to behave like the libdispatch algorithm. //第三部分: switch (timeout) { default: //計算剩餘時間,呼叫mach核心的等待函式semaphore_timedwait()進行等待。 //如果在指定時間內沒有得到通知,則會一直阻塞住,監聽dsema_port等待其通知; //當超時的時候,會執行下面的case程式碼(這個default沒有break)。 do { uint64_t nsec = _dispatch_timeout(timeout); _timeout.tv_sec = (typeof(_timeout.tv_sec))(nsec / NSEC_PER_SEC); _timeout.tv_nsec = (typeof(_timeout.tv_nsec))(nsec % NSEC_PER_SEC); kr = slowpath(semaphore_timedwait(dsema->dsema_port, _timeout)); } while (kr == KERN_ABORTED); if (kr != KERN_OPERATION_TIMED_OUT) { DISPATCH_SEMAPHORE_VERIFY_KR(kr); break; } // Fall through and try to undo what the fast path did to // dsema->dsema_value case DISPATCH_TIME_NOW: //若當前訊號量desma_value小於0,對其加一併返回超時訊號KERN_OPERATION_TIMED_OUT。 //KERN_OPERATION_TIMED_OUT代表等待超時而返回 //由於一開始在第一部分程式碼中進行了減1操作,所以需要加1以撤銷之前的操作。 while ((orig = dsema->dsema_value) < 0) { if (dispatch_atomic_cmpxchg2o(dsema, dsema_value, orig, orig + 1)) { return KERN_OPERATION_TIMED_OUT; } } // Another thread called semaphore_signal(). // Fall through and drain the wakeup. case DISPATCH_TIME_FOREVER: //一直等待直到有訊號。當有訊號的時候說明dsema_value大於0,會跳轉到again,重新執行本函式的流程 do { kr = semaphore_wait(dsema->dsema_port); } while (kr == KERN_ABORTED); DISPATCH_SEMAPHORE_VERIFY_KR(kr); break; } #elif USE_POSIX_SEM //此處的程式碼省略,跟上面USE_MACH_SEM程式碼類似 #endif goto again; } 複製程式碼
在上面的原始碼還有幾個地方需要注意:
-
第一部分的那個while迴圈和if條件。在
dsema_sent_ksignals
非0的情況下便會進入while迴圈,if的條件是dsema->dsema_sent_ksignals
如果等於orig
,則將orig - 1
賦值給dsema_sent_ksignals
,並且返回true
,否則返回false
。很明顯,只要能進入迴圈,這個條件是一定成立的,函式直接返回0,表示等到訊號。而在初始化訊號量的時候沒有對dsema_sent_ksignals
賦值,所以就會進入之後的程式碼。也就是說 沒有訊號量的實際通知或者遭受了系統異常通知,並不會解除等待 -
在上面中出現了
semaphore_timedwait
和semaphore_wait
。這些方法是在semaphore.h
中的。所以說 dispatch_semaphore是基於mach核心的訊號量介面實現的 。另外這兩個方法傳入的引數是dsema_port
即dsema_port
被mach核心semaphore監聽,所以我們理解dsema_port
是dispatch_semaphore的訊號。 -
我們回過頭再看一下
dispatch_semaphore_s
結構體中的dsema_waiter_port
。全域性搜尋一下可以發現,這個屬性是用在dispatch_group
中。之前也說了dispatch_group
的實現是基於dispatch_semaphore
,在dispatch_group
裡semaphore_wait
監聽的並不是dsema_port
而是dsema_waiter_port
。
dispatch_semaphore_wait
流程如下圖所示:

dispatch_semaphore_signal
傳送訊號的程式碼相對等待訊號來說簡單很多,它不需要阻塞,只發送喚醒。
long dispatch_semaphore_signal(dispatch_semaphore_t dsema) { dispatch_atomic_release_barrier(); //原子性加1,value大於0 說明有資源立即返回 long value = dispatch_atomic_inc2o(dsema, dsema_value); if (fastpath(value > 0)) { return 0; } if (slowpath(value == LONG_MIN)) { DISPATCH_CLIENT_CRASH("Unbalanced call to dispatch_semaphore_signal()"); } return _dispatch_semaphore_signal_slow(dsema); } 複製程式碼
_dispatch_semaphore_signal_slow
long _dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema) { // Before dsema_sent_ksignals is incremented we can rely on the reference // held by the waiter. However, once this value is incremented the waiter // may return between the atomic increment and the semaphore_signal(), // therefore an explicit reference must be held in order to safely access // dsema after the atomic increment. _dispatch_retain(dsema); (void)dispatch_atomic_inc2o(dsema, dsema_sent_ksignals); #if USE_MACH_SEM _dispatch_semaphore_create_port(&dsema->dsema_port); kern_return_t kr = semaphore_signal(dsema->dsema_port); DISPATCH_SEMAPHORE_VERIFY_KR(kr); #elif USE_POSIX_SEM int ret = sem_post(&dsema->dsema_sem); DISPATCH_SEMAPHORE_VERIFY_RET(ret); #endif _dispatch_release(dsema); return 1; } 複製程式碼
_dispatch_semaphore_signal_slow
的作用就是核心的 semaphore_signal
函式喚醒在 dispatch_semaphore_wait
中等待的執行緒量,然後返回1。
dispatch_semaphore_signal
流程如下圖所示:

總結
-
dispatch_semaphore
是基於mach核心的訊號量介面實現的 -
呼叫
dispatch_semaphore_wait
訊號量減1,呼叫dispatch_semaphore_signal
訊號量加1 -
在
wait
中,訊號量大於等於0代表有資源立即返回,否則等待訊號量或者返回超時;在signal
中,訊號量大於0代表有資源立即返回,否則喚醒某個正在等待的執行緒 -
dispatch_semaphore
利用了兩個變數desma_value
和dsema_sent_ksignals
來處理wait
和signal
,在singnal
中如果有資源,則不需要喚醒執行緒,那麼此時只需要使用desma_value
。當需要喚醒執行緒的時候,傳送的訊號是dsema_sent_ksignals
的值,此時會重新執行wait
的流程,所以在wait
中一開始是用dsema_sent_ksignals
做判斷。 -
再看一下
dispatch_semaphore_s
結構體的變數。
struct dispatch_semaphore_s { DISPATCH_STRUCT_HEADER(dispatch_semaphore_s, dispatch_semaphore_vtable_s); long dsema_value;//當前訊號量 long dsema_orig;//初始化訊號量 size_t dsema_sent_ksignals; //喚醒時候的訊號量 #if USE_MACH_SEM && USE_POSIX_SEM #error "Too many supported semaphore types" #elif USE_MACH_SEM semaphore_t dsema_port; //結構體使用的semaphore訊號 semaphore_t dsema_waiter_port;//dispatch_group使用的使用的semaphore訊號 #elif USE_POSIX_SEM sem_t dsema_sem; #else #error "No supported semaphore type" #endif size_t dsema_group_waiters; struct dispatch_sema_notify_s *dsema_notify_head; //notify連結串列頭部 struct dispatch_sema_notify_s *dsema_notify_tail; //notify連結串列尾部 }; 複製程式碼
補充
如何控制執行緒併發數
方法1:使用訊號量進行併發控制
dispatch_queue_t concurrentQueue = dispatch_queue_create("concurrentQueue", DISPATCH_QUEUE_CONCURRENT); dispatch_queue_t serialQueue = dispatch_queue_create("serialQueue",DISPATCH_QUEUE_SERIAL); dispatch_semaphore_t semaphore = dispatch_semaphore_create(4); for (NSInteger i = 0; i < 15; i++) { dispatch_async(serialQueue, ^{ dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); dispatch_async(concurrentQueue, ^{ NSLog(@"thread:%@開始執行任務%d",[NSThread currentThread],(int)i); sleep(1); NSLog(@"thread:%@結束執行任務%d",[NSThread currentThread],(int)i); dispatch_semaphore_signal(semaphore);}); }); } NSLog(@"主執行緒...!"); 複製程式碼
結果

方法2:YYDispatchQueuePool的實現思路
YYKit元件中的 YYDispatchQueuePool
也能控制併發佇列的併發數
在iOS 保持介面流暢的技巧
原文中提到:
其思路是為不同優先順序建立和 CPU 數量相同的 serial queue,每次從 pool 中獲取 queue 時,會輪詢返回其中一個 queue。我把 App 內所有非同步操作,包括影象解碼、物件釋放、非同步繪製等,都按優先順序不同放入了全域性的 serial queue 中執行,這樣儘量避免了過多執行緒導致的效能問題。