1. 程式人生 > >Gevent的協程實現原理

Gevent的協程實現原理

handle 保存 ont expires 了吧 理解 cal easy try

之前之所以看greenlet的代碼實現,主要就是想要看看gevent庫的實現代碼。

。。

然後知道了gevent的協程是基於greenlet來實現的。。。所以就又先去看了看greenlet的實現。。。

這裏就不說greenlet的詳細實現了。關鍵就是棧數據的復制拷貝,棧指針的位移。

。。


由於gevent帶有自己的I/O以及定時循環,所以它對greenlet又加了一層的擴展。。

這裏我們用例如以下的代碼來舉樣例,然後再來詳細的分析gevent是怎樣擴展greenlet的吧:

import gevent

def hello(fname):
	print "hello : ", fname
	gevent.sleep(0)
	print "12321 : ", fname


task1 = gevent.spawn(hello, "fjs1")
task2 = gevent.spawn(hello, "fjs2")

task1.join()

這段代碼的輸出例如以下:

技術分享


嗯,那麽閑來看看spawn方法是怎樣創建協程的吧:

    #類方法,這個說白了gevent提供的一層構造
    @classmethod
    def spawn(cls, *args, **kwargs):
        """Return a new :class:`Greenlet` object, scheduled to start.

        The arguments are passed to :meth:`Greenlet.__init__`.
        """
        g = cls(*args, **kwargs)  #先構造greenlet對象
        g.start() #調用start方法,相當於在hub對象的loop上註冊回調,這個回調的作用就是調用當前greenlet的switch切換到這個greenlet的運行
        return g

這種方法是一個類方法,用於創建一個Greenlet,只是這個要註意。當前這個greenlet已經不是前面提到的greenlet庫中定義的那樣了,其做了一層簡單的擴展。

來看看構造函數:

#繼承了greenlet,相當於是過擴展了一些功能
class Greenlet(greenlet):
    """A light-weight cooperatively-scheduled execution unit."""

    def __init__(self, run=None, *args, **kwargs):
        hub = get_hub()
        greenlet.__init__(self, parent=hub)   #這裏將全部創建的greenlet對象的parent都指向了這個唯一的hub對象
        if run is not None:
            self._run = run  #記錄run信息
        self.args = args
        self.kwargs = kwargs
        self._links = deque()
        self.value = None
        self._exception = _NONE
        self._notifier = None
        self._start_event = None

這裏直接繼承了greenlet庫中greenlet的定義。然後在構造函數中有比較重要的地方,能夠看到,全部構造出來的協程的parent都將會指向一個名字叫hub的主協程。。

這個非常關鍵。它就是整個gevent的主循環協程,全部創建的業務協程的執行都要依賴於它的調度和管理。。

好了。在上面spawn過程中。最後還調用了start方法啟動協程。那麽我們來看看這種方法的定義吧:

#事實上這個主要是在hub對象的loop上面掛起一個要運行的回調,而這個回調的功能就是切換到這個greenlet的運行
    def start(self):
        """Schedule the greenlet to run in this loop iteration"""
        if self._start_event is None:
            #事實上這個僅僅是在hub的loop上面掛起一個回調,然後在hub的loop裏面會運行這個回調
            self._start_event = self.parent.loop.run_callback(self.switch)  #在hub對象的loop裏面調用當前greenlet的switch回調,開始run方法的運行

代碼還是非常easy。事實上無非就是在parent也就是hub的loop上面註冊了一個回調,而這個回調就是當前這個協程的switch方法,。。那麽等到這個回調被運行的時候,那麽也就是開始這個協程的運行的時候。

。。

這裏我們先不去看hub以及它的loop的實現。

。。就先將其理解為主循環,管理全部的回調。定時,以及I/O事件。

。。


嗯,接下來來看看join方法的實現吧。

熟悉多線程的都知道。在多線程環境下,join方法就是堵塞當前線程。直到join的目的線程返回了為止。

。當然這裏就不是線程了。變成了協程。

。。。

來看看這個join方法的代碼吧:

    #將當前運行環境掛起,知道join的greenlet運行完了
    def join(self, timeout=None):
        """Wait until the greenlet finishes or *timeout* expires.
        Return ``None`` regardless.
        """
        if self.ready(): #假設都已經跑完了,那麽直接返回吧
            return
        else:
            switch = getcurrent().switch  #獲取當前greenlet的switch
            self.rawlink(switch)   #註冊當前環境greenlet的回調,那麽以後在這個要等待的greenlet運行完後。將會回調這個
            try:
                t = Timeout.start_new(timeout)  #創建一個timer對象
                try:
                    result = self.parent.switch()  #停止當前環境greenlet的運行,調度hub運行
                    assert result is self, ‘Invalid switch into Greenlet.join(): %r‘ % (result, )
                finally:
                    t.cancel()  #取消timeout
            except Timeout:
                self.unlink(switch)  #在掛起的回調中去除
                if sys.exc_info()[1] is not t:
                    raise
            except:
                self.unlink(switch)
                raise

首先調用getCurrent方法來獲取當前環境的協程,然後獲取它的switch方法,將其放置到要join的協程的回調隊列裏面,當這個要join的協程執行完了之後,將會調用這些回調。這樣,就能夠恢復當前協程的執行了。。。

在以下我們能夠看到。調用了parent也就是hub的switch方法。切換到hub的運行,這個裏面將會開始要join的協程的運行,這裏並非直接切換到join的協程的運行。。這點須要註意。。

另外,gevent自己的greenlet的定義增加了run方法,也就是每次運行都將會從這裏開始。

。。代碼例如以下:

    #當前greenlet的運行部分,事實上就是調用傳進來的函數。然後運行完了之後再調用那些掛起的回調
    def run(self):
        try:
            if self._start_event is None:
                self._start_event = _dummy_event
            else:
                self._start_event.stop()
            try:
                result = self._run(*self.args, **self.kwargs) #運行傳進來的函數
            except:
                self._report_error(sys.exc_info())
                return
            self._report_result(result)  #這個主要是用於運行掛起的回調
        finally:
            self.__dict__.pop(‘_run‘, None)
            self.__dict__.pop(‘args‘, None)
            self.__dict__.pop(‘kwargs‘, None)

在運行完了之後,將會調用_report_result方法來運行全部掛在這個協程上面的回調函數,這樣對於上面join掛起的回調,就會在這裏得到運行,從而讓join方法返回繼續運行,這樣join方法的實現也就比較的清楚了。。事實上還算是比較簡單的。。。另外對於怎樣運行掛起在這個協程上的回調,比如join的回調,還是比較有講究的。並非馬上在當前協程中運行。而是在hub的loop上掛起一個回調,嗯,代碼例如以下:
    #這個主要是為了在hub的loop中掛起回調,用於運行當前這個greenlet全部掛起的回調
    #這裏也不是馬上運行這些在這個greenlet上面掛起的回調,而是運行繼續掛到loop的回調上面去。這樣能夠讓當前協程盡快返回
    #並且假設就在當前協程運行這些回調會出問題,由於假設回調帶有別的協程的switch方法,那麽switch之後。就再也回不到這個協程繼續運行別的回調了
    #而在loop上面運行這些回調,也就是hub上,運行這些回調,即使切換到別的協程,以後也會遲早回到hub上繼續運行,所以能保證回調能全部運行完。。
    def _report_result(self, result):
        self._exception = None
        self.value = result
        if self._links and not self._notifier:
            self._notifier = self.parent.loop.run_callback(self._notify_links)

至於為什麽這麽大費周章。上面的凝視應該說的非常清楚了吧。。。


好了,接下來再來分析一下sleep的實現,代碼例如以下:

#事實上sleep的主要目的就是將當前的運行切換出去,回到hub的主循環
def sleep(seconds=0, ref=True):
    """Put the current greenlet to sleep for at least *seconds*.

    *seconds* may be specified as an integer, or a float if fractional seconds
    are desired.

    If *ref* is false, the greenlet running sleep() will not prevent gevent.wait()
    from exiting.
    """
    hub = get_hub()  #獲取hub對象
    loop = hub.loop  #獲取hub的loop對象
    if seconds <= 0:  #假設這裏並沒有時間
        waiter = Waiter() #創建waiter對象。主要是為了維護當前greenlet與hub之間的切換
        loop.run_callback(waiter.switch)   #在loop上面掛起一個回調,事實上就是在loop中再恢復當前sleep的greenlet的運行
        waiter.get()  #在這個裏面最基本的功能就是記錄當前的greenlet對象。然後將棧切換到hub上面運行
    else:
        hub.wait(loop.timer(seconds, ref=ref))  #帶定時的wait

事實上這裏分為了兩種種類,就是在sleep的時候傳入的超時時間。小於等於0的以及大於0的。。。

對於sleep操作,假設是在多線程的環境裏,比如java的sleep,事實上就是堵塞當前的線程。這樣子jvm會調度別的線程的執行,而對於gevent,事實上很多其它的是能夠理解為當前協程主動的放棄CPU資源,等到以後再執行。

首先來看看對於超時小於等於零的。事實上原理非常easy,就是進行switch,切換到hub協程的運行,而且在hub的loop上面註冊一個回調。用於切換回到當前協程的運行。。。


這裏有一點須要的註意的就是。並沒有直接在代碼中體現switch的操作,而是多了一個waiter對象。。。然後在loop上面註冊的回調是waiter的switch方法,然後調用了waiter對象的get方法。。。


這裏看gevent的凝視才知道。waiter對象能夠理解為gevent封裝的協程之間的協作工具,詳細的協程之間的切換都由waiter來做。避免讓用戶自己的代碼涉及到switch操作。由於這樣子非常easy出錯。。

。我們來看看waiter的定義吧:

#事實上這個對象僅僅是為了維護用戶greenlet與hub之間的切換關系
#將會在hub裏面註冊當前waiter對象的switch方法作為回調,然後在hub的loop裏面將會運行這個回調
class Waiter(object):
    """A low level communication utility for greenlets.

    Wrapper around greenlet‘s ``switch()`` and ``throw()`` calls that makes them somewhat safer:

    * switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
    * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
    * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
      will store the value/exception. The following :meth:`get` will return the value/raise the exception.

    The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
    The :meth:`get` method must be called from a greenlet other than :class:`Hub`.

        >>> result = Waiter()
        >>> timer = get_hub().loop.timer(0.1)
        >>> timer.start(result.switch, ‘hello from Waiter‘)
        >>> result.get() # blocks for 0.1 seconds
        ‘hello from Waiter‘

    If switch is called before the greenlet gets a chance to call :meth:`get` then
    :class:`Waiter` stores the value.

        >>> result = Waiter()
        >>> timer = get_hub().loop.timer(0.1)
        >>> timer.start(result.switch, ‘hi from Waiter‘)
        >>> sleep(0.2)
        >>> result.get() # returns immediatelly without blocking
        ‘hi from Waiter‘

    .. warning::

        This a limited and dangerous way to communicate between greenlets. It can easily
        leave a greenlet unscheduled forever if used incorrectly. Consider using safer
        :class:`Event`/:class:`AsyncResult`/:class:`Queue` classes.
    """

    __slots__ = [‘hub‘, ‘greenlet‘, ‘value‘, ‘_exception‘]

    def __init__(self, hub=None):
        if hub is None:
            self.hub = get_hub()  #獲取頂層hub對象
        else:
            self.hub = hub
        self.greenlet = None
        self.value = None
        self._exception = _NONE

    def clear(self):  
        self.greenlet = None
        self.value = None
        self._exception = _NONE

    def __str__(self):
        if self._exception is _NONE:
            return ‘<%s greenlet=%s>‘ % (type(self).__name__, self.greenlet)
        elif self._exception is None:
            return ‘<%s greenlet=%s value=%r>‘ % (type(self).__name__, self.greenlet, self.value)
        else:
            return ‘<%s greenlet=%s exc_info=%r>‘ % (type(self).__name__, self.greenlet, self.exc_info)

    def ready(self):
        """Return true if and only if it holds a value or an exception"""
        return self._exception is not _NONE

    def successful(self):
        """Return true if and only if it is ready and holds a value"""
        return self._exception is None

    @property
    def exc_info(self):
        "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
        if self._exception is not _NONE:
            return self._exception

    #調度greenlet的運行,這種方法僅僅能在hub的loop裏面運行
    def switch(self, value=None):
        """Switch to the greenlet if one‘s available. Otherwise store the value."""
        greenlet = self.greenlet
        if greenlet is None:
            self.value = value
            self._exception = None
        else:
            #僅僅能在hub裏面調用waiter的switch方法
            assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
            switch = greenlet.switch
            try:
                switch(value)   #恢復記錄的greenlet的運行
            except:
                self.hub.handle_error(switch, *sys.exc_info())

    def switch_args(self, *args):
        return self.switch(args)

    def throw(self, *throw_args):
        """Switch to the greenlet with the exception. If there‘s no greenlet, store the exception."""
        greenlet = self.greenlet
        if greenlet is None:
            self._exception = throw_args
        else:
            assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
            throw = greenlet.throw
            try:
                throw(*throw_args)
            except:
                self.hub.handle_error(throw, *sys.exc_info())

    #這個的最基本的作用就是記錄要等待的greenlet
    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            else:
                getcurrent().throw(*self._exception)
        else:
            assert self.greenlet is None, ‘This Waiter is already used by %r‘ % (self.greenlet, )
            self.greenlet = getcurrent()  #記錄當前的greenlet對象。在hub的loop裏面將會調用當前waiter的switch回調,將會恢復這個greenlet的運行
            try:
                return self.hub.switch()  #切換到hub上面去運行,那麽原來那個greenlet的運行到這裏就臨時中斷了,待會switch會這裏繼續運行
            finally:
                self.greenlet = None

    def __call__(self, source):
        if source.exception is None:
            self.switch(source.value)
        else:
            self.throw(source.exception)

    # can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
    # and unwraps it in wait() thus checking that switch() was indeed called

這個代碼應非常好理解,並且凝視都說的非常清楚。

。。

比較重要的就是get方法,這種方法將會保存當前運行的協程,然後切換到hub的運行,對於switch方法,將會切換回剛開始的協程的運行。。


好了,上面介紹了sleep不帶超時的實現。

接下來來看看帶超時的實現:

hub.wait(loop.timer(seconds, ref=ref))  #帶定時的wait

這裏首先創建了一個timer對象。這個能夠理解為在loop上面註冊了一個超時,接著看代碼:

#用於在loop上面註冊watcher並等待
    def wait(self, watcher):
        waiter = Waiter() #首先創建一個waiter對象
        unique = object() 
        watcher.start(waiter.switch, unique) #當watcher超時的時候將會調用waiter的switch方法
        try:
            result = waiter.get() #調用waiter的get方法,主要是讓將當前調用sleep的greenlet切換出去。然後切換到hub的執行
            assert result is unique, ‘Invalid switch into %s: %r (expected %r)‘ % (getcurrent(), result, unique)
        finally:
            watcher.stop()

依舊是創建waiter對象,以及它的get方法,只是這裏要註意的是,將waiter的switch回調是註冊到剛剛創建的timer對象上的,而不是直接註冊到loop上面。這樣待會timer超時的時候將會調用回調。恢復sleep的協程的運行。。


好了。這裏gevent的大體上協程,以及切換關系都幾乎相同了。

。。

Gevent的協程實現原理