1. 程式人生 > >IO 的阻塞和非阻塞一:等待佇列

IO 的阻塞和非阻塞一:等待佇列

阻塞操作是指在執行裝置操作時,若不能獲得資源,則掛起程序直到滿足可操作的條件後再 進行操作。被掛起的程序進入休眠狀態,被從排程器的執行佇列移走,直到等待的條件被滿足。 阻塞,預設的形式簡單直接效率低 非阻塞,相反,佔用資源比較多 阻塞從字面上聽起來似乎意味著低效率,實則不然,如果裝置驅動不阻塞,則使用者想獲取設 備資源只能不停地查詢,這反而會無謂地耗費 CPU 資源。而阻塞訪問時,不能獲取資源的程序將 進入休眠,它將 CPU 資源“禮讓”給其他程序。 介紹:以佇列為基礎資料結構,與程序排程機制緊密結合,能夠用於實現核心中的非同步事件通知機制,也可以用來同步對系統資源的訪問 注意:雖然說的是佇列,但不是 fifo,沒有 fifo 的特性

1. 等待佇列: -- wait queue

在 Linux 驅動程式中,可以使用等待佇列(wait queue)來實現阻塞程序的喚醒。它以佇列為基礎資料結構,與程序排程機制緊密結合,能夠用於實現核心中的非同步事件通知機制。等待佇列可以用來同步對系統資源的訪問,訊號量在核心中依賴等待佇列來實現。 定義
“等待佇列頭”
wait_queue_head_t my_queue;
初始化“等待佇列頭”
init_waitqueue_head(&my_queue);
而下面的 DECLARE_WAIT_QUEUE_HEAD()巨集可以作為定義並初始化等待佇列頭的“快捷方式”
DECLARE_WAIT_QUEUE_HEAD (name)
定義等待佇列
DECLARE_WAITQUEUE(name, tsk)
該巨集用於定義並初始化一個名為 name 的等待佇列。 新增/移除等待佇列
void fastcall add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait); void fastcall remove_wait_queue(wait_queue_head_t *q, wait_queue_t *wait);
add_wait_queue()用於將等待佇列 wait 新增到等待佇列頭 q 指向的等待佇列連結串列中,而remove_wait_queue()用於將等待佇列 wait 從附屬的等待佇列頭 q 指向的等待佇列連結串列中移除。 等待事件
wait_event(wait_queue_head_t queue, condition) --> 深睡,不可以被訊號打斷 wait_event_interruptible(wait_queue_head_t queue, condition) --> 淺睡,可以被訊號打斷 wait_event_timeout(wait_queue_head_t queue, condition, timeout) wait_event_interruptible_timeout(wait_queue_head_t queue, condition, timeout)
queue,作為等待佇列頭的等待佇列被喚醒
condition,條件,滿足 喚醒,否則 阻塞
timeout,阻塞等待的超時時間,單位是 jiffy,等待時間 timeout 後,無論條件滿足不滿足,都返回
喚醒佇列
void wake_up(wait_queue_head_t *queue); void wake_up_interruptible(wait_queue_head_t *queue);
喚醒時會判斷 condition ①wake_up() -- wait_event() / wait_event_timeout() ②wake_up_interruptible() -- wait_event_interruptible() / wait_event_interruptible_timeout() ①可以喚醒處於 TASK_INTERRUPTIBLE 和 TASK_UNINTERRUPTIBLE 的程序 ②可以喚醒處於 TASK_INTERRUPTIBLE 的程序 在等待佇列上睡眠
sleep_on(wait_queue_head_t *q); interruptible_sleep_on(wait_queue_head_t *q); 
sleep_on 將目前程序的狀態置成 TASK_UNINTERRUPTIBLE,並定義一個等待佇列,之後把它附屬到等待佇列頭 q,直到資源可獲得,q 引導的等待佇列被喚醒。 wake_up_interruptible將目前程序的狀態置成 TASK_ INTERRUPTIBLE,並定義一個等待佇列,之後把它附屬到等待佇列頭q,直到資源可獲得,q引導的等待佇列被喚醒或者程序收到訊號。 sleep_on()函式應該與 wake_up()成對使用,interruptible_sleep_on()應該wake_up_interruptible() 成對使用。 注意
在許多裝置驅動中,並不呼叫 sleep_on()或 interruptible_sleep_on(),而是親自進行程序的狀態改變和切換

2. 例子

為了讓 驅動支援 阻塞和非阻塞,需要在驅動中使用等待佇列:

waitqueue.c

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/cdev.h>
#include <asm/uaccess.h>
#include <linux/sched.h>
#include <linux/semaphore.h>
#include <linux/device.h>

MODULE_LICENSE ("GPL");

int hello_major = 250;
int hello_minor = 0;
int number_of_devices = 1;
struct class *my_class;

struct hello_device
{
    char data[128];
    int len;
    wait_queue_head_t rq, wq;
    struct semaphore sem;
    struct cdev cdev;
} hello_device;

static int hello_open (struct inode *inode, struct file *filp)
{
    filp->private_data = container_of(inode->i_cdev, struct hello_device, cdev);
    printk (KERN_INFO "Hey! device opened\n");
    printk (KERN_INFO "len = %d\n",(*((int *)filp->private_data)));

    return 0;
}

static int hello_release (struct inode *inode, struct file *filp)
{
    printk (KERN_INFO "Hmmm... device closed\n");

    return 0;
}

ssize_t hello_read (struct file *filp, char *buff, size_t count, loff_t *offp)
{
    ssize_t result = 0;
    struct hello_device *dev = filp->private_data;

    down(&dev->sem);
    while (hello_device.len == 0)
    {
        up(&dev->sem);
        if (filp->f_flags & O_NONBLOCK)
            return -EAGAIN;
        if (wait_event_interruptible(dev->rq, (dev->len != 0))) return -ERESTARTSYS;
        down(&dev->sem);
    }

    if (count > dev->len) count = dev->len;
    if (copy_to_user (buff, dev->data, count)) 
    {   
        result = -EFAULT;
    }
    else
    {
        printk (KERN_INFO "read %d bytes\n", (int)count);
        dev->len -= count;
        result = count;
        memcpy(dev->data, dev->data+count, dev->len);
    }
    up(&dev->sem);
    wake_up(&dev->wq);

    return result;
}

ssize_t hello_write (struct file *filp, const char  *buf, size_t count, loff_t *f_pos)
{
    ssize_t ret = 0;
    struct hello_device *dev = filp->private_data;

    if (count > 128) return -ENOMEM;
    down(&dev->sem);
    while (dev->len == 128)
    {
        up(&dev->sem);
        if (filp->f_flags & O_NONBLOCK)
            return -EAGAIN;
        if (wait_event_interruptible(dev->wq, (dev->len != 128))) return -ERESTARTSYS;
        down(&dev->sem);
    }

    if (count > (128 - dev->len)) count = 128 - dev->len;
    if (copy_from_user (dev->data+dev->len, buf, count)) {
        ret = -EFAULT;
    }
    else {
        printk (KERN_INFO "write %d bytes\n", (int)count);
        dev->len += count;
        ret = count;
    }
    up(&dev->sem);
    wake_up(&dev->rq);

    return ret;
}

struct file_operations hello_fops = {
    .owner = THIS_MODULE,
    .open  = hello_open,
    .release = hello_release,
    .read  = hello_read,
    .write = hello_write
};

static void char_reg_setup_cdev (void)
{
    int error;
    dev_t devno;

    devno = MKDEV (hello_major, hello_minor);
    cdev_init (&hello_device.cdev, &hello_fops);
    hello_device.cdev.owner = THIS_MODULE;
    error = cdev_add (&hello_device.cdev, devno , 1);
    if (error)
        printk (KERN_NOTICE "Error %d adding char_reg_setup_cdev", error);
}

static int char_dev_create (void)
{
    my_class = class_create(THIS_MODULE,"waitqueue_class");
    if(IS_ERR(my_class)) 
    {
        printk("Err: failed in creating class.\n");
        return -1; 
    }
    device_create(my_class, NULL, MKDEV (hello_major, hello_minor), NULL, "waitqueue");

    return 0;
}

static int __init hello_2_init (void)
{
    int result;
    dev_t devno;

    devno = MKDEV (hello_major, hello_minor);
    result = register_chrdev_region (devno, number_of_devices, "waitqueue");

    if (result < 0) {
        printk (KERN_WARNING "hello: can't get major number %d\n", hello_major);
        goto err1;
    }

    char_dev_create();
    char_reg_setup_cdev ();
    init_waitqueue_head(&hello_device.rq);
    init_waitqueue_head(&hello_device.wq);
    sema_init(&hello_device.sem, 1);
    memset(hello_device.data, 0, 128);
    hello_device.len = 0;

    printk (KERN_INFO "char device registered\n");

    return 0;

err1:
    device_destroy(my_class, devno);
    class_destroy(my_class);
    unregister_chrdev_region(devno, 1);

    return result;
}

static void __exit hello_2_exit (void)
{
    dev_t devno = MKDEV (hello_major, hello_minor);
    cdev_del (&hello_device.cdev);
    device_destroy(my_class, devno);         //delete device node under /dev//必須先刪除裝置,再刪除class類
    class_destroy(my_class);                 //delete class created by us
    unregister_chrdev_region (devno, number_of_devices);
    printk("waitqueue module exit \n");
    return;
}

module_init (hello_2_init);
module_exit (hello_2_exit);

test_write.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>

#define N 90

int main()
{
    int i, fd;
    char buf[N];

    for (i=0; i<90; i++) 
    {
        buf[i] = i + 33;
    }

    if ((fd = open("/dev/waitqueue", O_WRONLY)) < 0)
    {
        perror("fail to open");
    }
    printf("wrote %d bytes\n", (int)write(fd, buf, N));
    close(fd);

    return 0;
}

test_read.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>

#define N 90

int main()
{
    int i, fd;
    char buf[N] = {0};
    int num = 0;

    if ((fd = open("/dev/waitqueue", O_RDWR)) < 0)
    {
        perror("fail to open");
        return -1;

    }
    puts("open is ok");
    if((num = read(fd, buf, 10)) < 0)
    {
        printf("num = %d \n", num);
        perror("read:");

    }
    printf("read num = %d \n", num);
    printf("Is:");
    puts(buf);

    close(fd);

    return 0;
}

Makefile
ifeq ($(KERNELRELEASE),)

KERNELDIR ?= /lib/modules/$(shell uname -r)/build 
#KERNELDIR ?= ~/wor_lip/linux-3.4.112
PWD := $(shell pwd)

modules:
	$(MAKE) -C $(KERNELDIR) M=$(PWD) modules

modules_install:
	$(MAKE) -C $(KERNELDIR) M=$(PWD) modules_install

clean:
	rm -rf *.o *~ core .depend .*.cmd *.ko *.mod.c .tmp_versions modules* Module*

.PHONY: modules modules_install clean

else
	obj-m := waitqueue.o
endif

程式思路:
①在驅動中的讀和寫的方法中分別判斷 filp->f_flags 是不是 O_NONBLOCK ,如果標誌是非阻塞,就馬上返回, ②在讀和寫的方法中還要加上訊號量實現 PV 操作,防止多個函式讀的時候出現混亂的情況 ③在條件不滿足(讀的時候,緩衝區中內容長度 = 0.寫的時候緩衝區長度 = 128)呼叫相應的讀寫等待佇列
程式的功能:
寫函式一次寫 90 個字元, 讀函式每次只讀 10 個字元,
如果讀了很多次,讀完了 buf 中的內容,就會阻塞的等在哪裡,開啟另外的客戶端,進行寫操作,讀客戶端在寫完的剎那,能夠讀出資料 在有 A 客戶端讀完了,並且處於阻塞狀態,再開一個 B 客戶端依然讀阻塞,在 C 客戶端進行寫的時候,寫完後,A 客戶端會先得到資料, B 再得到資料