Nuttx訊號量機制
介紹
- 訊號量
在Nuttx中,訊號量是同步和互斥的基礎,Nuttx支援POSIX訊號量。
訊號量是獲得對資源獨佔訪問的首選機制,儘管 sched_lock()
和 sched_unlock()
介面也能實現這個功能,但是這兩個介面還是會在系統中帶來一些副作用, sched_lock()
會同時禁止高優先順序任務的執行,這些任務不依賴於訊號量管理的資源,這會對系統的相應時間產生負面影響。
- 優先順序反轉
正確使用訊號量可以避免 sched_lock()
的問題,但是存在以下的情況:
- 低優先順序任務
Task C
,獲取一個訊號量,獲得對保護資源的獨佔使用; - 任務
Task C
掛起,讓高優先順序任務Task A
執行; - 任務
Task A
試圖獲取任務Task C
所持有的訊號量而被阻塞,直到任務Task C
放棄訊號量; - 任務
Task C
允許被再次執行,但是被某個中等優先順序的任務Task B
掛起。
在這種情況下,高優先順序任務 Task A
在任務 Task B
(可能還有其他中等優先順序的任務)完成和任務 Task C
釋放訊號量之前不能執行。表現出來就是任務 Task A
的優先順序好像比任務 Task C
優先順序要低一樣,這種現象就叫優先順序反轉。
在一些作業系統中通過增加低優先順序任務 Task C
來避免優先順序反轉(這種行為的可操作術語叫優先順序繼承)。Nuttx在 CONFIG_PRIORITY_INHERITANCE
被選中時是支援這種行為,否則的話,需要設計人員提供不會發生優先順序反轉的實現,比如:
sched_lock()
- 優先順序繼承
上文中提到,當 CONFIG_PRIORITY_INHERITANCE
被選中時,Nuttx支援優先順序繼承,但是這個過程比較複雜。
-
CONFIG_SEM_PREALLOCHOLDERS
首先,在Nuttx中,優先順序繼承是在POSIX訊號量基礎上實現的,這是因為這些訊號量是Nuttx中最原始的等待機制,其他大多數等待方式都是基於訊號量來實現的,因此,如果為POSIX訊號量實現了優先順序繼承,那麼大多數Nuttx等待機制也就具備這個功能了。
複雜性的出現是因為訊號量可能有許多訊號量計數持有者,為了實現所有持有者的優先順序繼承,必須分配內部資料結構來管理與訊號量關聯的各種持有者。
CONFIG_SEM_PREALLOCHOLDERS
定義了對具有優先順序繼承支援的訊號量進行計數的不同執行緒的最大數量。這個設定也定義了預分配資料結構池的大小。如果禁用了優先順序繼承,或者只使用訊號量作為互斥體(只有一個持有者),或者使用計數訊號量的執行緒不超過兩個,則可以將其設定為0. -
CONFIG_SEM_NNESTPRIO
此外,可能存在多個不同優先順序的執行緒需要等待來自訊號量的計數,低優先順序執行緒持有信號量需要被提高,但是又必須跟蹤所有提高優先順序的值以便最後能恢復,這個會讓事情變得複雜。CONFIG_SEM_NNESTPRIO
定義陣列的大小,每個活動執行緒都有一個數組。這個值設定為等待另一個執行緒釋放訊號量上的高優先順序執行緒的最大數量(-1)。 -
給執行緒行為帶來未知風險
優先順序繼承相關的一些資料結構與訊號量的實現緊密耦合在一起,可能帶來某些影響。比如,如果執行緒在訊號量進行計數時執行;或者如果執行緒在不呼叫
sem_destroy()
時退出;或者優先順序提高後的執行緒重新確定自己的優先順序又會怎樣。Nuttx在實現優先順序繼承的時候會嘗試去處理所有的corner case
,但是也很有可能會遺漏,最壞的情況是,記憶體在優先順序繼承的情況下出現問題。
-
Locking訊號量
VSSignaling訊號量
訊號量(互斥鎖)有很多種用途。
-
Locking訊號量
其中一種典型的用法是對資源的獨佔訪問,也就是對臨界區的保護。需要獨佔訪問臨界區時,通過訊號量來訪問資源,訪問完畢後,該執行緒隨後釋放訊號量的計數。優先順序繼承只適用於這種用途。
-
Signaling訊號量
另一種用途是用於發出訊號:執行緒
A
等待訊號量上的事件發生。當事件發生時,另一個執行緒B
將傳送訊號量喚醒等待的執行緒A
。在獨佔訪問的用法中,是由同一個執行緒來對訊號量進行計數;而在這個用途中,是由一個執行緒等待在訊號量上,另一個執行緒來發送訊號,這本質上是一種執行緒的同步機制。在這種情況下,不應該使用優先順序繼承,否則會出現一些奇怪的行為。
資料結構及介面
資料結構
/* This structure contains information about the holder of a semaphore */ #ifdef CONFIG_PRIORITY_INHERITANCE struct tcb_s; /* Forward reference */ struct semholder_s { #if CONFIG_SEM_PREALLOCHOLDERS > 0 struct semholder_s *flink;/* Implements singly linked list */ #endif FAR struct tcb_s *htcb;/* Holder TCB */ int16_t counts;/* Number of counts owned by this holder */ }; /* This is the generic semaphore structure. */ struct sem_s { volatile int16_t semcount;/* >0 -> Num counts available */ /* <0 -> Num tasks waiting for semaphore */ /* If priority inheritance is enabled, then we have to keep track of which * tasks hold references to the semaphore. */ #ifdef CONFIG_PRIORITY_INHERITANCE uint8_t flags;/* See PRIOINHERIT_FLAGS_* definitions */ # if CONFIG_SEM_PREALLOCHOLDERS > 0 FAR struct semholder_s *hhead; /* List of holders of semaphore counts */ # else struct semholder_s holder;/* Single holder */ # endif #endif };
主要的資料結構分為兩部分:
-
struct sem_s
:用於描述通用的訊號量,其中該結構中包含了訊號量的計數變數,以及struct semholder_s
成員; -
struct semholder_s
:用於描述訊號量的持有者,對應一個TCB
,以及在該TCB
所描述的任務中訊號量的計數值。由於可能會存在多個任務等待一個訊號量,因此這個結構實現為一個單鏈表形式。
介面
-
int sem_init(sem_t *sem, int pshared, unsigned int value)
完成未命名訊號量sem
的初始化,pshared
未使用,value
為訊號量的初始化值。完成初始化之後,訊號量就能被用於sem_wait()
/sem_post()
/sem_trywait()
等介面了。 -
int sem_destroy(sem_t *sem)
完成未命名訊號量sem
的銷燬,只有呼叫sem_init()
介面建立的訊號量,才能被sem_destroy()
銷燬。呼叫sem_destroy()
去銷燬一個命名訊號量的行為是未定義的,在sem_destroy()
之後再去使用訊號量的行為也是未定義的。 -
sem_t *sem_open(const char *name, int oflag, ...)
在Task
和命名訊號量之間建立一個連線,在使用訊號量名稱呼叫sem_open()
之後,關聯的Task
可以使用該函式的返回地址來引用對應的訊號量。 -
int sem_close(sem_t *sem)
當呼叫任務結束使用這個命名訊號量時,可以呼叫此介面。sem_close()
會釋放系統為這個命名的訊號量分配的任何系統資源。如果沒有使用sem_unlink()
來刪除訊號量,那麼sem_close()
對指定的訊號量沒有影響,但是,當指定的訊號量被完全解除連結時,訊號量將在最後一個任務關閉它時消失。必須小心避免刪除另一個呼叫任務已經鎖定的訊號量。 -
int sem_unlink(const char *name)
這個函式將刪除由輸入名引數命名的訊號量,如果有一個或多個任務正在使用訊號量時呼叫sem_unlink()
,訊號量的銷燬會被延遲,直到所有引用都被呼叫sem_close()
為止。 -
int sem_wait(sem_t *sem)
嘗試去鎖住訊號量sem
,如果sem
訊號量已經被鎖住了,呼叫該介面的Task
不會返回,直到它成功的獲取鎖,或者呼叫被訊號中斷。 -
int sem_timedwait(sem_t *sem, const struct timespec *abstime)
這個函式類似於sem_wait()
,不同的是,當沒有其他執行緒通過sem_post()
來釋放訊號量的話,那麼在指定時間超時過期時,這個等待將會終止。 -
int sem_trywait(sem_t *sem)
該函式僅在當前訊號量未鎖定的情況下鎖定指定的訊號量,無論如何,呼叫返回時不會阻塞。 -
int sem_post(sem_t *sem)
當一個任務使用完一個訊號量時,將呼叫sem_post()
,該函式會解鎖訊號量。如果該該操作產生的訊號量值為正數,則不會阻塞等待訊號量解鎖的任務,訊號量的值只是簡單的遞增。如果該操作產生的訊號量值為0,那麼在阻塞的任務中,等待訊號量的任務將被允許從sem_wait()
呼叫中成功返回。注意:可以從中斷處理程式中呼叫sem_post()
。 -
int sem_getvalue(sem_t *sem, int *sval)
該函式用於獲取訊號量的值,當訊號量被鎖住時,得到的值要麼為0,要麼為負數,其絕對值表示等待訊號量的任務數。 -
int sem_getprotocol(FAR const pthread_mutexattr_t *attr, FAR int *protocol)
獲取訊號量協議屬性值,值有:SEM_PRIO_NONE
,SEM_PRIO_INHERIT
,SEM_PRIO_PROTECT
。 -
int sem_setprotocol(FAR pthread_mutexattr_t *attr, int protocol)
設定訊號量協議屬性,值有:SEM_PRIO_NONE
,SEM_PRIO_INHERIT
,SEM_PRIO_PROTECT
。SEM_PRIO_INHERIT
只有在CONFIG_PRIORITY_INHERITANCE
被選中時才支援,此外,SEM_PRIO_PROTECT
在當前的配置下不支援。
原理
還是來一張圖吧:

semaphore原理
訊號量整體的框架如上圖所示,與之相關的結構如下:
-
struct sem_s
:該結構中維護了一個 訊號燈計數值,當有任務在等待這個訊號量時,該計數值就加1,釋放訊號量時,計數值則減1.此外還維護了一個holder
持有者連結串列,把所有想獲取這個訊號量的任務組織成連結串列形式。 -
g_freeholder
:全域性佇列結構,該結構預先靜態分配好了所有的holder
持有者資料結構,當有新的任務需要等待訊號量時,便從這個全域性佇列中分配一個,如果釋放訊號量,則將holder
持有者資料結構返回到這個佇列中。 -
g_waitingforsemaphore
:全域性任務佇列,當有任務呼叫sem_wait()
等待訊號量,但是沒法獲取的時候,就將該任務新增到g_waitingforsemaphore
佇列中,並讓出CPU,當有任務呼叫sem_post()
釋放訊號量時,會去查詢g_waitingforsemaphore
佇列,是否有等待該訊號量的任務被阻塞,如果有的話,則喚醒對應的任務。 -
struct semholder_s
:訊號量持有者,該結構中主要包含了struct tcb_s
,對應到等待該訊號量的任務,struct tcb_s
結構中有一個waitsem
欄位,用於指向這個任務在等待的訊號量。此外還有一個counts
計數值,用於記錄該任務想獲取同一個訊號量的次數。
還是從幾個關鍵的函式來分析吧:
sem_wait()
sem_wait()
函式主要完成以下幾個工作:
- 判斷是否在中斷上下文中,由於
sem_wait()
可能觸發任務排程,造成本身睡眠,因此不能在中斷上下文中呼叫; - 如果訊號量可用,將計數值減1,並將呼叫任務新增到訊號量的持有者連結串列中;
- 如果訊號量不可用,將計數值減1,將呼叫任務中
waitsem
值設定成當前訊號量。如果使能了優先順序繼承,則提升該訊號量持有者中比當前呼叫任務優先順序低的任務優先順序。最後將呼叫任務新增到訊號量等待佇列g_waitingforsemaphore
中。
/**************************************************************************** * Name: sem_wait * * Description: *This function attempts to lock the semaphore referenced by 'sem'.If *the semaphore value is (<=) zero, then the calling task will not return *until it successfully acquires the lock. * * Parameters: *sem - Semaphore descriptor. * * Return Value: *0 (OK), or -1 (ERROR) is unsuccessful *If this function returns -1 (ERROR), then the cause of the failure will *be reported in 'errno' as: *- EINVAL:Invalid attempt to get the semaphore *- EINTR:The wait was interrupted by the receipt of a signal. * * Assumptions: * ****************************************************************************/ int sem_wait(FAR sem_t *sem) { FAR struct tcb_s *rtcb = this_task(); irqstate_t flags; int ret= ERROR; /* This API should not be called from interrupt handlers */ DEBUGASSERT(sem != NULL && up_interrupt_context() == false); /* The following operations must be performed with interrupts * disabled because sem_post() may be called from an interrupt * handler. */ flags = enter_critical_section(); /* sem_wait() is a cancellation point */ if (enter_cancellation_point()) { #ifdef CONFIG_CANCELLATION_POINTS /* If there is a pending cancellation, then do not perform * the wait.Exit now with ECANCELED. */ set_errno(ECANCELED); leave_cancellation_point(); leave_critical_section(flags); return ERROR; #endif } /* Make sure we were supplied with a valid semaphore. */ if (sem != NULL) { /* Check if the lock is available */ if (sem->semcount > 0) { /* It is, let the task take the semaphore. */ sem->semcount--; sem_addholder(sem); rtcb->waitsem = NULL; ret = OK; } /* The semaphore is NOT available, We will have to block the * current thread of execution. */ else { /* First, verify that the task is not already waiting on a * semaphore */ ASSERT(rtcb->waitsem == NULL); /* Handle the POSIX semaphore (but don't set the owner yet) */ sem->semcount--; /* Save the waited on semaphore in the TCB */ rtcb->waitsem = sem; /* If priority inheritance is enabled, then check the priority of * the holder of the semaphore. */ #ifdef CONFIG_PRIORITY_INHERITANCE /* Disable context switching.The following operations must be * atomic with regard to the scheduler. */ sched_lock(); /* Boost the priority of any threads holding a count on the * semaphore. */ sem_boostpriority(sem); #endif /* Add the TCB to the prioritized semaphore wait queue */ set_errno(0); up_block_task(rtcb, TSTATE_WAIT_SEM); /* When we resume at this point, either (1) the semaphore has been * assigned to this thread of execution, or (2) the semaphore wait * has been interrupted by a signal or a timeout.We can detect these * latter cases be examining the errno value. * * In the event that the semaphore wait was interrupted by a signal or * a timeout, certain semaphore clean-up operations have already been * performed (see sem_waitirq.c).Specifically: * * - sem_canceled() was called to restore the priority of all threads *that hold a reference to the semaphore, * - The semaphore count was decremented, and * - tcb->waitsem was nullifed. * * It is necesaary to do these things in sem_waitirq.c because a long * time may elapse between the time that the signal was issued and * this thread is awakened and this leaves a door open to several * race conditions. */ if (get_errno() != EINTR && get_errno() != ETIMEDOUT) { /* Not awakened by a signal or a timeout... * * NOTE that in this case sem_addholder() was called by logic * in sem_wait() fore this thread was restarted. */ ret = OK; } #ifdef CONFIG_PRIORITY_INHERITANCE sched_unlock(); #endif } } else { set_errno(EINVAL); } leave_cancellation_point(); leave_critical_section(flags); return ret; }
sem_post()
sem_post()
主要完成以下幾個任務:
sem_releaseholder() g_waitingforsemaphore sem_restorebaseprio()
/**************************************************************************** * Name: sem_post * * Description: *When a task has finished with a semaphore, it will call sem_post(). *This function unlocks the semaphore referenced by sem by performing the *semaphore unlock operation on that semaphore. * *If the semaphore value resulting from this operation is positive, then *no tasks were blocked waiting for the semaphore to become unlocked; the *semaphore is simply incremented. * *If the value of the semaphore resulting from this operation is zero, *then one of the tasks blocked waiting for the semaphore shall be *allowed to return successfully from its call to sem_wait(). * * Parameters: *sem - Semaphore descriptor * * Return Value: *0 (OK) or -1 (ERROR) if unsuccessful * * Assumptions: *This function may be called from an interrupt handler. * ****************************************************************************/ int sem_post(FAR sem_t *sem) { FAR struct tcb_s *stcb = NULL; irqstate_t flags; int ret = ERROR; /* Make sure we were supplied with a valid semaphore. */ if (sem) { /* The following operations must be performed with interrupts * disabled because sem_post() may be called from an interrupt * handler. */ flags = enter_critical_section(); /* Perform the semaphore unlock operation, releasing this task as a * holder then also incrementing the count on the semaphore. * * NOTE:When semaphores are used for signaling purposes, the holder * of the semaphore may not be this thread!In this case, * sem_releaseholder() will do nothing. * * In the case of a mutex this could be simply resolved since there is * only one holder but for the case of counting semaphores, there may * be many holders and if the holder is not this thread, then it is * not possible to know which thread/holder should be released. * * For this reason, it is recommended that priority inheritance be * disabled via sem_setprotocol(SEM_PRIO_NONE) when the semahore is * initialixed if the semaphore is to used for signaling purposes. */ ASSERT(sem->semcount < SEM_VALUE_MAX); sem_releaseholder(sem); sem->semcount++; #ifdef CONFIG_PRIORITY_INHERITANCE /* Don't let any unblocked tasks run until we complete any priority * restoration steps.Interrupts are disabled, but we do not want * the head of the read-to-run list to be modified yet. * * NOTE: If this sched_lock is called from an interrupt handler, it * will do nothing. */ sched_lock(); #endif /* If the result of of semaphore unlock is non-positive, then * there must be some task waiting for the semaphore. */ if (sem->semcount <= 0) { /* Check if there are any tasks in the waiting for semaphore * task list that are waiting for this semaphore. This is a * prioritized list so the first one we encounter is the one * that we want. */ for (stcb = (FAR struct tcb_s *)g_waitingforsemaphore.head; (stcb && stcb->waitsem != sem); stcb = stcb->flink); if (stcb != NULL) { /* The task will be the new holder of the semaphore when * it is awakened. */ sem_addholder_tcb(stcb, sem); /* It is, let the task take the semaphore */ stcb->waitsem = NULL; /* Restart the waiting task. */ up_unblock_task(stcb); } } /* Check if we need to drop the priority of any threads holding * this semaphore.The priority could have been boosted while they * held the semaphore. */ #ifdef CONFIG_PRIORITY_INHERITANCE sem_restorebaseprio(stcb, sem); sched_unlock(); #endif ret = OK; /* Interrupts may now be enabled. */ leave_critical_section(flags); } else { set_errno(EINVAL); } return ret; }
sem_timedwait()
sem_timedwait()
機制與 sem_wait()
大體類似,它們的區別跟訊息佇列進行訊息接收時 mq_receive()/mq_timedreceive()
區別類似,也是在程式碼中建立一個 watchdog
進行計時,當計時結束後還沒等到訊號量時,此時會回撥 sem_timeout()
介面,在該介面中取消該任務的等待,並重新排程該任務執行。
/**************************************************************************** * Name: sem_timedwait * * Description: *This function will lock the semaphore referenced by sem as in the *sem_wait() function. However, if the semaphore cannot be locked without *waiting for another process or thread to unlock the semaphore by *performing a sem_post() function, this wait will be terminated when the *specified timeout expires. * *The timeout will expire when the absolute time specified by abstime *passes, as measured by the clock on which timeouts are based (that is, *when the value of that clock equals or exceeds abstime), or if the *absolute time specified by abstime has already been passed at the *time of the call. * * Parameters: *sem - Semaphore object *abstime - The absolute time to wait until a timeout is declared. * * Return Value: *Zero (OK) is returned on success.On failure, -1 (ERROR) is returned *and the errno is set appropriately: * *EINVALThe sem argument does not refer to a valid semaphore.Or the *thread would have blocked, and the abstime parameter specified *a nanoseconds field value less than zero or greater than or *equal to 1000 million. *ETIMEDOUT The semaphore could not be locked before the specified timeout *expired. *EDEADLKA deadlock condition was detected. *EINTRA signal interrupted this function. * ****************************************************************************/ int sem_timedwait(FAR sem_t *sem, FAR const struct timespec *abstime) { FAR struct tcb_s *rtcb = this_task(); irqstate_t flags; intticks; interrcode; intret = ERROR; DEBUGASSERT(up_interrupt_context() == false && rtcb->waitdog == NULL); /* sem_timedwait() is a cancellation point */ (void)enter_cancellation_point(); /* Verify the input parameters and, in case of an error, set * errno appropriately. */ #ifdef CONFIG_DEBUG_FEATURES if (!abstime || !sem) { errcode = EINVAL; goto errout; } #endif /* Create a watchdog.We will not actually need this watchdog * unless the semaphore is unavailable, but we will reserve it up * front before we enter the following critical section. */ rtcb->waitdog = wd_create(); if (!rtcb->waitdog) { errcode = ENOMEM; goto errout; } /* We will disable interrupts until we have completed the semaphore * wait.We need to do this (as opposed to just disabling pre-emption) * because there could be interrupt handlers that are asynchronously * posting semaphores and to prevent race conditions with watchdog * timeout.This is not too bad because interrupts will be re- * enabled while we are blocked waiting for the semaphore. */ flags = enter_critical_section(); /* Try to take the semaphore without waiting. */ ret = sem_trywait(sem); if (ret == OK) { /* We got it! */ goto success_with_irqdisabled; } /* We will have to wait for the semaphore.Make sure that we were provided * with a valid timeout. */ if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) { errcode = EINVAL; goto errout_with_irqdisabled; } /* Convert the timespec to clock ticks.We must have interrupts * disabled here so that this time stays valid until the wait begins. */ errcode = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks); /* If the time has already expired return immediately. */ if (errcode == OK && ticks <= 0) { errcode = ETIMEDOUT; goto errout_with_irqdisabled; } /* Handle any time-related errors */ if (errcode != OK) { goto errout_with_irqdisabled; } /* Start the watchdog */ (void)wd_start(rtcb->waitdog, ticks, (wdentry_t)sem_timeout, 1, getpid()); /* Now perform the blocking wait */ ret = sem_wait(sem); if (ret < 0) { /* sem_wait() failed.Save the errno value */ errcode = get_errno(); } /* Stop the watchdog timer */ wd_cancel(rtcb->waitdog); if (errcode != OK) { goto errout_with_irqdisabled; } /* We can now restore interrupts and delete the watchdog */ /* Success exits */ success_with_irqdisabled: leave_critical_section(flags); wd_delete(rtcb->waitdog); rtcb->waitdog = NULL; leave_cancellation_point(); return OK; /* Error exits */ errout_with_irqdisabled: leave_critical_section(flags); wd_delete(rtcb->waitdog); rtcb->waitdog = NULL; errout: set_errno(errcode); leave_cancellation_point(); return ERROR; }
總結
總體來說,在Nuttx中訊號量既可用於同步和互斥處理,當任務等不到訊號量時,便新增到相關的任務佇列中進行阻塞睡眠,當釋放訊號量時,再去該任務佇列中進行查詢,重新排程該任務執行。如果遇到優先順序反轉的情況,優先順序繼承是一種解決方法。