NuttX Work Queue Mechanism
Earlier articles on NuttX:
- Nuttx Task Schedule
- NuttX signal mechanism
- NuttX build system
- NuttX message queue mechanism
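Introduction
NuttX provides a work queue mechanism. A work queue is a queue of pending work items serviced by one or more worker threads. It is useful for offloading work to a different thread context, for deferring execution, and for serializing execution.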
Types of work queue
There are three types of work queue, each with different properties and uses.
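- High-priority kernel work queue
  - High-priority kernel work queue
    The dedicated high-priority work queue is intended for deferred processing from interrupt handlers. Some drivers need such a queue; if nothing requires it, it can safely be disabled. The high-priority worker thread can also act as a resource reclaimer, performing the deferred release of memory that was freed from an interrupt handler. If the high-priority worker thread is disabled, this cleanup is done in one of two ways: 1) if the low-priority worker thread is enabled, the cleanup runs on that thread; 2) if the low-priority worker thread is not enabled either, the IDLE thread does it (which may be unsuitable if memory reclamation needs a higher priority).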
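  - Device driver bottom half
    The high-priority worker thread can serve as the bottom half of a device driver. It must therefore run at a very high, fixed priority, close to that of the interrupt handler itself. Normally the high-priority work queue should be the highest-priority thread in the system. The default priority is 224.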
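  - Thread pool
    A work queue can be configured to support multiple low-priority threads. This is essentially a thread pool that services the queued work with several threads, which breaks the strict serialization of the "queue" (so the work queue is, strictly speaking, no longer a queue).
    Multiple worker threads are needed when work items block on I/O while waiting for input: with only one worker thread, all work queue processing would stall. This is required for asynchronous I/O (AIO).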
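  - Comparison with the low-priority kernel work queue
    For less critical, lower-priority, application-oriented work, consider the low-priority work queue. It runs at a lower priority, but it has the added advantage of supporting priority inheritance (if `CONFIG_PRIORITY_INHERITANCE=y` is selected): the priority of the low-priority worker thread can be adjusted.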
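  - Configuration options
    - `CONFIG_SCHED_HPWORK`: enables the high-priority work queue.
    - `CONFIG_SCHED_HPNTHREADS`: number of threads in the high-priority work queue's thread pool. Default: 1.
    - `CONFIG_SCHED_HPWORKPRIORITY`: execution priority of the high-priority worker thread. Default: 224.
    - `CONFIG_SCHED_HPWORKSTACKSIZE`: stack size of the worker thread. Default: 2048 bytes.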
  - Common configuration options
    This option applies to all work queues:
    - `CONFIG_SIG_SIGWORK`: the signal number used to wake up the worker threads. Default: 17.
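- Low-priority kernel work queue
  - Low-priority kernel work queue
    The low-priority work queue is better suited to extended, application-oriented processing such as file system cleanup, memory garbage collection, and asynchronous I/O.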
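  - Comparison with the high-priority kernel work queue
    Because it runs at a lower priority, the low-priority kernel work queue is not suitable as a driver bottom half. Otherwise it is very similar to the high-priority kernel work queue, and most of the discussion above applies to it as well. Its one important distinguishing feature is priority inheritance, which makes it a better fit for certain tasks.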
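  - Priority inheritance
    The low-priority kernel worker thread supports priority inheritance (requires `CONFIG_PRIORITY_INHERITANCE=y`), so its priority can be adjusted to match the situation. Priority inheritance is not automatic: by default the low-priority worker thread always runs at a fixed priority. Call `lpwork_boostpriority()` to raise the priority (normally just before scheduling the work), and call `lpwork_restorepriority()` to restore it once the work is done (normally from the work handler when the work completes). At present only the NuttX asynchronous I/O logic uses this dynamic priority feature.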
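  - Configuration options
    - `CONFIG_SCHED_LPWORK`: enables the low-priority work queue.
    - `CONFIG_SCHED_LPNTHREADS`: number of threads in the low-priority work queue. Default: 1.
    - `CONFIG_SCHED_LPWORKPRIORITY`: minimum execution priority of the low-priority worker threads. Every thread in the queue starts at this priority. If priority inheritance is enabled, the priority is boosted upward from this value. Default: 50.
    - `CONFIG_SCHED_LPWORKPRIOMAX`: maximum execution priority of the low-priority worker threads. The execution priority never exceeds this value. Default: 176.
    - `CONFIG_SCHED_LPWORKSTACKSIZE`: stack size of the low-priority worker thread. Default: 2048 bytes.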
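- User-mode work queue
  - Work queue accessibility
    Both the low-priority and the high-priority worker threads are kernel threads. In a NuttX flat build, applications can access and use them. In a NuttX protected or kernel build, however, kernel-mode code is isolated and user-mode code cannot access it.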
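  - User-mode work queue
    The user-mode work queue has the same interface as the kernel-mode work queues, and it is functionally equivalent to the high-priority work queue. The difference is that its implementation does not depend on resources internal to the kernel.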
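  - Configuration options
    - `CONFIG_LIB_USRWORK`: enables the user-mode work queue.
    - `CONFIG_LIB_USRWORKPRIORITY`: execution priority of the user-mode worker thread. Default: 100.
    - `CONFIG_LIB_USRWORKSTACKSIZE`: stack size of the user-mode worker thread. Default: 2048.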
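The boost and restore behaviour described for the low-priority queue can be modelled in a few lines of plain C. This is only a sketch under stated assumptions: the `sim_` names and the `MAX_BOOSTS` limit are hypothetical, the two priority limits are the defaults quoted in the text, and the real NuttX implementation changes the scheduler priority of the worker thread rather than tracking a small array.

```c
#include <assert.h>
#include <stdint.h>

#define LP_BASE_PRIO 50   /* Default CONFIG_SCHED_LPWORKPRIORITY per the text */
#define LP_MAX_PRIO  176  /* Default CONFIG_SCHED_LPWORKPRIOMAX per the text */
#define MAX_BOOSTS   8    /* Hypothetical limit, only for this sketch */

static uint8_t g_boosts[MAX_BOOSTS];  /* Outstanding boost requests */
static int g_nboosts;

/* Effective priority: the base priority or the highest pending boost */
static uint8_t sim_current_priority(void)
{
  uint8_t prio = LP_BASE_PRIO;

  for (int i = 0; i < g_nboosts; i++)
    {
      if (g_boosts[i] > prio)
        {
          prio = g_boosts[i];
        }
    }

  return prio;
}

/* Model of a boost: remember the request, clamped to the maximum */
static void sim_boostpriority(uint8_t reqprio)
{
  assert(g_nboosts < MAX_BOOSTS);

  if (reqprio > LP_MAX_PRIO)
    {
      reqprio = LP_MAX_PRIO;
    }

  g_boosts[g_nboosts++] = reqprio;
}

/* Model of a restore: drop one matching request, if there is one */
static void sim_restorepriority(uint8_t reqprio)
{
  if (reqprio > LP_MAX_PRIO)
    {
      reqprio = LP_MAX_PRIO;
    }

  for (int i = 0; i < g_nboosts; i++)
    {
      if (g_boosts[i] == reqprio)
        {
          g_boosts[i] = g_boosts[--g_nboosts];
          return;
        }
    }
}
```

In this model a client would call `sim_boostpriority()` just before queuing its work and `sim_restorepriority()` with the same value from the work handler, which mirrors the calling pattern the text describes for `lpwork_boostpriority()` and `lpwork_restorepriority()`.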
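Data structures and interfaces
Data structures
The data structures fall into two groups: those used by callers, and those used by the kernel implementation: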
- User data structure

      /* Defines the work callback */

      typedef void (*worker_t)(FAR void *arg);

      /* Defines one entry in the work queue.  The user only needs this structure
       * in order to declare instances of the work structure.  Handling of all
       * fields is performed by the work APIs
       */

      struct work_s
      {
        struct dq_entry_s dq;  /* Implements a doubly linked list */
        worker_t  worker;      /* Work callback */
        FAR void *arg;         /* Callback argument */
        systime_t qtime;       /* Time work queued */
        systime_t delay;       /* Delay until work performed */
      };
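  Callers only need `struct work_s` in order to declare instances of it; all of its members are manipulated by the corresponding API functions. `qtime` is the time at which the work was queued, and `delay` is how long to wait before running it. A `delay` of 0 means the work runs immediately.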
- Kernel implementation data structures

      /* This represents one worker */

      struct kworker_s
      {
        pid_t         pid;   /* The task ID of the worker thread */
        volatile bool busy;  /* True: Worker is not available */
      };

      /* This structure defines the state of one kernel-mode work queue */

      struct kwork_wqueue_s
      {
        systime_t         delay;      /* Delay between polling cycles (ticks) */
        struct dq_queue_s q;          /* The queue of pending work */
        struct kworker_s  worker[1];  /* Describes a worker thread */
      };

      /* This structure defines the state of one high-priority work queue.  This
       * structure must be cast-compatible with kwork_wqueue_s.
       */

      #ifdef CONFIG_SCHED_HPWORK
      struct hp_wqueue_s
      {
        systime_t         delay;      /* Delay between polling cycles (ticks) */
        struct dq_queue_s q;          /* The queue of pending work */
        struct kworker_s  worker[1];  /* Describes the single high priority worker */
      };
      #endif

      /* This structure defines the state of one low-priority work queue.  This
       * structure must be cast compatible with kwork_wqueue_s
       */

      #ifdef CONFIG_SCHED_LPWORK
      struct lp_wqueue_s
      {
        systime_t         delay;      /* Delay between polling cycles (ticks) */
        struct dq_queue_s q;          /* The queue of pending work */

        /* Describes each thread in the low priority queue's thread pool */

        struct kworker_s  worker[CONFIG_SCHED_LPNTHREADS];
      };
      #endif

      /****************************************************************************
       * Public Data
       ****************************************************************************/

      #ifdef CONFIG_SCHED_HPWORK
      /* The state of the kernel mode, high priority work queue. */

      extern struct hp_wqueue_s g_hpwork;
      #endif

      #ifdef CONFIG_SCHED_LPWORK
      /* The state of the kernel mode, low priority work queue(s). */

      extern struct lp_wqueue_s g_lpwork;
      #endif
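  In the structures above:
  - `struct kworker_s`: represents one worker thread. It holds the thread ID and the thread's busy state.
  - `struct kwork_wqueue_s`: describes a kernel-mode work queue. The internal interfaces all take this type; in practice a `struct hp_wqueue_s` or `struct lp_wqueue_s` is cast to it.
  - `struct hp_wqueue_s`: describes the high-priority kernel work queue. As the structure shows, this queue supports only one worker thread by default.
  - `struct lp_wqueue_s`: describes the low-priority kernel work queue. As the structure shows, the number of worker threads in this queue is configurable and equals the value of `CONFIG_SCHED_LPNTHREADS`.
  - `g_hpwork` / `g_lpwork`: two global descriptors, one for each type of kernel work queue.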
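The cast between the queue structures only works because the shared members sit at the same offsets. A minimal sketch of how that assumption could be checked at compile time is shown below. All `sim_` types are hypothetical stand-ins for the NuttX types (the real `dq_queue_s`, `pid_t`, and `systime_t` definitions are not reproduced here), and `SIM_LPNTHREADS` plays the role of `CONFIG_SCHED_LPNTHREADS`.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t sim_systime_t;            /* Stand-in for systime_t */

struct sim_dq_queue_s                      /* Stand-in for struct dq_queue_s */
{
  void *head;
  void *tail;
};

struct sim_kworker_s                       /* Stand-in for struct kworker_s */
{
  int pid;
  volatile bool busy;
};

#define SIM_LPNTHREADS 3                   /* Hypothetical thread count */

struct sim_kwork_wqueue_s                  /* Generic view of a queue */
{
  sim_systime_t delay;
  struct sim_dq_queue_s q;
  struct sim_kworker_s worker[1];
};

struct sim_lp_wqueue_s                     /* Queue with a thread pool */
{
  sim_systime_t delay;
  struct sim_dq_queue_s q;
  struct sim_kworker_s worker[SIM_LPNTHREADS];
};

/* Fail the build if the shared members do not line up */

_Static_assert(offsetof(struct sim_kwork_wqueue_s, delay) ==
               offsetof(struct sim_lp_wqueue_s, delay),
               "delay must be at the same offset");
_Static_assert(offsetof(struct sim_kwork_wqueue_s, q) ==
               offsetof(struct sim_lp_wqueue_s, q),
               "q must be at the same offset");
_Static_assert(offsetof(struct sim_kwork_wqueue_s, worker) ==
               offsetof(struct sim_lp_wqueue_s, worker),
               "worker[] must start at the same offset");

/* The same comparison as a runtime function, for use in a unit test */
static bool sim_layouts_match(void)
{
  return offsetof(struct sim_kwork_wqueue_s, delay) ==
           offsetof(struct sim_lp_wqueue_s, delay) &&
         offsetof(struct sim_kwork_wqueue_s, q) ==
           offsetof(struct sim_lp_wqueue_s, q) &&
         offsetof(struct sim_kwork_wqueue_s, worker) ==
           offsetof(struct sim_lp_wqueue_s, worker);
}
```

Only the length of the trailing `worker[]` array differs between the two layouts, which is why the generic code can be handed either queue as long as it is also told which worker index to use.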
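Interface definitions
- `int work_usrstart(void)`: starts the user-mode work queue.
- `int work_queue(int qid, FAR struct work_s *work, worker_t worker, FAR void *arg, systime_t delay)`: adds work to a work queue. The work runs later, after the given delay, on one of the queue's worker threads.
- `int work_cancel(int qid, FAR struct work_s *work)`: removes previously queued work from the queue.
- `int work_signal(int qid)`: signals the worker thread to process the work queue now.
- `work_available(work)`: checks whether the work structure is available.
- `void lpwork_boostpriority(uint8_t reqprio)`: raises the execution priority of the worker thread.
- `void lpwork_restorepriority(uint8_t reqprio)`: restores the execution priority of the worker thread.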
The code says it all:
    /****************************************************************************
     * Name: work_usrstart
     *
     * Description:
     *   Start the user mode work queue.
     *
     * Input parameters:
     *   None
     *
     * Returned Value:
     *   The task ID of the worker thread is returned on success.  A negated
     *   errno value is returned on failure.
     *
     ****************************************************************************/

    #if defined(CONFIG_LIB_USRWORK) && !defined(__KERNEL__)
    int work_usrstart(void);
    #endif

    /****************************************************************************
     * Name: work_queue
     *
     * Description:
     *   Queue work to be performed at a later time.  All queued work will be
     *   performed on the worker thread of execution (not the caller's).
     *
     *   The work structure is allocated by caller, but completely managed by
     *   the work queue logic.  The caller should never modify the contents of
     *   the work queue structure; the caller should not call work_queue()
     *   again until either (1) the previous work has been performed and removed
     *   from the queue, or (2) work_cancel() has been called to cancel the work
     *   and remove it from the work queue.
     *
     * Input parameters:
     *   qid    - The work queue ID
     *   work   - The work structure to queue
     *   worker - The worker callback to be invoked.  The callback will invoked
     *            on the worker thread of execution.
     *   arg    - The argument that will be passed to the worker callback when
     *            it is invoked.
     *   delay  - Delay (in clock ticks) from the time queue until the worker
     *            is invoked. Zero means to perform the work immediately.
     *
     * Returned Value:
     *   Zero on success, a negated errno on failure
     *
     ****************************************************************************/

    int work_queue(int qid, FAR struct work_s *work, worker_t worker,
                   FAR void *arg, systime_t delay);

    /****************************************************************************
     * Name: work_cancel
     *
     * Description:
     *   Cancel previously queued work.  This removes work from the work queue.
     *   After work has been cancelled, it may be re-queue by calling work_queue()
     *   again.
     *
     * Input parameters:
     *   qid  - The work queue ID
     *   work - The previously queue work structure to cancel
     *
     * Returned Value:
     *   Zero on success, a negated errno on failure
     *
     *   -ENOENT - There is no such work queued.
     *   -EINVAL - An invalid work queue was specified
     *
     ****************************************************************************/

    int work_cancel(int qid, FAR struct work_s *work);

    /****************************************************************************
     * Name: work_signal
     *
     * Description:
     *   Signal the worker thread to process the work queue now.  This function
     *   is used internally by the work logic but could also be used by the
     *   user to force an immediate re-assessment of pending work.
     *
     * Input parameters:
     *   qid - The work queue ID
     *
     * Returned Value:
     *   Zero on success, a negated errno on failure
     *
     ****************************************************************************/

    int work_signal(int qid);

    /****************************************************************************
     * Name: work_available
     *
     * Description:
     *   Check if the work structure is available.
     *
     * Input parameters:
     *   work - The work queue structure to check.
     *
     * Returned Value:
     *   true if available; false if busy (i.e., there is still pending work).
     *
     ****************************************************************************/

    #define work_available(work) ((work)->worker == NULL)

    /****************************************************************************
     * Name: lpwork_boostpriority
     *
     * Description:
     *   Called by the work queue client to assure that the priority of the low-
     *   priority worker thread is at least at the requested level, reqprio. This
     *   function would normally be called just before calling work_queue().
     *
     * Parameters:
     *   reqprio - Requested minimum worker thread priority
     *
     * Return Value:
     *   None
     *
     ****************************************************************************/

    #if defined(CONFIG_SCHED_LPWORK) && defined(CONFIG_PRIORITY_INHERITANCE)
    void lpwork_boostpriority(uint8_t reqprio);
    #endif

    /****************************************************************************
     * Name: lpwork_restorepriority
     *
     * Description:
     *   This function is called to restore the priority after it was previously
     *   boosted.  This is often done by client logic on the worker thread when
     *   the scheduled work completes.  It will check if we need to drop the
     *   priority of the worker thread.
     *
     * Parameters:
     *   reqprio - Previously requested minimum worker thread priority to be
     *             "unboosted"
     *
     * Return Value:
     *   None
     *
     ****************************************************************************/

    #if defined(CONFIG_SCHED_LPWORK) && defined(CONFIG_PRIORITY_INHERITANCE)
    void lpwork_restorepriority(uint8_t reqprio);
    #endif
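The contract between queuing, cancelling, and the availability check can be illustrated with a small standalone model. This is a sketch, not NuttX code: the `sim_` names are hypothetical, the list is singly linked for brevity, there is no queue ID and no locking, and returning `-EBUSY` for a second queue attempt is a choice of this sketch (the header comment above only says the caller should not do that).

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

typedef void (*sim_worker_t)(void *arg);

/* Hypothetical stand-in for struct work_s */
struct sim_work_s
{
  struct sim_work_s *next;
  sim_worker_t worker;     /* NULL means "not queued" */
  void *arg;
  unsigned int delay;
};

static struct sim_work_s *g_sim_queue;  /* Head of the pending list */

/* Mirrors the work_available() macro: free when no callback is set */
static bool sim_work_available(const struct sim_work_s *work)
{
  return work->worker == NULL;
}

/* Append work to the tail of the pending list */
static int sim_work_queue(struct sim_work_s *work, sim_worker_t worker,
                          void *arg, unsigned int delay)
{
  struct sim_work_s **link = &g_sim_queue;

  if (work == NULL || worker == NULL)
    {
      return -EINVAL;
    }

  if (!sim_work_available(work))
    {
      return -EBUSY;  /* Choice of this sketch, see the note above */
    }

  assert(work->next == NULL);  /* A free entry must not be linked */

  work->worker = worker;
  work->arg    = arg;
  work->delay  = delay;

  while (*link != NULL)
    {
      link = &(*link)->next;
    }

  *link = work;
  return 0;
}

/* Unlink work and mark it available again; -ENOENT if it is not queued */
static int sim_work_cancel(struct sim_work_s *work)
{
  struct sim_work_s **link;

  for (link = &g_sim_queue; *link != NULL; link = &(*link)->next)
    {
      if (*link == work)
        {
          *link        = work->next;
          work->next   = NULL;
          work->worker = NULL;
          return 0;
        }
    }

  return -ENOENT;
}

/* Example callback used when exercising the model */
static void sim_noop(void *arg)
{
  (void)arg;
}
```

A caller would typically declare one `struct sim_work_s` per deferred job, check `sim_work_available()` before queuing, and cancel before reusing the structure, which is the same discipline the `work_queue()` comment asks for.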
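How it works
As usual, let's start with a diagram:
![NuttX work queue mechanism](https://img.it610.com/image/info5/6f7b425a03c9490393e1b14cb78caf35.png)
Work queue
Put simply, a work queue looks like the diagram above and consists of three parts: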
work_queue() delay
NuttX execution starts at `os_start()`. From there the work queue threads are eventually created, through the following call chain:
os_start() ---> os_bringup() ---> os_workqueue() ---> work_hpstart()/work_lpstart()/USERSPACE->work_usrstart()
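Here `work_hpstart()`, `work_lpstart()`, and `USERSPACE->work_usrstart()` correspond to the high-priority kernel work queue, the low-priority kernel work queue, and the user-mode work queue respectively. Because all three work on the same principle, the analysis below follows the high-priority kernel work queue, whose entry point is `work_hpstart()`.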
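`work_hpstart()` mainly does the following:
- Initializes the work queue data structure `g_hpwork` (the polling delay and the queue of pending work).
- Creates the kernel thread `work_hpthread`, using `CONFIG_SCHED_HPWORKPRIORITY` and `CONFIG_SCHED_HPWORKSTACKSIZE`.
- Records the ID of the new thread in `g_hpwork.worker[0]` and marks that worker as busy.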
    int work_hpstart(void)
    {
      pid_t pid;

      /* Initialize work queue data structures */

      g_hpwork.delay = CONFIG_SCHED_HPWORKPERIOD / USEC_PER_TICK;
      dq_init(&g_hpwork.q);

      /* Start the high-priority, kernel mode worker thread */

      sinfo("Starting high-priority kernel worker thread\n");

      pid = kernel_thread(HPWORKNAME, CONFIG_SCHED_HPWORKPRIORITY,
                          CONFIG_SCHED_HPWORKSTACKSIZE,
                          (main_t)work_hpthread,
                          (FAR char * const *)NULL);

      DEBUGASSERT(pid > 0);
      if (pid < 0)
        {
          int errcode = errno;
          DEBUGASSERT(errcode > 0);

          serr("ERROR: kernel_thread failed: %d\n", errcode);
          return -errcode;
        }

      g_hpwork.worker[0].pid  = pid;
      g_hpwork.worker[0].busy = true;
      return pid;
    }
The actual work is handled by the `work_hpthread` thread. That function runs an infinite loop, and each iteration calls `work_process()` to handle the queued work.
    /****************************************************************************
     * Name: work_hpthread
     *
     * Description:
     *   This is the worker thread that performs the actions placed on the high
     *   priority work queue.
     *
     *   This, along with the lower priority worker thread(s) are the kernel
     *   mode work queues (also build in the flat build).  One of these threads
     *   also performs periodic garbage collection (that would otherwise be
     *   performed by the idle thread if CONFIG_SCHED_WORKQUEUE is not defined).
     *   That will be the higher priority worker thread only if a lower priority
     *   worker thread is available.
     *
     *   All kernel mode worker threads are started by the OS during normal
     *   bring up.  This entry point is referenced by OS internally and should
     *   not be accessed by application logic.
     *
     * Input parameters:
     *   argc, argv (not used)
     *
     * Returned Value:
     *   Does not return
     *
     ****************************************************************************/

    static int work_hpthread(int argc, char *argv[])
    {
      /* Loop forever */

      for (; ; )
        {
    #ifndef CONFIG_SCHED_LPWORK
          /* First, perform garbage collection.  This cleans-up memory
           * de-allocations that were queued because they could not be freed in
           * that execution context (for example, if the memory was freed from
           * an interrupt handler).
           *
           * NOTE: If the work thread is disabled, this clean-up is performed by
           * the IDLE thread (at a very, very low priority).  If the low-priority
           * work thread is enabled, then the garbage collection is done on that
           * thread instead.
           */

          sched_garbage_collection();
    #endif

          /* Then process queued work.  work_process will not return until: (1)
           * there is no further work in the work queue, and (2) the polling
           * period provided by g_hpwork.delay expires.
           */

          work_process((FAR struct kwork_wqueue_s *)&g_hpwork, g_hpwork.delay, 0);
        }

      return OK; /* To keep some compilers happy */
    }
The core of work queue processing is therefore `work_process()`, which is shared by the high-priority and low-priority kernel work queues.
`work_process()` does the following:
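1. Reads the current system time. This is used to work out how long each work item has been in the queue and whether it is due to run.
2. Takes a work item from the head of the queue and compares two values: 1) the elapsed time, that is, the current system time minus the time the work was queued; 2) the requested delay, that is, the `delay` field of the work structure.
3. If the elapsed time is greater than or equal to the delay, runs the work item's callback immediately.
4. If the elapsed time is less than the delay, computes the remaining time, which eventually determines how long the worker sleeps.
5. The high-priority and low-priority kernel work queues differ slightly here. In the high-priority case, if nothing is due yet, the worker thread sleeps and yields the CPU. In the low-priority case, the first thread polls (the same behaviour as the high-priority worker thread), while the other worker threads call `sigwaitinfo()` and wait for a signal.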
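The timing decision in steps 2 to 4 comes down to unsigned tick arithmetic. A minimal sketch follows, assuming a 32-bit tick counter; `sim_tick_t` and both function names are hypothetical. Because the subtraction is unsigned, the comparison stays correct even when the tick counter wraps around between queuing and checking.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t sim_tick_t;  /* Assumed 32-bit tick counter */

/* True when the work is due: elapsed ticks have reached the delay */
static bool sim_work_ready(sim_tick_t now, sim_tick_t qtime, sim_tick_t delay)
{
  sim_tick_t elapsed = (sim_tick_t)(now - qtime);  /* Wraps safely */

  return elapsed >= delay;
}

/* Ticks still to wait before the work is due (0 if already due) */
static sim_tick_t sim_ticks_remaining(sim_tick_t now, sim_tick_t qtime,
                                      sim_tick_t delay)
{
  sim_tick_t elapsed = (sim_tick_t)(now - qtime);

  return elapsed >= delay ? 0 : delay - elapsed;
}
```

For example, work queued at tick 100 with a delay of 10 is not ready at tick 105 and has 5 ticks remaining, while a delay of 0 is ready as soon as it is queued.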
The full source of `work_process()` is as follows:
    void work_process(FAR struct kwork_wqueue_s *wqueue, systime_t period, int wndx)
    {
      volatile FAR struct work_s *work;
      worker_t  worker;
      irqstate_t flags;
      FAR void *arg;
      systime_t elapsed;
      systime_t remaining;
      systime_t stick;
      systime_t ctick;
      systime_t next;

      /* Then process queued work.  We need to keep interrupts disabled while
       * we process items in the work list.
       */

      next  = period;
      flags = enter_critical_section();

      /* Get the time that we started this polling cycle in clock ticks. */

      stick = clock_systimer();

      /* And check each entry in the work queue.  Since we have disabled
       * interrupts we know:  (1) we will not be suspended unless we do
       * so ourselves, and (2) there will be no changes to the work queue
       */

      work = (FAR struct work_s *)wqueue->q.head;
      while (work)
        {
          /* Is this work ready?  It is ready if there is no delay or if
           * the delay has elapsed. qtime is the time that the work was added
           * to the work queue.  It will always be greater than or equal to
           * zero.  Therefore a delay of zero will always execute immediately.
           */

          ctick   = clock_systimer();
          elapsed = ctick - work->qtime;
          if (elapsed >= work->delay)
            {
              /* Remove the ready-to-execute work from the list */

              (void)dq_rem((struct dq_entry_s *)work, &wqueue->q);

              /* Extract the work description from the entry (in case the work
               * instance by the re-used after it has been de-queued).
               */

              worker = work->worker;

              /* Check for a race condition where the work may be nullified
               * before it is removed from the queue.
               */

              if (worker != NULL)
                {
                  /* Extract the work argument (before re-enabling interrupts) */

                  arg = work->arg;

                  /* Mark the work as no longer being queued */

                  work->worker = NULL;

                  /* Do the work.  Re-enable interrupts while the work is being
                   * performed... we don't have any idea how long this will take!
                   */

                  leave_critical_section(flags);
                  worker(arg);

                  /* Now, unfortunately, since we re-enabled interrupts we don't
                   * know the state of the work list and we will have to start
                   * back at the head of the list.
                   */

                  flags = enter_critical_section();
                  work  = (FAR struct work_s *)wqueue->q.head;
                }
              else
                {
                  /* Cancelled.. Just move to the next work in the list with
                   * interrupts still disabled.
                   */

                  work = (FAR struct work_s *)work->dq.flink;
                }
            }
          else /* elapsed < work->delay */
            {
              /* This one is not ready.
               *
               * NOTE that elapsed is relative to the the current time,
               * not the time of beginning of this queue processing pass.
               * So it may need an adjustment.
               */

              elapsed += (ctick - stick);
              if (elapsed > work->delay)
                {
                  /* The delay has expired while we are processing */

                  elapsed = work->delay;
                }

              /* Will it be ready before the next scheduled wakeup interval? */

              remaining = work->delay - elapsed;
              if (remaining < next)
                {
                  /* Yes.. Then schedule to wake up when the work is ready */

                  next = remaining;
                }

              /* Then try the next in the list. */

              work = (FAR struct work_s *)work->dq.flink;
            }
        }

    #if defined(CONFIG_SCHED_LPWORK) && CONFIG_SCHED_LPNTHREADS > 0
      /* Value of zero for period means that we should wait indefinitely until
       * signalled.  This option is used only for the case where there are
       * multiple, low-priority worker threads.  In that case, only one of
       * the threads does the poll... the others simple.  In all other cases
       * period will be non-zero and equal to wqueue->delay.
       */

      if (period == 0)
        {
          sigset_t set;

          /* Wait indefinitely until signalled with SIGWORK */

          sigemptyset(&set);
          sigaddset(&set, SIGWORK);

          wqueue->worker[wndx].busy = false;
          DEBUGVERIFY(sigwaitinfo(&set, NULL));
          wqueue->worker[wndx].busy = true;
        }
      else
    #endif
        {
          /* Get the delay (in clock ticks) since we started the sampling */

          elapsed = clock_systimer() - stick;
          if (elapsed < period && next > 0)
            {
              /* How much time would we need to delay to get to the end of the
               * sampling period?  The amount of time we delay should be the smaller
               * of the time to the end of the sampling period and the time to the
               * next work expiry.
               */

              remaining = period - elapsed;
              next      = MIN(next, remaining);

              /* Wait awhile to check the work list.  We will wait here until
               * either the time elapses or until we are awakened by a signal.
               * Interrupts will be re-enabled while we wait.
               */

              wqueue->worker[wndx].busy = false;
              usleep(next * USEC_PER_TICK);
              wqueue->worker[wndx].busy = true;
            }
        }

      leave_critical_section(flags);
    }
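The traversal logic of `work_process()` is easier to follow without the locking and sleeping. The sketch below models one pass over the pending list in plain C. It is an illustration under stated assumptions, not the NuttX implementation: the `sim_` names are hypothetical, the list is singly linked, the current time is passed in as `now` instead of being read from a system timer, and there is no critical section. It keeps the two notable behaviours of the original: the scan restarts from the head after a callback has run, and the return value is the number of ticks until the next wakeup, capped at the polling period.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t sim_tick_t;
typedef void (*sim_worker_t)(void *arg);

/* Hypothetical stand-in for struct work_s */
struct sim_work_s
{
  struct sim_work_s *next;
  sim_worker_t worker;
  void *arg;
  sim_tick_t qtime;   /* Tick at which the work was queued */
  sim_tick_t delay;   /* Ticks to wait before running it */
};

/* Run every due item once; return ticks until the next wakeup */
static sim_tick_t sim_process_pass(struct sim_work_s **head, sim_tick_t now,
                                   sim_tick_t period)
{
  struct sim_work_s **link = head;
  sim_tick_t next = period;

  assert(head != NULL);

  while (*link != NULL)
    {
      struct sim_work_s *work = *link;
      sim_tick_t elapsed = (sim_tick_t)(now - work->qtime);

      if (elapsed >= work->delay)
        {
          sim_worker_t worker = work->worker;
          void *arg = work->arg;

          /* Unlink the entry and mark it available before the callback */

          *link        = work->next;
          work->next   = NULL;
          work->worker = NULL;

          if (worker != NULL)
            {
              worker(arg);

              /* The callback may have changed the list: start over */

              link = head;
            }
        }
      else
        {
          sim_tick_t remaining = work->delay - elapsed;

          if (remaining < next)
            {
              next = remaining;
            }

          link = &work->next;
        }
    }

  return next;
}

/* Example callback: counts how many times it has been invoked */
static void sim_count(void *arg)
{
  (*(int *)arg)++;
}
```

With two items queued at tick 100, one with no delay and one with a delay of 30, a pass at tick 110 with a period of 50 runs only the first item and asks to be woken in 20 ticks; a later pass at tick 130 runs the second item and falls back to the full period.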
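Summary
The work queue mechanism in NuttX is fairly simple: one work queue corresponds to a queue of work items plus an array of worker threads. The kernel schedules the worker threads, and the work items in the queue are dispatched to those threads for execution. The three types of work queue are implemented in much the same way.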