1. 程式人生 > >FreeRTOS(20)---FreeRTOS 任務通知分析

FreeRTOS(20)---FreeRTOS 任務通知分析

FreeRTOS 任務通知分析

FreeRTOS 任務通知分析

在FreeRTOS版本V8.2.0中推出了全新的功能:任務通知。在大多數情況下,任務通知可以替代二進位制訊號量、計數訊號量、事件組,可以替代長度為1的佇列(可以儲存一個32位整數或指標值),並且任務通知速度更快、使用的RAM更少!我在

《 FreeRTOS(8)—FreeRTOS 任務通知》一文中介紹了任務通知如何使用以及侷限性,今天我們將分析任務通知的實現原始碼,看一下任務通知是如何做到效率與RAM消耗雙贏的。

《FreeRTOS(18)—FreeRTOS 訊號量分析》一文中我們已經知道,FreeRTOS的訊號量是使用佇列機制實現的,資料結構也完全是佇列的那一套。而任務通知則不同,它的資料結構嵌在任務TCB(任務控制塊,見《FreeRTOS(14)—FreeRTOS 任務建立分析》)中的,並且資料結構十分簡單,涉及到任務TCB的兩個欄位,我們將它單獨列出來:

volatile uint32_t ulNotifiedValue; 	/*任務通知值*/  
volatile uint8_t ucNotifyState;	/*任務通知狀態,標識任務是否在等待通知等*/

這兩個欄位佔用5位元組RAM(本文都是在32位系統下討論),而一個佇列資料結構至少佔用76位元組RAM!這不是同一數量級的,所以任務通知在RAM消耗上完勝。

在分析佇列和訊號量的文章中,我們知道在使用佇列、訊號量前,必須先建立佇列和訊號量,目的是為了建立佇列資料結構。比如使用API函式xQueueCreate()建立佇列,用API函式xSemaphoreCreateBinary()建立二進位制訊號量等等。再來看任務通知,由於任務通知的資料結構包含在任務TCB中,只要任務存在,任務通知資料結構就已經建立完畢,可以直接使用!在易用性上,任務通知再次獲勝。

要想了解任務通知在效能上佔優的原因,就要分析原始碼了。

只有任務可以等待通知,中斷服務函式中不可以。如果等待的通知無效,任務會進入阻塞狀態,我們可以將等待通知的任務看作是消費者;其它任務和中斷可以向等待通知的任務傳送通知,傳送通知的任務和中斷服務函式可以認為是生產者。處於阻塞的消費者得到通知後會再次進入就緒態。

任務通知API函式主要有兩類,一類傳送通知,一類等待通知。傳送通知API函式可以用於任務和中斷服務函式,等待通知API函式只能用在任務中。

傳送通知

我們先看一下發送通知API函式。這類函式比較多,有6個。但仔細分析會發現它們只能完成3種操作,每種操作有兩個API函式,分別為帶中斷保護版本和不帶中斷保護版本。FreeRTOS將API細分為帶中斷保護版本和不帶中斷保護版本是為了節省中斷服務程式處理時間,提升效能。

和訊號量類似,大多數傳送通知API介面也是由巨集實現的,如表1-1所示。

表1-1:傳送通知API函式與實際呼叫函式列表

在這裡插入圖片描述

xTaskGenericNotify()

不帶中斷保護的傳送通知API函式實際都是呼叫函式xTaskGenericNotify()實現的,我們看一下這個函式原型:

BaseType_t xTaskGenericNotify( 
        TaskHandle_t xTaskToNotify, 
        uint32_t ulValue, 
        eNotifyAction eAction, 
        uint32_t *pulPreviousNotificationValue )
  • xTaskToNotify:被通知的任務控制代碼。
  • ulValue:更新的通知值
  • eAction:列舉型別,指明更新通知值的方法,列舉變數成員以及作用見表1-2所示。
  • pulPreviousNotifyValue:回傳未被更新的任務通知值。如果不需要回傳未被更新的任務通知值,這裡設定為NULL。

表1-2:eNotifyAction列舉成員以及作用
在這裡插入圖片描述
與入隊操作相比較,傳送通知API函式顯得非常簡單,整理後的原始碼如下所示:

BaseType_t xTaskGenericNotify( TaskHandle_t xTaskToNotify, uint32_t ulValue, eNotifyAction eAction, uint32_t *pulPreviousNotificationValue )
{
TCB_t * pxTCB;
BaseType_t xReturn = pdPASS;
uint8_t ucOriginalNotifyState;


    configASSERT( xTaskToNotify );
    pxTCB = ( TCB_t * ) xTaskToNotify;


    taskENTER_CRITICAL();
    {
        if( pulPreviousNotificationValue != NULL )
        {
			/* 回傳更新前的通知值*/
            *pulPreviousNotificationValue = pxTCB->ulNotifiedValue;
        }


        ucOriginalNotifyState = pxTCB->ucNotifyState;


        pxTCB->ucNotifyState = taskNOTIFICATION_RECEIVED;


        switch( eAction )
        {
            case eSetBits   :
                pxTCB->ulNotifiedValue |= ulValue;
                break;


            case eIncrement :
                ( pxTCB->ulNotifiedValue )++;
                break;


            case eSetValueWithOverwrite :
                pxTCB->ulNotifiedValue = ulValue;
                break;


            case eSetValueWithoutOverwrite :
                if( ucOriginalNotifyState != taskNOTIFICATION_RECEIVED )
                {
                    pxTCB->ulNotifiedValue = ulValue;
                }
                else
                {
                    /* 上次的通知值還未取走,本次通知值丟棄 */
                    xReturn = pdFAIL;
                }
                break;


            case eNoAction:
                /* 不需要更新通知值*/
                break;
        }


        traceTASK_NOTIFY();


        /* 如果被通知的任務因為等待通知而阻塞,現在將它解除阻塞 */
        if( ucOriginalNotifyState == taskWAITING_NOTIFICATION )
        {
            ( void ) uxListRemove( &( pxTCB->xStateListItem ) );
            prvAddTaskToReadyList( pxTCB );


            if( pxTCB->uxPriority > pxCurrentTCB->uxPriority )
            {
                /* 如果被通知的任務優先順序高於當前任務,則觸發PendSV中斷,退出臨界區後進行上下文切換T*/
                taskYIELD_IF_USING_PREEMPTION();
            }
        }
    }
    taskEXIT_CRITICAL();


    return xReturn;
}

函式的功能可以概括為:按照指定的方法更新通知值,如果被通知的任務處於阻塞狀態,則將它解除阻塞,解除阻塞任務的優先順序如果大於當前任務的優先順序,則觸發一次任務切換。

與釋放訊號量API函式相比,本函式少了很多呼叫子函式開銷、少了很多判斷、少了對事件列表的操作等等,確實是比釋放訊號量的實現要簡潔的多。這也是有原因的,因為任務通知有它自己的侷限性,並不能完全代替訊號量。比如一個任務只能阻塞到一個通知上,如想要實現多個任務阻塞到同一個事件上,只能使用訊號量了。也正是因為這種侷限性,使得任務通知實現起來簡單高效,並且大多數情況下,任務通知的方法就已經能解決問題了。

vTaskNotifyGiveFromISR()

這個API函式是vTaskNotifyGive()的帶中斷保護版本,是專門設計用來在某些情況下代替二進位制訊號量和計數訊號量的。函式也很簡單,我們直接看原始碼,原始碼經過整理和註釋,以方便理解。

void vTaskNotifyGiveFromISR( TaskHandle_t xTaskToNotify, BaseType_t *pxHigherPriorityTaskWoken )
{
TCB_t * pxTCB;
uint8_t ucOriginalNotifyState;
UBaseType_t uxSavedInterruptStatus;


    pxTCB = ( TCB_t * ) xTaskToNotify;


    uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
    {
        ucOriginalNotifyState = pxTCB->ucNotifyState;
        pxTCB->ucNotifyState = taskNOTIFICATION_RECEIVED;


        /* 通知值加1,相當於釋放了一個訊號量 */
        ( pxTCB->ulNotifiedValue )++;


        /* 如果目標任務因為等待通知而阻塞,現在將它解除阻塞*/
        if( ucOriginalNotifyState == taskWAITING_NOTIFICATION )
        {
            /* 如果排程器正常,將任務放入就緒列表,否則放入掛起就緒列表 */
            if( uxSchedulerSuspended == ( UBaseType_t ) pdFALSE )
            {
                ( void ) uxListRemove( &( pxTCB->xStateListItem ) );
                prvAddTaskToReadyList( pxTCB );
            }
            else
            {
                vListInsertEnd( &( xPendingReadyList ), &( pxTCB->xEventListItem ) );
            }


            if( pxTCB->uxPriority > pxCurrentTCB->uxPriority )
            {
                /* 如果解除阻塞的任務優先順序大於當前任務優先順序,則設定上下文切換標識,等退出函式後手動切換上下文,或者在系統節拍中斷服務程式中自動切換上下文*/
                if( pxHigherPriorityTaskWoken != NULL )
                {
                    *pxHigherPriorityTaskWoken = pdTRUE;    /* 設定手動切換標誌 */
                }
                else
                {
                    xYieldPending = pdTRUE;                 /* 設定自動切換標誌 */
                }
            }
        }
    }
    portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
}

xTaskGenericNotifyFromISR()

如表1-1所示,帶中斷保護版本的API函式xTaskNotifyFromISR()和xTaskNotifyAndQueryFromISR()都是巨集定義,真正被呼叫的函式為xTaskGenericNotifyFromISR()。這個函式用於在中斷在中傳送通知,與不帶中斷保護的API函式xTaskGenericNotify()非常相似,只是增加了一些中斷保護措施,我們直接看原始碼。通用原始碼經過整理和註釋,以方便理解。

BaseType_t xTaskGenericNotifyFromISR( TaskHandle_t xTaskToNotify, uint32_t ulValue, eNotifyAction eAction, uint32_t *pulPreviousNotificationValue, BaseType_t *pxHigherPriorityTaskWoken )
{
TCB_t * pxTCB;
uint8_t ucOriginalNotifyState;
BaseType_t xReturn = pdPASS;
UBaseType_t uxSavedInterruptStatus;


    pxTCB = ( TCB_t * ) xTaskToNotify;


    uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
    {
        if( pulPreviousNotificationValue != NULL )
        {
            /* 回傳更新前的通知值 */
            *pulPreviousNotificationValue = pxTCB->ulNotifiedValue;
        }


        ucOriginalNotifyState = pxTCB->ucNotifyState;
        pxTCB->ucNotifyState = taskNOTIFICATION_RECEIVED;
        
        /* 根據引數設定通知值 */
        switch( eAction )
        {
            case eSetBits   :
                pxTCB->ulNotifiedValue |= ulValue;
                break;


            case eIncrement :
                ( pxTCB->ulNotifiedValue )++;
                break;


            case eSetValueWithOverwrite :
                pxTCB->ulNotifiedValue = ulValue;
                break;


            case eSetValueWithoutOverwrite :
                if( ucOriginalNotifyState != taskNOTIFICATION_RECEIVED )
                {
                    pxTCB->ulNotifiedValue = ulValue;
                }
                else
                {
                    /* 上次的通知值還未取走,本次通知值丟棄 */
                    xReturn = pdFAIL;
                }
                break;


            case eNoAction :
                /* 不需要更新通知值*/
                break;
        }


        traceTASK_NOTIFY_FROM_ISR();


        /* 如果被通知的任務因為等待通知而阻塞,現在將它解除阻塞 */
        if( ucOriginalNotifyState == taskWAITING_NOTIFICATION )
        {   
            /* 如果排程器正常,將任務放入就緒列表,否則放入掛起就緒列表 */
            if( uxSchedulerSuspended == ( UBaseType_t ) pdFALSE )
            {
                ( void ) uxListRemove( &( pxTCB->xStateListItem ) );
                prvAddTaskToReadyList( pxTCB );
            }
            else
            {
                vListInsertEnd( &( xPendingReadyList ), &( pxTCB->xEventListItem ) );
            }


            if( pxTCB->uxPriority > pxCurrentTCB->uxPriority )
            {
                /* 如果解除阻塞的任務優先順序大於當前任務優先順序,則設定上下文切換標識,等退出函式後手動切換上下文,或者在系統節拍中斷服務程式中自動切換上下文*/
                if( pxHigherPriorityTaskWoken != NULL )
                {
                    *pxHigherPriorityTaskWoken = pdTRUE;    /* 設定手動切換標誌 */
                }
                else
                {
                    xYieldPending = pdTRUE;                 /* 設定自動切換標誌 */
                }
            }
        }
    }
    portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );


    return xReturn;
}

等待通知

等待通知API函式只能用在任務中,沒有帶中斷保護版本,因此只有兩個API函式:ulTaskNotifyTake()和xTaskNotifyWait ()。前者是為代替二進位制訊號量和計數訊號量而專門設計的,它和傳送通知API函式xTaskNotifyGive()、vTaskNotifyGiveFromISR()配合使用;後者是全功能版的等待通知,可以根據不同的引數實現輕量級二進位制訊號量、計數訊號量、事件組和長度為1的佇列。

等待通知API函式都帶有最大阻塞時間引數,當任務因為等待通知而進入阻塞時,用來規定最大阻塞時間。

ulTaskNotifyTake()

這個API函式用於實現輕量級的二進位制訊號量和計數訊號量,原始碼如下所示。它有兩個引數,如果第一個引數xClearCountOnExit設定為pdFALSE,則用來實現二進位制訊號量,函式退出時將通知值清零;如果第一個引數設定為pdTRUE,則用來實現計數訊號量,函式退出時,將通知值減一。

uint32_t ulTaskNotifyTake( BaseType_t xClearCountOnExit, TickType_t xTicksToWait )
{
uint32_t ulReturn;


    taskENTER_CRITICAL();
    {
        /* 僅當通知值為0,才進行阻塞操作*/
        if( pxCurrentTCB->ulNotifiedValue == 0UL )
        {
            /* 設定標誌,表示當前任務等待一個通知*/
            pxCurrentTCB->ucNotifyState = taskWAITING_NOTIFICATION;


            if( xTicksToWait > ( TickType_t ) 0 )
            {   
                /* 將任務加入延時列表 */
                prvAddCurrentTaskToDelayedList( xTicksToWait, pdTRUE );
                traceTASK_NOTIFY_TAKE_BLOCK();


                /* 觸發PendSV中斷,等到退出臨界區時立即執行任務切換 */
                portYIELD_WITHIN_API();
            }
        }
    }
    taskEXIT_CRITICAL();
    /* 到這裡說明其它任務或中斷向這個任務傳送了通知,或者任務阻塞超時,現在繼續處理*/
    taskENTER_CRITICAL();
    {
        traceTASK_NOTIFY_TAKE();
        ulReturn = pxCurrentTCB->ulNotifiedValue;   


        if( ulReturn != 0UL )
        {
            if( xClearCountOnExit != pdFALSE )
            {
                pxCurrentTCB->ulNotifiedValue = 0UL;
            }
            else
            {
                pxCurrentTCB->ulNotifiedValue = ulReturn - 1;
            }
        }
        /* 設定標誌,表示不需要等待通知 */
        pxCurrentTCB->ucNotifyState = taskNOT_WAITING_NOTIFICATION;
    }
    taskEXIT_CRITICAL();


    return ulReturn;    /* 如果返回值為0,說明是任務阻塞超時了 */
}

與獲取二進位制訊號量和獲取計數訊號量函式相比,本函式少了很多呼叫子函式開銷、少了很多判斷、少了事件列表處理、少了佇列上鎖與解鎖處理等等,因此本函式相對效率很高。

xTaskNotifyWait()

這個函式用於實現全功能版的等待通知,根據引數的不同,可以靈活的用於實現輕量級的佇列、二進位制訊號量、計數訊號量和事件組功能,函式原型為:

BaseType_t xTaskNotifyWait( uint32_tulBitsToClearOnEntry,
                     uint32_tulBitsToClearOnExit,
                     uint32_t*pulNotificationValue,
                     TickType_txTicksToWait );
  • ulBitsToClearOnEntry:在使用通知之前,先將任務的通知值與引數ulBitsToClearOnEntry的按位取反值按位與操作。設定引數ulBitsToClearOnEntry為0xFFFFFFFF(ULONG_MAX),表示清零任務通知值。
  • ulBitsToClearOnExit:在函式xTaskNotifyWait()退出前,將任務的通知值與引數ulBitsToClearOnExit的按位取反值按位與操作。設定引數ulBitsToClearOnExit為0xFFFFFFFF(ULONG_MAX),表示清零任務通知值。
  • pulNotificationValue:用於向外回傳任務的通知值。這個通知值在引數ulBitsToClearOnExit起作用前將通知值拷貝到*pulNotificationValue中。如果不需要返回任務的通知值,這裡設定成NULL。
  • xTicksToWait:因等待通知而進入阻塞狀態的最大時間。時間單位為系統節拍週期。巨集pdMS_TO_TICKS用於將指定的毫秒時間轉化為相應的系統節拍數。

這個函式的實現和ulTaskNotifyTake()有很多相通之處,我將整個流程以註釋形式置於原始碼中,原始碼如下所示:

BaseType_t xTaskNotifyWait( uint32_t ulBitsToClearOnEntry, uint32_t ulBitsToClearOnExit, uint32_t *pulNotificationValue, TickType_t xTicksToWait )
{
BaseType_t xReturn;


    taskENTER_CRITICAL();
    {
        /* 只有任務沒有等待通知,才會將任務阻塞 */
        if( pxCurrentTCB->ucNotifyState != taskNOTIFICATION_RECEIVED )
        {
            /* 使用任務通知值之前,先將引數ulBitsToClearOnEntryClear取反後與任務通知值位與.可以用這種方法在使用任務通知值之前,將通知值的某些或全部位清零 */
            pxCurrentTCB->ulNotifiedValue &= ~ulBitsToClearOnEntry;


            /* 設定任務狀態標識:等待通知 */
            pxCurrentTCB->ucNotifyState = taskWAITING_NOTIFICATION;


            if( xTicksToWait > ( TickType_t ) 0 )
            {
                /* 阻塞當前任務 */
                prvAddCurrentTaskToDelayedList( xTicksToWait, pdTRUE );
                traceTASK_NOTIFY_WAIT_BLOCK();


                /* 觸發PendSV中斷,等到退出臨界區後,執行任務切換 */
                portYIELD_WITHIN_API();
            }
        }
    }
    taskEXIT_CRITICAL();


    /* 到這裡說明其它任務或中斷向這個任務傳送了通知,或者任務阻塞超時,現在繼續處理*/
    taskENTER_CRITICAL();
    {
        traceTASK_NOTIFY_WAIT();


        if( pulNotificationValue != NULL )
        {
            /* 輸出當前通知值,通過指標引數傳遞*/
            *pulNotificationValue = pxCurrentTCB->ulNotifiedValue;
        }


        /* 判斷是否是因為任務阻塞超時 */
        if( pxCurrentTCB->ucNotifyState == taskWAITING_NOTIFICATION )
        {
            /* 沒有收到任務通知,是阻塞超時 */
            xReturn = pdFALSE;
        }
        else
        {
            /* 收到任務值,先將引數ulBitsToClearOnExit取反後與通知值位與,用於在退出函式前,將通知值的某些或者全部位清零. */
            pxCurrentTCB->ulNotifiedValue &= ~ulBitsToClearOnExit;
            xReturn = pdTRUE;
        }
        /* 更改任務通知狀態,解除任務通知等待 */
        pxCurrentTCB->ucNotifyState = taskNOT_WAITING_NOTIFICATION;
    }
    taskEXIT_CRITICAL();


    return xReturn;
}

縱觀整個任務通知的實現,可以發現它比佇列、訊號量相比要簡單很多。它可以實現輕量級的佇列、二進位制訊號量、計數訊號量和事件組,並且使用更方便、更節省RAM、更高效。FreeRTOS的作者做過測試,在同一平臺下,使用使用GCC編譯器、-o2優化級別,相比使用訊號量解除任務阻塞,使用任務通知可以快45%!這個效能的提升是巨大的。我們分析過訊號量的原始碼,今天又分析了任務通知的原始碼,這使得我們知道,之所以有這麼大的效能提升,一方面緣於任務通知資料結構簡單、實現簡潔;另一方面也跟FreeRTOS的訊號量機制臃腫、效率低下有關。因為訊號量的實現全部是使用佇列機制,並沒有為訊號量做專門優化。

此外,著重說明一下任務通知並不能完全代替佇列、二進位制訊號量、計數訊號量和事件組,任務通知有自己的侷限性,我們就以它的侷限性來結束本文:

  • 只能有一個任務接收通知事件。
  • 接收通知的任務可以因為等待通知而進入阻塞狀態,但是傳送通知的任務即便不能立即完成傳送通知,也不能進入阻塞狀態。