1. 程式人生 > >Python系列之迴圈定時器

Python系列之迴圈定時器

近期在學習並使用Python開發一些小工具,在這裡記錄方便回憶,也與各位開始走上這條路的朋友共勉,如有不正確希望指正,謝謝!

開始使用定時器時,度娘了下有沒好的例子,本人比較懶,希望能直接使用。確實找到了一些,但是大多隻是很直白的程式碼,自己打算整理一下。

我選用了threading模組中的定時器,使用執行緒的優勢就是可以不干擾現有程序的正常執行。首先我們看下原始碼:

很簡單的封裝加上對執行緒的繼承,函式也就是執行和取消,並有案例說明

# The timer class was contributed by Itamar Shtull-Trauring

def Timer(*args, **kwargs):
    """Factory function to create a Timer object.

    Timers call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    """
    return _Timer(*args, **kwargs)

class _Timer(Thread):
    """Call a function after a specified number of seconds:

            t = Timer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=[], kwargs={}):
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()

在run函式中 我們不免可以看出,當啟動定時器後,一直在等待,然後進行判斷set狀態,是否執行使用者函式,那麼所謂的cancel也不是立即取消,而是簡單的置狀態。判斷後也是運行了與cancel一樣的程式碼。所以我們看出這裡的定時器是一次性的定時器,而我們需要迴圈定時器。

現有使用這種定時器進行迴圈執行的思路是啟用2個定時器,進行相互呼叫。但是是不是邏輯和使用太複雜呢?

那麼我們使用更簡單的,直接繼承timer修改下run函式即可:

class LoopTimer(_Timer):  
    """Call a function after a specified number of seconds:


            t = LoopTimer(30.0, f, args=[], kwargs={})
            t.start()
            t.cancel()     # stop the timer's action if it's still waiting


    """
    def __init__(self, interval, function, args=[], kwargs={}):
        _Timer.__init__(self,interval, function, args, kwargs)


    def run(self):
        '''self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()'''
        while True:
            self.finished.wait(self.interval)
            if self.finished.is_set():
                self.finished.set()
                break
            self.function(*self.args, **self.kwargs)  

那麼我們寫了簡單的例子測試下
def testlooptimer():  
    print("I am loop timer.")     
      
t = LoopTimer(2,testlooptimer)  
t.start()

執行結果:
I am loop timer.
I am loop timer.
I am loop timer.
I am loop timer.