1. 程式人生 > >Python 協程與多工排程

Python 協程與多工排程

協程與多工排程

時間 2016-03-31 23:02:15 IT技術部落格大學習

原文

在電腦科學中,多工(multitasking)是指在同一個時間段內執行多個任務,現代計算機作為一個複雜的系統,執行的任務往往不止一個,所以多工排程對於計算機來說尤為重要。現階段多工排程主要分為搶佔式多工和協作式多工,搶佔式多工由作業系統決定程序的排程方案,而協作式多工是當前任務主動放棄執行後,下一個任務繼續進行。由於協作式任務管理受惡意程式的威脅更大,現階段幾乎所有的計算機都採用搶佔式多工管理。

現階段,主要靠多程序或多執行緒的方式來實現多工:

#include <stdio.h>

#include <unistd.h>

int main()

{

    pid_t pid;

    pid = fork();

    if(pid < 0){

        printf("Fork Error!\n");

    }else if (pid > 0){

        printf("This is the parent Process! Process Id is %d, Child id is %d\n",getpid(),pid);

        int i = 0;

        while(i < 10){

            printf("This is parent Process output of i %d

!\n",i);

            i++;

        }

    }else if (pid == 0){

        printf("This is the child Process! Process Id is %d, parent id is %d\n",getpid(),getppid());

        int j = 0;

        while(j < 10){

            printf("This is child Process output of j %d\n",j);

            j++;

        }

    }

    return 0;

}

《協程與yield》 中,我們說到了協程是一種比程序和執行緒更加輕量級的解決方案,也通過yield實現了協程,但最大的疑問是沒有提供像程序或執行緒類的任務排程,沒有體現出協程的優勢,下面我們來實現一個簡單的協程和協作式的多工排程。

首先我們需要對任務(Task)進行包裝:

class Task():

    def __init__(self,taskid,coroutine):

        self.__taskId = taskid

        self.__coroutine = coroutine

        self.__sendValue = ''

        self.__beforeFirstYield = True

        self.isFinished = False

    def getTaskId(self):

        return self.__taskId

    def setValue(self,value):

        self.__sendValue == value

    def run(self):

        if(self.__beforeFirstYield):

            self.__beforeFirstYield = False

            return self.__coroutine.next()

        else:

            try:

                retval = self.__coroutine.send(self.__sendValue)

                return retval

            except StopIteration:

                self.isFinished = True

                return ""

這裡的“任務”類似系統的程序,有ID,有傳送給使用者程式的訊息sendValue.

接下來需要一個任務排程器,專門用來管理任務:

from Queue import Queue

class Scheduler():

    def __init__(self):

        self.taskQueue = Queue()

        self.maxTaskId = 0

        self.taskMap = dict()

    def scheduler(self,task):

        self.taskQueue.put(task)

    def newTask(self,coroutine):

        self.maxTaskId+=1

        task = Task(self.maxTaskId,coroutine)

        self.taskMap[self.maxTaskId] = task

        self.scheduler(task)

        return self.maxTaskId

        def KillTask(self,taskid):

        if  not taskid in self.taskMap:

            return False

        i = 0

        while i < self.taskQueue.qsize():

            tmp = self.taskQueue.get()

            if tmp == self.taskMap[taskid]:

                del self.taskMap[taskid]

                break

            else:

                self.scheduler(tmp)

            i+=1

        return True

    def run(self):

        while not self.taskQueue.empty():

            task = self.taskQueue.get()

            retval = task.run()

            if task.isFinished:

                tid = task.getTaskId()

                del self.taskMap[tid]

            else:

                self.scheduler(task)

任務排程器是系統最核心的功能,相當於Linux中的init程式,用來管理所有的系統任務。其它任務通過註冊到任務排程器來實現其功能:

def task1():

    i = 0

    while i < 10:

        print "This is task 1 i is %s"%i

        i+=1

        yield

def task2():

    i = 0

    while i < 10:

        print "This is task 2 i is %s"%i

        i+=1

        yield

sch = Scheduler()

sch.newTask(task1())

sch.newTask(task2())

sch.run()

其結果輸出如下,可以看出任務一和任務二確實是交替執行,實現了任務排程的功能

This is task 1 i is 0

This is task 2 i is 0

This is task 1 i is 1

This is task 2 i is 1

This is task 1 i is 2

This is task 2 i is 2

This is task 1 i is 3

This is task 2 i is 3

This is task 1 i is 4

This is task 2 i is 4

This is task 1 i is 5

This is task 2 i is 5

This is task 1 i is 6

This is task 2 i is 6

This is task 1 i is 7

This is task 2 i is 7

This is task 1 i is 8

This is task 2 i is 8

This is task 1 i is 9

This is task 2 i is 9

上面我們實現多個任務的排程,它們能夠很好的交替執行,yield在這裡實現上提供類一個類似 中斷 的功能,一旦系統出現yield,排程器會自動呼叫另外的任務繼續執行。

然而,在上面的例子中,一但我們把任務提交給排程器,對程式就沒有了控制權,必須要等到任務執行結束。我們需要對任務有必要的控制權,如獲取任務ID,結束任務,複製任務等等,這裡需要用到和排程器的通訊,這裡就用到了yield的進行傳值。類似Linux一樣,我們可以給任務提供一些函式介面,任務通過yield把需要呼叫的函式傳給排程器,排程器返回結果給任務,如下:

def task3():

    pid = yield getpid()

    print "This taskid is %d"%pid

    i = 0

    while i < 10:

        print "This is task 3 i is %d"%i

        yield

要實現上面的呼叫,可以新增一個系統呼叫類:

class SysCall():

    def __init__(self,callback):

        self.__callback= callback

    def __call__(self,task,schedular):

        if not isinstance(task,Task):

            raise TypeError(task.__name__+" is not instance of Task")

        self.__callback(task,schedular)

然後對Scheduler類的run方法作出更改

def run(self):

    while not self.taskQueue.empty():

            task = self.taskQueue.get()

            retval = task.run()

            if isinstance(retval,SysCall):

                retval(task,self)

                continue

            if task.isFinished:

                tid = task.getTaskId()

                del self.taskMap[tid]

            else:

                self.scheduler(task)

然後新增供任務使用的介面函式:

def getpid():

    def tmp(task,schedular):

        task.setValue(task.getTaskId())

        schedular.scheduler(task)

    return SysCall(tmp)

def Killpid():

    def tmp(task,scheduler):

        task.setValue(scheduler.KillTask(taskid))

    return SysCall(tmp)

def fork():

    pass

這裡實現SysCall的主要目的是方便排程器對傳遞過去的函式型別進行控制,為了系統安全考慮,防止使用者提交危險函式破壞系統,不屬於SysCall類的函式一律不以執行。

至此,我們實現了一個完整協程任務排程器,而不是利用yield進行簡單的資料傳遞,yield是如此好用,以至於很多語言都逐漸加入對其的支援,如PHP5.5開始加入yield,javascript 6(ECMAScript 6)也加入了對其的支援,雖然其使用起來有一些區別,但是原理是相通的,深入理解協程和yield,對於理解任務排程,系統原理意義重大。

檔案下載

參考資料