1. 程式人生 > >Python量化交易平臺開發教程系列4-事件驅動引擎原理和使用

Python量化交易平臺開發教程系列4-事件驅動引擎原理和使用

原創文章,轉載請註明出處:用Python的交易員

前言

從這篇開始,後面的教程都會基於Python(終於可以跟C++說再見了)。

經過上一篇複雜繁瑣的API編譯後,我們已經有了一個可以在Python環境中用來收行情和發單的介面,但是儘管作者在Github上也放了簡單的API功能測試程式碼作為介面使用方法的示例,絕大部分讀者應該對於如何用這個介面去開發自己的交易系統毫無頭緒。

類似的情況也常常發生於當我們從萬得、恆生、網上的其他開源專案(比如pyctp)等等拿到開發介面和文件示例後:

看了半天覺得似乎上面講的都懂。

enter image description here

但要寫個自己的系統依舊不知道從何處下手。

enter image description here

所以在搞定交易介面後,我們開發交易系統的第一步就是要弄清楚系統的工作原理,在讀完這篇教程後,你應該至少不會再對如何寫一個交易系統茫然無措了。

事件驅動

計算機程式分類

所有的計算機程式都可以大致分為兩類:指令碼型(單次執行)和連續執行型(直到使用者主動退出)。

指令碼型

指令碼型的程式包括最早的批處理檔案以及使用Python做交易策略回測等等,這類程式的特點是在使用者啟動後會按照程式設計時設計好的步驟一步步執行,所有步驟執行完後自動退出。

連續執行型

連續執行型的程式包含了作業系統和絕大部分我們日常使用的軟體等等,這類程式啟動後會處於一個無限迴圈中連續執行,直到使用者主動退出時才會結束。

連續執行型程式

我們要開發的交易系統就是屬於連續執行型程式,而這種程式根據其計算邏輯的執行機制不同,又可以粗略的分為時間驅動和事件驅動兩種。

時間驅動

時間驅動的程式邏輯相對容易設計,簡單來說就是讓電腦每隔一段時間自動做一些事情。這個事情本身可以很複雜、包括很多步驟,但這些步驟都是線性的,按照順序一步步執行下來。

以下程式碼展示了一個非常簡單的時間驅動的Python程式。

from time import sleep

def demo():
    print u'時間驅動的程式每隔1秒執行demo函式'

while 1:
    demo()
    sleep(1.0)

時間驅動的程式本質上就是每隔一段時間固定執行一次指令碼(上面程式碼中的demo函式)。儘管指令碼自身可以很長、包含非常多的步驟,但是我們可以看出這種程式的執行機制相對比較簡單、容易理解。

舉一些量化交易相關的例子:

  1. 每隔5分鐘,通過新浪財經網頁的公開API讀取一次滬深300成分股的價格,根據當日漲幅進行排序後輸出到電腦螢幕上。

  2. 每隔1秒鐘,檢查一次最新收到的股指期貨TICK資料,更新K線和其他技術指標,檢查是否滿足趨勢策略的下單條件,若滿足則執行下單。

對速度要求較高的量化交易方面(日內CTA策略、高頻策略等等),時間驅動的程式會存在一個非常大的缺點:對資料資訊在反應操作上的處理延時。例子2中,在每次邏輯指令碼執行完等待的那1秒鐘裡,程式對於接收到的新資料資訊(行情、成交推送等等)是不會做出任何反應的,只有在等待時間結束後腳本再次執行時才會進行相關的計算處理。而處理延時在量化交易中的直接後果就是money:市價單滑點、限價單錯過本可成交的價格。

時間驅動的程式在量化交易方面還存在一些其他的缺點:如浪費CPU的計算資源、實現非同步邏輯複雜度高等等。

事件驅動

與時間驅動對應的就是事件驅動的程式:當某個新的事件被推送到程式中時(如API推送新的行情、成交),程式立即呼叫和這個事件相對應的處理函式進行相關的操作。

上面例子2的事件驅動版:交易程式對股指TICK資料進行監聽,當沒有新的行情過來時,程式保持監聽狀態不進行任何操作;當收到新的資料時,資料處理函式立即更新K線和其他技術指標,並檢查是否滿足趨勢策略的下單條件執行下單。

對於簡單的程式,我們可以採用上面測試程式碼中的方案,直接在API的回撥函式中寫入相應的邏輯。但隨著程式複雜度的增加,這種方案會變得越來越不可行。假設我們有一個帶有圖形介面的量化交易系統,系統在某一時刻接收到了API推送的股指期貨行情資料,針對這個資料系統需要進行如下處理:

  1. 更新圖表上顯示的K線圖形(繪圖)

  2. 更新行情監控表中股指期貨的行情資料(表格更新)

  3. 策略1需要執行一次內部演算法,檢查該資料是否會觸發策略進行下單(運算、下單)

  4. 策略2同樣需要執行一次內部演算法,檢查該資料是否會觸發策略進行下單(運算、下單)

  5. 風控系統需要檢查最新行情價格是否會導致賬戶的整體風險超限,若超限需要進行報警(運算、報警)

此時將上面所有的操作都寫到一個回撥函式中無疑變成了非常差的方案,程式碼過長容易出錯不說,可擴充套件性也差,每新增一個策略或者功能則又需要修改之前的原始碼(有經驗的讀者會知道,經常修改生產程式碼是一種非常危險的運營管理方法)。

為了解決這種情況,我們需要用到事件驅動引擎來管理不同事件的事件監聽函式並執行所有和事件驅動相關的操作。

事件驅動引擎原理

vn.py框架中的vn.event模組包含了一個可擴充套件的事件驅動引擎。整個引擎的實現並不複雜,除去註釋、空行後大概也就100行左右的程式碼:

# encoding: UTF-8

# 系統模組
from Queue import Queue, Empty
from threading import Thread

# 第三方模組
from PyQt4.QtCore import QTimer

# 自己開發的模組
from eventType import *


########################################################################
class EventEngine:
    """
    事件驅動引擎
    事件驅動引擎中所有的變數都設定為了私有,這是為了防止不小心
    從外部修改了這些變數的值或狀態,導致bug。

    變數說明
    __queue:私有變數,事件佇列
    __active:私有變數,事件引擎開關
    __thread:私有變數,事件處理執行緒
    __timer:私有變數,計時器
    __handlers:私有變數,事件處理函式字典


    方法說明
    __run: 私有方法,事件處理執行緒連續執行用
    __process: 私有方法,處理事件,呼叫註冊在引擎中的監聽函式
    __onTimer:私有方法,計時器固定事件間隔觸發後,向事件佇列中存入計時器事件
    start: 公共方法,啟動引擎
    stop:公共方法,停止引擎
    register:公共方法,向引擎中註冊監聽函式
    unregister:公共方法,向引擎中登出監聽函式
    put:公共方法,向事件佇列中存入新的事件

    事件監聽函式必須定義為輸入引數僅為一個event物件,即:

    函式
    def func(event)
        ...

    物件方法
    def method(self, event)
        ...

    """

    #----------------------------------------------------------------------
    def __init__(self):
        """初始化事件引擎"""
        # 事件佇列
        self.__queue = Queue()

        # 事件引擎開關
        self.__active = False

        # 事件處理執行緒
        self.__thread = Thread(target = self.__run)

        # 計時器,用於觸發計時器事件
        self.__timer = QTimer()
        self.__timer.timeout.connect(self.__onTimer)

        # 這裡的__handlers是一個字典,用來儲存對應的事件呼叫關係
        # 其中每個鍵對應的值是一個列表,列表中儲存了對該事件進行監聽的函式功能
        self.__handlers = {}

    #----------------------------------------------------------------------
    def __run(self):
        """引擎執行"""
        while self.__active == True:
            try:
                event = self.__queue.get(block = True, timeout = 1)  # 獲取事件的阻塞時間設為1秒
                self.__process(event)
            except Empty:
                pass

    #----------------------------------------------------------------------
    def __process(self, event):
        """處理事件"""
        # 檢查是否存在對該事件進行監聽的處理函式
        if event.type_ in self.__handlers:
            # 若存在,則按順序將事件傳遞給處理函式執行
            [handler(event) for handler in self.__handlers[event.type_]]

            # 以上語句為Python列表解析方式的寫法,對應的常規迴圈寫法為:
            #for handler in self.__handlers[event.type_]:
                #handler(event)

    #----------------------------------------------------------------------
    def __onTimer(self):
        """向事件佇列中存入計時器事件"""
        # 建立計時器事件
        event = Event(type_=EVENT_TIMER)

        # 向佇列中存入計時器事件
        self.put(event)

    #----------------------------------------------------------------------
    def start(self):
        """引擎啟動"""
        # 將引擎設為啟動
        self.__active = True

        # 啟動事件處理執行緒
        self.__thread.start()

        # 啟動計時器,計時器事件間隔預設設定為1秒
        self.__timer.start(1000)

    #----------------------------------------------------------------------
    def stop(self):
        """停止引擎"""
        # 將引擎設為停止
        self.__active = False

        # 停止計時器
        self.__timer.stop()

        # 等待事件處理執行緒退出
        self.__thread.join()

    #----------------------------------------------------------------------
    def register(self, type_, handler):
        """註冊事件處理函式監聽"""
        # 嘗試獲取該事件型別對應的處理函式列表,若無則建立
        try:
            handlerList = self.__handlers[type_]
        except KeyError:
            handlerList = []
            self.__handlers[type_] = handlerList

        # 若要註冊的處理器不在該事件的處理器列表中,則註冊該事件
        if handler not in handlerList:
            handlerList.append(handler)

    #----------------------------------------------------------------------
    def unregister(self, type_, handler):
        """登出事件處理函式監聽"""
        # 嘗試獲取該事件型別對應的處理函式列表,若無則忽略該次登出請求
        try:
            handlerList = self.handlers[type_]

            # 如果該函式存在於列表中,則移除
            if handler in handlerList:
                handlerList.remove(handler)

            # 如果函式列表為空,則從引擎中移除該事件型別
            if not handlerList:
                del self.handlers[type_]
        except KeyError:
            pass

    #----------------------------------------------------------------------
    def put(self, event):
        """向事件佇列中存入事件"""
        self.__queue.put(event)

初始化

當事件驅動引擎物件被建立時,初始化函式__init__會建立以下私有變數:

  • __queue:用來儲存事件的佇列

  • __active:用來控制引擎啟動、停止的開關

  • __thread:負責處理事件、執行具體操作的執行緒

  • __timer:用來每隔一段時間觸發定時事件的計時器

  • __handlers:用來儲存不同型別事件所對應的事件處理函式的字典

註冊事件處理函式

引擎提供了register方法,用來向引擎註冊事件處理函式的監聽,傳入引數為

  1. type_:表示事件型別的常量字串,由使用者自行定義,注意不同事件型別間不能重複

  2. handler:當該型別的事件被觸發時,使用者希望進行相應操作的事件處理函式,函式的定義方法參考程式碼中的註釋

當用戶呼叫register方法註冊事件處理函式時,引擎會嘗試獲取__handlers字典中該事件型別所對應的處理函式列表(若無則建立一個空列表),並向這個列表中新增該事件處理函式。使用了Python的列表物件,使用者可以很容易的控制同一個事件型別下多個事件處理函式的工作順序,因此對某些涉及多步操作的複雜演算法可以保證按照正確的順序執行,這點是相比於某些系統0訊息機制(如Qt的Signal/Slot)最大的優勢。

如當標的物行情發生變化時,期權高頻套利演算法需要執行以下操作:

  1. 使用定價引擎先計算新的期權理論價、希臘值

  2. 使用風控引擎對當前持倉的風險度彙總,並計算報價的中間價

  3. 使用套利引擎基於預先設定的價差、下單手數等引數,計算具體價格併發單

以上三步操作,只需在交易系統啟動時按順序註冊監聽到標的物行情事件上,就可以保證操作順序的正確。

和register對應的是unregister方法,用於登出事件處理函式的監聽,傳入引數相同,具體原理請參照原始碼。在實際應用中,使用者可以動態的組合使用register和unregister方法,只在需要監聽某些事件的時候監聽,完成後取消監聽,從而節省CPU資源。

這裡讓筆者吐槽一下某些國內的C++平臺(當然不是指所有的),每個策略對系統裡所有的訂單回報進行監聽,如果是自身相關的就處理,不相關的就PASS。這種寫法,光是判斷是否和自身相關就得多做多少無謂的判斷、浪費多少CPU資源,隨著策略數量的增加,浪費呈線性增加的趨勢,這種平臺還叫囂做高頻,唉......

觸發事件

使用者可以通過引擎的put方法向事件佇列__queue中存入事件,等待事件處理執行緒來進行處理,事件類的實現如下:

########################################################################
class Event:
    """事件物件"""

    #----------------------------------------------------------------------
    def __init__(self, type_=None):
        """Constructor"""
        self.type_ = type_      # 事件型別
        self.dict_ = {}         # 字典用於儲存具體的事件資料

物件建立時使用者可以選擇傳入事件型別字串type_作為引數。dict_字典用於儲存具體事件相關的資料資訊,以供事件處理函式進行操作。

事件處理執行緒的連續執行

事件引擎的事件處理執行緒__thread中執行連續執行工作的函式為__run:當事件引擎的開關__active沒有被關閉時,引擎嘗試從事件佇列中讀取最新的事件,若讀取成功則立即呼叫__process函式處理該事件,若無法讀取(佇列為空)則進入阻塞狀態節省CPU資源,當阻塞時間(預設為1秒)結束時再次進入以上迴圈。

__process函式工作時,首先檢查事件物件的事件型別在__handlers字典中是否存在,若存在(說明有事件處理函式在監聽該事件)則按照註冊順序呼叫監聽函式列表中的事件處理函式進行相關操作。

計時器

事件引擎中的__timer是一個PyQt中的QTimer物件,提供的功能非常簡單:每隔一段時間(由使用者設定)自動執行函式__onTimer。__onTimer函式會建立一個型別為EVENT_TIMER(在eventType.py檔案中定義)的事件物件,並呼叫引擎的put方法存入到事件佇列中。

敏感的讀者可能已經意識到了,這個計時器本質上是一個由時間驅動的功能。儘管我們在前文中提到了事件驅動在量化交易平臺開發中的重要性,但不可否認某些交易功能的實現必須基於時間驅動,例如:下單後若2秒不成交則立即撤單、每隔5分鐘將當日的成交記錄儲存到資料庫中等。這類功能在實現時就可以選擇使用事件處理函式對EVENT_TIMER型別的計時器事件進行監聽(參考下一章節“事件驅動引擎使用”中的示例)。

啟動、停止

使用者可以通過start和stop兩個方法來啟動和停止事件驅動引擎,原理很簡單讀者可以直接參考原始碼。

當啟動計時器時,事件間隔預設設定為了1秒(1000毫秒),這個引數使用者可以視乎自己的需求進行調整。假設使用者使用時間驅動的函式工作間隔為分鐘級,則可以選擇將引數設定為60秒(600000毫秒),以此類推。

事件驅動引擎使用

同樣在eventEngine.py中,包含了一段測試程式碼test函式,用來展示事件驅動引擎的使用方法:

#----------------------------------------------------------------------
def test():
    """測試函式"""
    import sys
    from datetime import datetime
    from PyQt4.QtCore import QCoreApplication

    def simpletest(event):
        print u'處理每秒觸發的計時器事件:%s' % str(datetime.now())

    app = QCoreApplication(sys.argv)

    ee = EventEngine()
    ee.register(EVENT_TIMER, simpletest)
    ee.start()

    app.exec_()


# 直接執行指令碼可以進行測試
if __name__ == '__main__':
    test()

test函式整體上包含了這幾步:

  1. 匯入相關的包(sys、datetime、PyQt4),注意由於EventEngine的實現中使用了PyQt4的QTimer類,因此整個程式的執行必須包含在Qt事件迴圈中,即使用QCoreApplication(或者PyQt4.QtGui中的QApplication)的exec_()方法在程式主執行緒中啟動事件迴圈。

  2. 定義一個簡單的函式simpletest,該函式包含一個輸入引數event物件,函式被呼叫後會列印一段字元以及當前的時間

  3. 建立QCoreApplication物件app

  4. 建立事件驅動引擎EventEngine物件ee

  5. 向引擎中註冊simpletest函式對定時器事件EVENT_TIMER的監聽

  6. 啟動事件驅動引擎

  7. 啟動Qt事件迴圈

整體上看,當用戶開發自己的程式時,需要修改的只是第2步和第5步:建立自己的事件處理函式並將這些函式註冊到相應的事件型別上進行監聽。

總結

有了API介面和事件驅動引擎,接下來我們可以開始開發自己的平臺了,後面的幾篇教程將會一步步展示一個簡單的LTS交易平臺的開發過程。