1. 程式人生 > >資料包接收系列 — NAPI的原理和實現

資料包接收系列 — NAPI的原理和實現

本文主要內容:簡單分析NAPI的原理和實現。

核心版本:2.6.37

Author:zhangskd @ csdn

概述

NAPI是linux新的網絡卡資料處理API,據說是由於找不到更好的名字,所以就叫NAPI(New API),在2.5之後引入。

簡單來說,NAPI是綜合中斷方式與輪詢方式的技術。

中斷的好處是響應及時,如果資料量較小,則不會佔用太多的CPU事件;缺點是資料量大時,會產生過多中斷,

而每個中斷都要消耗不少的CPU時間,從而導致效率反而不如輪詢高。輪詢方式與中斷方式相反,它更適合處理

大量資料,因為每次輪詢不需要消耗過多的CPU時間;缺點是即使只接收很少資料或不接收資料時,也要佔用CPU

時間。

NAPI是兩者的結合,資料量低時採用中斷,資料量高時採用輪詢。平時是中斷方式,當有資料到達時,會觸發中斷

處理函式執行,中斷處理函式關閉中斷開始處理。如果此時有資料到達,則沒必要再觸發中斷了,因為中斷處理函

數中會輪詢處理資料,直到沒有新資料時才打開中斷。

很明顯,資料量很低與很高時,NAPI可以發揮中斷與輪詢方式的優點,效能較好。如果資料量不穩定,且說高不高

說低不低,則NAPI則會在兩種方式切換上消耗不少時間,效率反而較低一些。

實現

來看下NAPI和非NAPI的區別:

(1) 支援NAPI的網絡卡驅動必須提供輪詢方法poll()。

(2) 非NAPI的核心介面為netif_rx(),NAPI的核心介面為napi_schedule()。

(3) 非NAPI使用共享的CPU佇列softnet_data->input_pkt_queue,NAPI使用裝置記憶體(或者

裝置驅動程式的接收環)。

(1) NAPI裝置結構

/* Structure for NAPI scheduling similar to tasklet but with weighting */

struct napi_struct {
    /* The poll_list must only be managed by the entity which changes the
     * state of the NAPI_STATE_SCHED bit. This means whoever atomically
     * sets that bit can add this napi_struct to the per-cpu poll_list, and
     * whoever clears that bit can remove from the list right before clearing the bit.
     */
    struct list_head poll_list; /* 用於加入處於輪詢狀態的裝置佇列 */
    unsigned long state; /* 裝置的狀態 */
    int weight; /* 每次處理的最大數量,非NAPI預設為64 */
    int (*poll) (struct napi_struct *, int); /* 此裝置的輪詢方法,非NAPI為process_backlog() */

#ifdef CONFIG_NETPOLL
    ...
#endif

    unsigned int gro_count;
    struct net_device *dev;
    struct list_head dev_list;
    struct sk_buff *gro_list;
    struct sk_buff *skb;
};

(2) 初始化

初始napi_struct例項。

void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
        int (*poll) (struct napi_struct *, int), int weight)
{
    INIT_LIST_HEAD(&napi->poll_list);
    napi->gro_count = 0;
    napi->gro_list = NULL;
    napi->skb = NULL;
    napi->poll = poll; /* 裝置的poll函式 */
    napi->weight = weight; /* 裝置每次poll能處理的資料包個數上限 */

    list_add(&napi->dev_list, &dev->napi_list); /* 加入裝置的napi_list */
    napi->dev = dev; /* 所屬裝置 */

#ifdef CONFIG_NETPOLL
    spin_lock_init(&napi->poll_lock);
    napi->poll_owner = -1;
#endif
    set_bit(NAPI_STATE_SCHED, &napi->state); /* 設定NAPI標誌位 */
}

(3) 排程

在網絡卡驅動的中斷處理函式中呼叫napi_schedule()來使用NAPI。

/**
 * napi_schedule - schedule NAPI poll
 * @n: napi context
 * Schedule NAPI poll routine to be called if it is not already running.
 */

static inline void napi_schedule(struct napi_struct *n)
{
    /* 判斷是否可以排程NAPI */
    if (napi_schedule_prep(n))
        __napi_schedule(n);
}

判斷NAPI是否可以排程。如果NAPI沒有被禁止,且不存在已被排程的NAPI,

則允許排程NAPI,因為同一時刻只允許有一個NAPI poll instance。

/**
 * napi_schedule_prep - check if napi can be scheduled
 * @n: napi context
 * Test if NAPI routine is already running, and if not mark it as running.
 * This is used as a condition variable insure only one NAPI poll instance runs.
 * We also make sure there is no pending NAPI disable.
 */

static inline int napi_schedule_prep(struct napi_struct *n)
{
    return !napi_disable_pending(n) && !test_and_set_bit(NAPI_STATE_SCHED, &n->state);
}
 
static inline int napi_disable_pending(struct napi_struct *n)
{
    return test_bit(NAPI_STATE_DISABLE, &n->state);
} 

enum {
    NAPI_STATE_SCHED, /* Poll is scheduled */
    NAPI_STATE_DISABLE, /* Disable pending */
    NAPI_STATE_NPSVC, /* Netpoll - don't dequeue from poll_list */
};

NAPI的排程函式。把裝置的napi_struct例項新增到當前CPU的softnet_data的poll_list中,

以便於接下來進行輪詢。然後設定NET_RX_SOFTIRQ標誌位來觸發軟中斷。

void __napi_schedule(struct napi_struct *n)
{
    unsigned long flags;
    local_irq_save(flags);
    ____napi_schedule(&__get_cpu_var(softnet_data), n);
    local_irq_restore(flags);
}

static inline void ____napi_schedule(struct softnet_data *sd, struct napi_struct *napi)
{
    /* 把napi_struct新增到softnet_data的poll_list中 */
    list_add_tail(&napi->poll_list, &sd->poll_list);
    __raise_softirq_irqoff(NET_RX_SOFTIRQ); /* 設定軟中斷標誌位 */
}

(4) 輪詢方法

NAPI方式中的POLL方法由驅動程式提供,在通過netif_napi_add()加入napi_struct時指定。

在驅動的poll()中,從自身的佇列中獲取sk_buff後,如果網絡卡開啟了GRO,則會呼叫

napi_gro_receive()處理skb,否則直接呼叫netif_receive_skb()。

POLL方法應該和process_backlog()大體一致,多了一些具體裝置相關的部分。

(5) 非NAPI和NAPI處理流程對比

以下是非NAPI裝置和NAPI裝置的資料包接收流程對比圖:

NAPI方式在上半部中sk_buff是儲存在驅動自身的佇列中的,軟中斷處理過程中驅動POLL方法呼叫

netif_receive_skb()直接處理skb並提交給上層。

/**
 * netif_receive_skb - process receive buffer from network
 * @skb: buffer to process
 * netif_receive_skb() is the main receive data processing function.
 * It always succeeds. The buffer may be dropped during processing
 * for congestion control or by the protocol layers.
 * This function may only be called from softirq context and interrupts
 * should be enabled.
 * Return values (usually ignored):
 * NET_RX_SUCCESS: no congestion
 * NET_RX_DROP: packet was dropped
 */

int netif_receive_skb(struct sk_buff *skb)
{
    /* 記錄接收時間到skb->tstamp */
    if (netdev_tstamp_prequeue)
        net_timestamp_check(skb);
 
    if (skb_defer_rx_timestamp(skb))
        return NET_RX_SUCCESS;

#ifdef CONFIG_RPS
    ...
#else
    return __netif_receive_skb(skb);
#endif
}

__netif_receive_skb()在上篇blog中已分析過了,接下來就是網路層來處理接收到的資料包了。