1. 程式人生 > >Linux內核中的軟中斷、tasklet和工作隊列具體解釋

Linux內核中的軟中斷、tasklet和工作隊列具體解釋

模塊 單向 處理流 了解 討論 cancel spi execution 大循環

[TOC]
本文基於Linux2.6.32內核版本號。

引言

軟中斷、tasklet和工作隊列並非Linux內核中一直存在的機制,而是由更早版本號的內核中的“下半部”(bottom half)演變而來。

下半部的機制實際上包含五種,但2.6版本號的內核中。下半部和任務隊列的函數都消失了,僅僅剩下了前三者。
介紹這三種下半部實現之前。有必要說一下上半部與下半部的差別。


上半部指的是中斷處理程序,下半部則指的是一些盡管與中斷有相關性可是能夠延後運行的任務。

舉個樣例:在網絡傳輸中。網卡接收到數據包這個事件不一定須要立即被處理,適合用下半部去實現;可是用戶敲擊鍵盤這種事件就必須立即被響應,應該用中斷實現。
兩者的主要差別在於:中斷不能被同樣類型的中斷打斷。而下半部依舊能夠被中斷打斷;中斷對於時間很敏感,而下半部基本上都是一些能夠延遲的工作。由於二者的這種差別,所以對於一個工作是放在上半部還是放在下半部去運行,能夠參考以下4條:

  1. 假設一個任務對時間很敏感。將其放在中斷處理程序中運行。
  2. 假設一個任務和硬件相關,將其放在中斷處理程序中運行。
  3. 假設一個任務要保證不被其它中斷(特別是同樣的中斷)打斷,將其放在中斷處理程序中運行。
  4. 其它全部任務,考慮放在下半部去運行。
    有寫內核任務須要延後運行。因此才有的下半部,進而實現了三種實現下半部的方法。這就是本文要討論的軟中斷tasklet工作隊列

下表能夠更直觀的看到它們之間的關系。
技術分享

軟中斷

軟中斷作為下半部機制的代表,是隨著SMP(share memory processor)的出現應運而生的,它也是tasklet實現的基礎(tasklet實際上僅僅是在軟中斷的基礎上加入了一定的機制)。軟中斷通常是“可延遲函數”的總稱,有時候也包含了tasklet(請讀者在遇到的時候依據上下文判斷是否包含tasklet)。它的出現就是由於要滿足上面所提出的上半部和下半部的差別,使得對時間不敏感的任務延後運行,而且能夠在多個CPU上並行運行。使得總的系統效率能夠更高。

它的特性包含:

  • 產生後並非立即能夠運行,必須要等待內核的調度才幹運行。軟中斷不能被自己打斷(即單個cpu上軟中斷不能嵌套運行)。僅僅能被硬件中斷打斷(上半部)。
  • 能夠並發運行在多個CPU上(即使同一類型的也能夠)。所以軟中斷必須設計為可重入的函數(同意多個CPU同一時候操作),因此也須要使用自旋鎖來保其數據結構。

相關數據結構

  • 軟中斷描寫敘述符
    struct softirq_action{ void (*action)(struct softirq_action *);};
    描寫敘述每一種類型的軟中斷,其中void(*action)是軟中斷觸發時的運行函數。
  • 軟中斷全局數據和類型
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;  
    enum  
    {  
       HI_SOFTIRQ=0, /*用於高優先級的tasklet*/  
       TIMER_SOFTIRQ, /*用於定時器的下半部*/  
       NET_TX_SOFTIRQ, /*用於網絡層發包*/  
       NET_RX_SOFTIRQ, /*用於網絡層收報*/  
       BLOCK_SOFTIRQ,  
       BLOCK_IOPOLL_SOFTIRQ,  
       TASKLET_SOFTIRQ, /*用於低優先級的tasklet*/  
       SCHED_SOFTIRQ,  
       HRTIMER_SOFTIRQ,  
       RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */  
       NR_SOFTIRQS  
   };

相關API

  • 註冊軟中斷
void open_softirq(int nr, void (*action)(struct softirq_action *))

即註冊相應類型的處理函數到全局數組softirq_vec中。比如網絡發包相應類型為NET_TX_SOFTIRQ的處理函數net_tx_action.

  • 觸發軟中斷
void raise_softirq(unsigned int nr)

實際上即以軟中斷類型nr作為偏移量置位每cpu變量irq_stat[cpu_id]的成員變量__softirq_pending。這也是同一類型軟中斷能夠在多個cpu上並行運行的根本原因。

  • 軟中斷運行函數
do_softirq-->__do_softirq

運行軟中斷處理函數__do_softirq前首先要滿足兩個條件:
(1)不在中斷中(硬中斷、軟中斷和NMI) 。1
(2)有軟中斷處於pending狀態。


系統這麽設計是為了避免軟件中斷在中斷嵌套中被調用,而且達到在單個CPU上軟件中斷不能被重入的目的。對於ARM架構的CPU不存在中斷嵌套中調用軟件中斷的問題,由於ARM架構的CPU在處理硬件中斷的過程中是關閉掉中斷的。

僅僅有在進入了軟中斷處理過程中之後才會開啟硬件中斷,假設在軟件中斷處理過程中有硬件中斷嵌套,也不會再次調用軟中斷,because硬件中斷是軟件中斷處理過程中再次進入的,此時preempt_count已經記錄了軟件中斷!

對於其它架構的CPU,有可能在觸發調用軟件中斷前,也就是還在處理硬件中斷的時候,就已經開啟了硬件中斷,可能會發生中斷嵌套,在中斷嵌套中是不同意調用軟件中斷處理的。Why?我的理解是,在發生中斷嵌套的時候,表明這個時候是系統突發繁忙的時候,內核第一要務就是趕緊把中斷中的事情處理完畢,退出中斷嵌套。避免多次嵌套,哪裏有時間處理軟件中斷。所以把軟件中斷推遲到了全部中斷處理完畢的時候才幹觸發軟件中斷。

實現原理和實例

軟中斷的調度時機:

  1. do_irq完畢I/O中斷時調用irq_exit。
  2. 系統使用I/O APIC,在處理完本地時鐘中斷時。
  3. local_bh_enable,即開啟本地軟中斷時。
  4. SMP系統中。cpu處理完被CALL_FUNCTION_VECTOR處理器間中斷所觸發的函數時。

  5. ksoftirqd/n線程被喚醒時。
    以下以從中斷處理返回函數irq_exit中調用軟中斷為例詳細說明。


    觸發和初始化的的流程如圖所看到的:
    技術分享

軟中斷處理流程

asmlinkage void __do_softirq(void)
{
    struct softirq_action *h;
    __u32 pending;
    int max_restart = MAX_SOFTIRQ_RESTART;
    int cpu;

    pending = local_softirq_pending();
    account_system_vtime(current);

    __local_bh_disable((unsigned long)__builtin_return_address(0));
    lockdep_softirq_enter();

    cpu = smp_processor_id();
restart:
    /* Reset the pending bitmask before enabling irqs */
    set_softirq_pending(0);

    local_irq_enable();

    h = softirq_vec;

    do {
        if (pending & 1) {
            int prev_count = preempt_count();
            kstat_incr_softirqs_this_cpu(h - softirq_vec);

            trace_softirq_entry(h, softirq_vec);
            h->action(h);
            trace_softirq_exit(h, softirq_vec);
            if (unlikely(prev_count != preempt_count())) {
                printk(KERN_ERR "huh, entered softirq %td %s %p"
                       "with preempt_count %08x,"
                       " exited with %08x?

\n", h - softirq_vec, softirq_to_name[h - softirq_vec], h->action, prev_count, preempt_count()); preempt_count() = prev_count; } rcu_bh_qs(cpu); } h++; pending >>= 1; } while (pending); local_irq_disable(); pending = local_softirq_pending(); if (pending && --max_restart) goto restart; if (pending) wakeup_softirqd(); lockdep_softirq_exit(); account_system_vtime(current); _local_bh_enable(); }

  1. 首先調用local_softirq_pending函數取得眼下有哪些位存在軟件中斷。
  2. 調用__local_bh_disable關閉軟中斷,事實上就是設置正在處理軟件中斷標記,在同一個CPU上使得不能重入__do_softirq函數。

  3. 又一次設置軟中斷標記為0,set_softirq_pending又一次設置軟中斷標記為0,這樣在之後又一次開啟中斷之後硬件中斷中又能夠設置軟件中斷位。
  4. 調用local_irq_enable。開啟硬件中斷。
  5. 之後在一個循環中。遍歷pending標誌的每一位,假設這一位設置就會調用軟件中斷的處理函數。在這個過程中硬件中斷是開啟的,隨時能夠打斷軟件中斷。這樣保證硬件中斷不會丟失。
  6. 之後關閉硬件中斷(local_irq_disable),查看是否又有軟件中斷處於pending狀態。假設是,而且在本次調用__do_softirq函數過程中沒有累計反復進入軟件中斷處理的次數超過max_restart=10次,就能夠又一次調用軟件中斷處理。假設超過了10次,就調用wakeup_softirqd()喚醒內核的一個進程來處理軟件中斷。設立10次的限制。也是為了避免影響系統響應時間。

  7. 調用_local_bh_enable開啟軟中斷。

軟中斷內核線程

之前我們分析的觸發軟件中斷的位置事實上是中斷上下文中,而在軟中斷的內核線程中實際已經是進程的上下文。
這裏說的軟中斷上下文指的就是系統為每一個CPU建立的ksoftirqd進程。
軟中斷的內核進程中主要有兩個大循環,外層的循環處理有軟件中斷就處理。沒有軟件中斷就休眠。內層的循環處理軟件中斷,每循環一次都試探一次是否過長時間占領了CPU,須要調度就釋放CPU給其它進程。詳細的操作在凝視中做了解釋。

    set_current_state(TASK_INTERRUPTIBLE);
    //外層大循環。
    while (!kthread_should_stop()) {
        preempt_disable();//禁止內核搶占,自己掌握cpu
        if (!local_softirq_pending()) {
            preempt_enable_no_resched();
            //假設沒有軟中斷在pending中就讓出cpu
            schedule();
            //調度之後又一次掌握cpu
            preempt_disable();
        }

        __set_current_state(TASK_RUNNING);

        while (local_softirq_pending()) {
            /* Preempt disable stops cpu going offline.
               If already offline, we‘ll be on wrong CPU:
               don‘t process */
            if (cpu_is_offline((long)__bind_cpu))
                goto wait_to_die;
            //有軟中斷則開始軟中斷調度
            do_softirq();
            //查看是否須要調度,避免一直占用cpu
            preempt_enable_no_resched();
            cond_resched();
            preempt_disable();
            rcu_sched_qs((long)__bind_cpu);
        }
        preempt_enable();
        set_current_state(TASK_INTERRUPTIBLE);
    }
    __set_current_state(TASK_RUNNING);
    return 0;

wait_to_die:
    preempt_enable();
    /* Wait for kthread_stop */
    set_current_state(TASK_INTERRUPTIBLE);
    while (!kthread_should_stop()) {
        schedule();
        set_current_state(TASK_INTERRUPTIBLE);
    }
    __set_current_state(TASK_RUNNING);
    return 0;

tasklet

由於軟中斷必須使用可重入函數,這就導致設計上的復雜度變高。作為設備驅動程序的開發人員來說,添加了負擔。而假設某種應用並不須要在多個CPU上並行運行,那麽軟中斷事實上是沒有必要的。

因此誕生了彌補以上兩個要求的tasklet。它具有以下特性:
a)一種特定類型的tasklet僅僅能運行在一個CPU上,不能並行,僅僅能串行運行。
b)多個不同類型的tasklet能夠並行在多個CPU上。


c)軟中斷是靜態分配的。在內核編譯好之後,就不能改變。但tasklet就靈活很多,能夠在運行時改變(比方加入模塊時)。


tasklet是在兩種軟中斷類型的基礎上實現的。因此假設不須要軟中斷的並行特性,tasklet就是最好的選擇。也就是說tasklet是軟中斷的一種特殊使用方法。即延遲情況下的串行運行

相關數據結構

  • tasklet描寫敘述符
struct tasklet_struct
{
      struct tasklet_struct *next;//將多個tasklet鏈接成單向循環鏈表
      unsigned long state;//TASKLET_STATE_SCHED(Tasklet is scheduled for execution)  TASKLET_STATE_RUN(Tasklet is running (SMP only))
      atomic_t count;//0:激活tasklet 非0:禁用tasklet
      void (*func)(unsigned long); //用戶自己定義函數
      unsigned long data;  //函數入參
};
  • tasklet鏈表
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);//低優先級
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);//高優先級

相關API

  • 定義tasklet
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
//定義名字為name的非激活tasklet
#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data } 
//定義名字為name的激活tasklet
void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data)
//動態初始化tasklet
  • tasklet操作
static inline void tasklet_disable(struct tasklet_struct *t)
//函數臨時禁止給定的tasklet被tasklet_schedule調度。直到這個tasklet被再次被enable;若這個tasklet當前在運行, 這個函數忙等待直到這個tasklet退出
static inline void tasklet_enable(struct tasklet_struct *t)
//使能一個之前被disable的tasklet。若這個tasklet已經被調度, 它會很快運行。

tasklet_enable和tasklet_disable必須匹配調用, 由於內核跟蹤每一個tasklet的"禁止次數" static inline void tasklet_schedule(struct tasklet_struct *t) //調度 tasklet 運行,假設tasklet在運行中被調度, 它在完畢後會再次運行; 這保證了在其它事件被處理其中發生的事件受到應有的註意. 這個做法也同意一個 tasklet 又一次調度它自己 tasklet_hi_schedule(struct tasklet_struct *t) //和tasklet_schedule相似,僅僅是在更高優先級運行。當軟中斷處理運行時, 它處理高優先級 tasklet 在其它軟中斷之前,僅僅有具有低響應周期要求的驅動才應使用這個函數, 可避免其它軟件中斷處理引入的附加周期. tasklet_kill(struct tasklet_struct *t) //確保了 tasklet 不會被再次調度來運行,通常當一個設備正被關閉或者模塊卸載時被調用。假設 tasklet 正在運行, 這個函數等待直到它運行完畢。

若 tasklet 又一次調度它自己,則必須阻止在調用 tasklet_kill 前它又一次調度它自己,如同使用 del_timer_sync

實現原理

  • 調度原理
static inline void tasklet_schedule(struct tasklet_struct *t)
{
    if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
        __tasklet_schedule(t);
}
void __tasklet_schedule(struct tasklet_struct *t)
{
    unsigned long flags;

    local_irq_save(flags);
    t->next = NULL;
    *__get_cpu_var(tasklet_vec).tail = t;
    __get_cpu_var(tasklet_vec).tail = &(t->next);//加入低優先級列表
    raise_softirq_irqoff(TASKLET_SOFTIRQ);//觸發軟中斷
    local_irq_restore(flags);
}
  • tasklet運行過程
    TASKLET_SOFTIRQ相應運行函數為tasklet_action。HI_SOFTIRQ為tasklet_hi_action,以tasklet_action為例說明。tasklet_hi_action大同小異。
static void tasklet_action(struct softirq_action *a)
{
    struct tasklet_struct *list;

    local_irq_disable();
    list = __get_cpu_var(tasklet_vec).head;
    __get_cpu_var(tasklet_vec).head = NULL;
    __get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;//取得tasklet鏈表
    local_irq_enable();

    while (list) {
        struct tasklet_struct *t = list;

        list = list->next;

        if (tasklet_trylock(t)) {
            if (!atomic_read(&t->count)) {
                //運行tasklet
                if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
                    BUG();
                t->func(t->data);
                tasklet_unlock(t);
                continue;
            }
            tasklet_unlock(t);
        }
        //假設t->count的值不等於0,說明這個tasklet在調度之後。被disable掉了。所以會將tasklet結構體又一次放回到tasklet_vec鏈表。並又一次調度TASKLET_SOFTIRQ軟中斷,在之後enable這個tasklet之後又一次再運行它
        local_irq_disable();
        t->next = NULL;
        *__get_cpu_var(tasklet_vec).tail = t;
        __get_cpu_var(tasklet_vec).tail = &(t->next);
        __raise_softirq_irqoff(TASKLET_SOFTIRQ);
        local_irq_enable();
    }
}

技術分享

工作隊列

從上面的介紹看以看出,軟中斷運行在中斷上下文中。因此不能堵塞和睡眠。而tasklet使用軟中斷實現。當然也不能堵塞和睡眠。但假設某延遲處理函數須要睡眠或者堵塞呢?沒關系工作隊列就能夠如您所願了。
把推後運行的任務叫做工作(work),描寫敘述它的數據結構為work_struct ,這些工作以隊列結構組織成工作隊列(workqueue),其數據結構為workqueue_struct 。而工作線程就是負責運行工作隊列中的工作。系統默認的工作者線程為events。
工作隊列(work queue)是第二種將工作推後運行的形式。

工作隊列能夠把工作推後,交由一個內核線程去運行—這個下半部分總是會在進程上下文運行,但由於是內核線程,其不能訪問用戶空間。最重要特點的就是工作隊列同意又一次調度甚至是睡眠


通常。在工作隊列和軟中斷/tasklet中作出選擇很easy。可使用以下規則:
- 假設推後運行的任務須要睡眠,那麽僅僅能選擇工作隊列。


- 假設推後運行的任務須要延時指定的時間再觸發,那麽使用工作隊列。由於其能夠利用timer延時(內核定時器實現)。
- 假設推後運行的任務須要在一個tick之內處理。則使用軟中斷或tasklet。由於其能夠搶占普通進程和內核線程。同一時候不可睡眠。


- 假設推後運行的任務對延遲的時間沒有不論什麽要求。則使用工作隊列,此時通常為無關緊要的任務。
實際上。工作隊列的本質就是將工作交給內核線程處理,因此其能夠用內核線程替換。

可是內核線程的創建和銷毀對編程者的要求較高,而工作隊列實現了內核線程的封裝,不易出錯,所以我們也推薦使用工作隊列。

相關數據結構

  • 正常工作結構體
struct work_struct {
    atomic_long_t data; //傳遞給工作函數的參數
#define WORK_STRUCT_PENDING 0       /* T if work item pending execution */
#define WORK_STRUCT_FLAG_MASK (3UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
    struct list_head entry; //鏈表結構。鏈接同一工作隊列上的工作。
    work_func_t func; //工作函數,用戶自己定義實現
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};
//工作隊列運行函數的原型:
void (*work_func_t)(struct work_struct *work);
//該函數會由一個工作者線程運行,因此其在進程上下文中。能夠睡眠也能夠中斷。但僅僅能在內核中運行,無法訪問用戶空間。

  • 延遲工作結構體(延遲的實現是在調度時延遲插入相應的工作隊列)
struct delayed_work {
    struct work_struct work;
    struct timer_list timer; //定時器。用於實現延遲處理
};
  • 工作隊列結構體
struct workqueue_struct {
    struct cpu_workqueue_struct *cpu_wq; //指針數組,其每一個元素為per-cpu的工作隊列
    struct list_head list;
    const char *name;
    int singlethread; //標記是否僅僅創建一個工作者線程
    int freezeable;     /* Freeze threads during suspend */
    int rt;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};
  • 每cpu工作隊列(每cpu都相應一個工作者線程worker_thread)
struct cpu_workqueue_struct {
    spinlock_t lock;
    struct list_head worklist;
    wait_queue_head_t more_work;
    struct work_struct *current_work;
    struct workqueue_struct *wq;
    struct task_struct *thread;
} ____cacheline_aligned;

相關API

  • 缺省工作隊列
靜態創建 
DECLARE_WORK(name,function); //定義正常運行的工作項
DECLARE_DELAYED_WORK(name,function);//定義延後運行的工作項

動態創建
INIT_WORK(_work, _func) //創建正常運行的工作項
INIT_DELAYED_WORK(_work, _func)//創建延後運行的工作項

調度默認工作隊列
int schedule_work(struct work_struct *work)

//對正常運行的工作進行調度,即把給定工作的處理函數提交給缺省的工作隊列和工作者線程。工作者線程本質上是一個普通的內核線程。在默認情況下。每一個CPU均有一個類型為“events”的工作者線程,當調用schedule_work時,這個工作者線程會被喚醒去運行工作鏈表上的全部工作。

系統默認的工作隊列名稱是:keventd_wq,默認的工作者線程叫:events/n。這裏的n是處理器的編號,每一個處理器相應一個線程。比方。單處理器的系統僅僅有events/0這樣一個線程。

而雙處理器的系統就會多一個events/1線程。

默認的工作隊列和工作者線程由內核初始化時創建: start_kernel()-->rest_init-->do_basic_setup-->init_workqueues 調度延遲工作 int schedule_delayed_work(struct delayed_work *dwork,unsigned long delay) 刷新缺省工作隊列 void flush_scheduled_work(void) //此函數會一直等待。直到隊列中的全部工作都被運行。 取消延遲工作 static inline int cancel_delayed_work(struct delayed_work *work) //flush_scheduled_work並不取消不論什麽延遲運行的工作,因此。假設要取消延遲工作,應該調用cancel_delayed_work。

以上均是採用缺省工作者線程來實現工作隊列。其長處是簡單易用,缺點是假設缺省工作隊列負載太重。運行效率會很低。這就須要我們創建自己的工作者線程和工作隊列。

  • 自己定義工作隊列
create_workqueue(name) 
//宏定義 返回值為工作隊列,name為工作線程名稱。創建新的工作隊列和相應的工作者線程,name用於該內核線程的命名。

int queue_work(struct workqueue_struct *wq, struct work_struct *work)
//相似於schedule_work。差別在於queue_work把給定工作提交給創建的工作隊列wq而不是缺省隊列。

int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay)
//調度延遲工作。

void flush_workqueue(struct workqueue_struct *wq)
//刷新指定工作隊列。

void destroy_workqueue(struct workqueue_struct *wq) //釋放創建的工作隊列。

實現原理

  1. 工作隊列的組織結構
    即workqueue_struct、cpu_workqueue_struct與work_struct的關系。
    一個工作隊列相應一個work_queue_struct。工作隊列中每cpu的工作隊列由cpu_workqueue_struct表示,而work_struct為其上的詳細工作。
    關系例如以下圖所看到的:
    技術分享
    2.工作隊列的工作過程
    技術分享
  2. 應用實例
    linux各個接口的狀態(up/down)的消息須要通知netdev_chain上感興趣的模塊同一時候上報用戶空間消息。這裏使用的就是工作隊列。
    詳細流程圖例如以下所看到的:
    技術分享

  1. 是否處於中斷中在Linux中是通過preempt_count來判斷的,詳細例如以下: 在linux系統的進程數據結構裏,有這麽一個數據結構:
    #define preempt_count() (current_thread_info()->preempt_count)
    利用preempt_count能夠表示是否處於中斷處理或者軟件中斷處理過程中,例如以下所看到的:
    # define hardirq_count() (preempt_count() & HARDIRQ_MASK)
    #define softirq_count() (preempt_count() & SOFTIRQ_MASK)
    #define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK | NMI_MASK))
    #define in_irq() (hardirq_count())
    #define in_softirq() (softirq_count())
    #define in_interrupt() (irq_count())
    技術分享
    preempt_count的8~23位記錄中斷處理和軟件中斷處理過程的計數。

    假設有計數,表示系統在硬件中斷或者軟件中斷處理過程中。 ?

Linux內核中的軟中斷、tasklet和工作隊列具體解釋