1. 程式人生 > >基於Lambda架構的股票市場事件處理引擎實踐

基於Lambda架構的股票市場事件處理引擎實踐

CEP(Complex Event Processing)是證券行業很多業務應用的重要支撐技術。CEP的概念本身並不新鮮,相關技術已經被運用超過15年以上,但是證券界肯定是運用CEP技術最為充分、最為前沿的行業之一,從演算法交易(algorithmic trading)、風險管理(risk management)、關鍵時刻管理(Moment of Truth – MOT)、委託與流動性分析(order and liquidity analysis)到量化交易(quantitative trading)乃至向投資者推送投資訊號(signal generation)等等,不一而足。

CEP技術通常與Time-series Database(時序資料庫)結合,最理想的解決方案是CEP技術平臺嚮應用提供一個歷史序列(historical time-series)與實時序列(real-time series)無差異融合的資料流連續體(continuum)- 對於證券類應用而言,昨天、上週、上個月的資料不過是當下此刻資料的延續,而處理演算法卻是無邊際的 – 只要開發者能構想出場景與模型。

廣發證券的IT研發團隊,一直關注Storm、Spark、Flink等流式計算的開源技術,也經歷了傳統Lambda架構的技術演進,在Kappa架構的技術尚未成熟之際,團隊針對證券行業的技術現狀與特點,採用改良的Lambda架構實現了一個CEP引擎,本文介紹了此引擎的架構並分享了一些股票業務較為有趣的應用場景,以饗同好。

隨著移動互聯和物聯網的到來,大資料迎來了高速和蓬勃發展時期。一方面,移動互聯和物聯網產生的大量資料為孕育大資料技術提供了肥沃的土壤;一方面,各個公司為了應對大資料量的挑戰,也急切的需要大資料技術解決生產實踐中的問題。短時間內各種技術層出不窮,在這個過程中Hadoop脫穎而出,並營造了一個豐富的生態圈。雖然大資料一提起Hadoop,好像有點老生常談,甚至覺得這個技術已經過時了,但是不能否認的是Hadoop的出現確實有非凡的意義。不管是它分散式處理資料的理念,還是高可用、容錯的處理都值得好好借鑑和學習。

剛開始,大家可能都被各種分散式技術、思想所吸引,一頭栽進去,掉進了技術的漩渦,不能自拔。一方面大資料處理技術和系統確實複雜、繁瑣;另一方面大資料生態不斷的推陳出新,新技術和新理念層出不窮,確實讓人目不暇接。如果想要把生態圈中各個元件玩精通確實不是件容易的事情。本人一開始也是深陷其中,皓首窮經不能自拔。但騰出時間,整理心緒,回頭反顧,突然有種釋然之感。大資料並沒有大家想象的那麼神祕莫測與複雜,從技術角度看無非是解決大資料量的採集、計算、展示的問題。

因此本文參考Lambda/Kappa架構理念,提出了一種有行業針對性的實現方法。儘量讓系統層面更簡單,技術更同構,初衷在讓大家聚焦在大資料業務應用上來,從而真正讓大資料發揮它應有的價值。

1、 背景

Lambda架構是由Storm的作者Nathan Marz 在BackType和Twitter多年進行分散式大資料系統的經驗總結提煉而成,用數學表示式可以表示如下:

batch view = function(all data)
realtime view = function(realtime view,new data)
query = function(batch view .realtime view)

邏輯架構圖如下:

Lambda架構

從圖上可以看出,Lambda架構主要分為三層:批處理層,加速層和服務層。它整合了離線計算和實時計算,融合了不可變性(immutable),讀寫分離和複雜性隔離等一系列架構原則設計而成,是一個滿足大資料系統關鍵特性的架構。Nathan Marz認為大資料系統應該具有以下八個特性,Lambda都具備它們分別是:

  • Robustness and fault tolerance(魯棒性和容錯性)
  • Low latency reads and updates(讀和更新低延時)
  • Scalability(可伸縮)
  • Generalization(通用性)
  • Extensibility(可擴充套件)
  • Ad hoc queries(可即席查詢)
  • Minimal maintenance(易運維)
  • Debuggability(可除錯)

由於Lambda架構的資料是不可變的(immutable),因此帶來的好處也是顯而易見的:

  • Human-fault tolerance(對人為的容錯性):資料流水被按時序記錄下來,而且資料只寫一次,不做更改,而不像RDBMS只是保留最後的狀態。因此不會丟失資料資訊。即使平臺升級或者計算程式中不小心出現Bug,修復Bug後重新計算就好。強調了資料的重新計算問題,這個特性對一個生產的資料平臺來說是十分重要的。
  • Simplicity(簡易性):可變的資料模型一般要求資料能必須被索引,以便於資料可被再次被檢索到和可以被更新。但是不變的資料模型相對來說就很簡單了,只是一味的追加新資料即可。大大簡化了系統的複雜度。

但是Lambda也有自身的侷限性,舉個例子:在大資料量的情況下,要即席查詢過去24小時某個網站的pv數。根據前面的數學表示式,Lambda架構需要實現三部分程式,一部分程式是批處理程式,比如可能用Hive或者MapReduce批量計算最近23.5個小時pv數,一部分程式是Storm或Spark Streaming流式計算程式,計算0.5個小時內的pv數,然後還需要一個服務程式將這兩部分結果進行合併,返回最終結果。因此Lambda架構包含固有的開發和運維的複雜性。

因為以上的缺陷,Linkedin的Jay Kreps在2014年7月2日在O’reilly《Questioning the Lambda Architecture》提出了Kappa架構,如下圖:

Lambda架構

Kappa在Lambda做的最大的改進是用同一套實時計算框架代替了Lambda的批處理層,這樣做的好處是一套程式碼或者一套技術棧可以解決一個問題。它的做法是這樣的:

  1. 用Kafka做持久層,根據需求需要,用Kafka保留需要重新計算的歷史資料長度,比如計算的時候可能用30天的資料,那就配置Kafka的值,讓它保留最近30天的資料。
  2. 當你程式因為升級或者修復了缺陷,需要重新計算的時候,就再啟一個流式計算程式,從你所需的Offset開始計算,並將結果輸入到一個新的表裡。
  3. 當這個流式計算程式追平第一個程式的時候,將應用切換到第二個程式的輸出上。
  4. 停止第一個程式,刪除第一個程式的輸出結果表。

這樣相當於用同一套計算框架和程式碼解決了Lambda架構中開發和運維比較複雜的問題。當然如果資料量很大的情況下,可以增加流式計算程式的併發度來解決速度的問題。

2、 廣發證券Lambda架構的實現

由於金融行業在業務上受限於T+1交易,在技術上嚴重依賴關係型資料庫(特別是Oracle)。在很多場景下,資料並不是以流的形式存在的,而且資料的更新頻率也並不是很實時。比如為了做技術面分析的行情資料,大多數只是使用收盤價和歷史收盤價(快照資料)作為輸入,來計算各類指標,產生買賣點訊號。

因此這是一個典型的批處理的場景。另一方面,比如量化交易場景,很多實時的訊號又是稍縱即逝,只有夠實時才存在套利的空間,而且回測和實盤模擬又是典型的流處理。鑑於以上金融行業特有的場景,我們實現了我們自己的架構(GF-Lambda),它介於Lambda和Kappa之間。一方面能夠滿足我們處理資料的需求;一方面又可以達到技術上的同構,減少開發運維成本。根據對資料實時性要求,將整個計算部分分為三類:

  • Spark SQL:代替MapReduce或者Hive的功能,實現資料的批量預處理;
  • Spark Streaming:近實時高吞吐的mini batching資料處理功能;
  • Storm:完成實時的流式資料處理;

GF-Lambda的優勢如下:

  • 在PipeLine的驅動方面,採用Airbnb開源的Airflow,Airflow使用指令碼語言來實現整個PipeLine的定義,而且任務例項也是動態生成的;相比Oozie和Azkaban採用標記語言來完成PipeLine的定義,Airflow的優勢是顯而易見的,例如:

    整個data flow採用指令碼編寫,便於配置管理和升級。而Oozie只能使用XML定義,升級遷移成本較大。

    觸發方式靈活,整個PipeLine可以動態生成,切實的做到了“analytics as a service”或者 “analysis automation”。

  • 另外一個與Lambda或者Kappa最大的不同之處是我們採用了Redis作為快取來儲存各個計算服務的狀態;雖然Spark和Storm都有Checkpoint機制,但是CheckPoint會影響到程式複雜度和效能,並且以上兩種技術的CheckPoint機制並不是很完善。通過Redis和Kafka的Offset機制,不僅可以做到無狀態的計算服務,而且即使升級或者系統故障,資料的可用性也不會受到影響。
  • 整個batch layer採用Spark SQL,使用Spark SQL的好處是能做到密集計算的後移。由於歷史原因,券商Oracle等關係型資料庫使用比較多,而且在開市期間資料庫壓力也比較大,此處的Spark SQL只是不斷的從Oracle批量載入資料(除了Filter基本在Oracle上做任何計算)或者主動的通過Oracle日誌旁錄資料,對資料庫壓力較小,同時又能達到資料準實時性的要求;另外所有的計算都後置到Yarn叢集上進行,不僅利於程式的運維,也利於資源的有效管控和伸縮。架構實現如下圖所示:

Lambda架構

3、 應用場景

CEP在證券市場的應用的有非常多,為了讀者更好的理解上述技術架構的設計,在此介紹幾個典型應用場景。

1)自選股到價和漲跌幅提醒

自選股到價和漲跌幅提醒是股票交易軟體的一個基礎伺服器,目的在於方便使用者簡單、及時的盯盤。其中我們使用MongoDB來儲存使用者的個性化設定資訊,以便各類應用可以靈活的定製自身的Schema。在功能上主要包括以下幾種:

  • 股價高於設定值提醒。
  • 股價低於設定值提醒。
  • 漲幅高於設定值提醒。
  • 一分鐘、五分鐘漲幅高於設定值提醒。
  • 跌幅高於設定值提醒。
  • 一分鐘、五分鐘跌幅高於設定值提醒。

主要的挑戰在於大資料量的實時計算,而採用GF-Lambda可以輕鬆解決這個問題。資料處理流程如下:

Lambda架構

首先從Kafka訂閱實時行情資料並進行解析,轉化成RDD物件,然後再衍生出Key(market+stockCode),同時從Mongo增量載入使用者自選股預警設定資料,然後將這兩份資料進行一個Join,再分片對同一個Key的兩個物件做一個Filter,產生出預警資訊,並進行各個終端渠道推送。

2)自選股實時資訊

實時資訊對各類交易使用者來說是非常重要的,特別是和自身嚴重相關的自選股實時資訊。一個公告、重大事項或者關鍵新聞的出現可能會影響到使用者的投資回報,因此這類事件越實時,對使用者來說價值就越大。

在GF-Lambda平臺上,自選股實時資訊主要分為兩部分:實時資訊的採集及預處理(適配)、資訊資訊與使用者資訊的撮合。整個處理流程如下圖所示:

Lambda架構

在上圖分割線左側是實時資訊的預處理部分,首先使用Spark JDBC介面從Oracle資料庫載入資料到Spark,形成DataFrame,再使用Spark SQL的高階API做資料的預處理(此處主要做表之間的關聯和過濾),最後將每個Partition上的資料轉化成協議要求的格式,寫入Kafka中等待下游消費。

左側資料ETL的過程是完全由Airflow來進行驅動排程的,而且每次處理完就將狀態cache到Redis中,以便下次增量處理。在上圖的右側則是與使用者強相關的業務邏輯,將使用者配置的資訊與實時資訊資訊進行撮合匹配,根據使用者設定的偏好來產生推送事件。

此處用Kafka來做資料間的解耦,好處是不言而喻的。首先是保證了訊息之間的靈活性,因為左側部分產生的事件是一個基礎公共事件,而右側才是一個與業務緊密耦合的邏輯事件。基礎公共事件只有事件的基礎屬性,是可以被很多業務同時訂閱使用的。

其次從技術角度講左側是一個類似批處理的過程,而右側是一個流處理的過程,中間通過Kafka做一個轉換與對接。這個應用其實是很具有代表性的,因為在大部分情況下,資料來源並不是以流的形式存在,更新的頻率也並不是那麼實時,所以大多數情況下都會涉及到batch layer與speed layer之間的轉換對接。

3)資金流選股策略

上面兩個應用相對來說處理流程比較簡單,以下這個case是一個業務
稍微繁瑣的CEP應用-資金流策略交易模型,該模型使用資金流流向來判斷股票在未來一段時間的漲跌情況。它基於這樣一個假設,如果是資金流入的股票,則股價在未來一段時間上漲是大概率事件;如果是資金流出的股票,則股價在未來一段時間下跌是大概率事件。那麼我們可以基於這個假設來構建我們的策略交易模型。如下圖所示,這個模型主要分為三部分:

Lambda架構

1)個股資金流指標的實時計算

由於涉及到一些業務術語,這裡先做一個簡單的介紹。

資金流是一種反映股票供求關係的指標,它的定義如下:證券價格在約定的時間段中處於上升狀態時產生的成交額是推動指數上漲的力量,這部分成交額被定義為資金流入;證券價格在約定的時間段中下跌時的成交額是推動指數下跌的力量,這部分成交額被定義為資金流出;若證券價格在約定的時間段前後沒有發生變化,則這段時間中的成交額不計入資金流量。當天資金流入和流出的差額可以認為是該證券當天買賣兩種力量相抵之後,推動價格變化的淨作用量,被定義為當天資金淨流量。數量化定義如下:

Lambda架構

其中,Volume為成交量,為i時刻收盤價,為上一時刻收盤價。

嚴格意義上講,每一個買單必須有一個相應的賣單,因此真實的資金流入無法準確的計算,只能通過其他替代方法來區分資金的流入和流出,通過高頻資料,將每筆交易按照驅動股價上漲和下跌的差異,確定為資金的流入或流出,最終匯聚成一天的資金流淨額資料。根據業界開發的CMSMF指標,採用高頻實時資料進行資金流測算,主要出於以下兩方面考慮:一是採用高頻資料進行測算,可以儘可能反映真實的市場資訊;二是採取報價(最近買價、賣價)作為比較基準,成交價大於等於上期最優賣價視為流入,成交價小於等於上期最優買價視為流出。

除了資金的流入、流出、淨額,還有一系列衍生指標,比如根據流通股本數多少衍生出的大、中、小單流入、流出、淨額,及資金流資訊含量(IC)、資金流強度(MFP),資金流槓桿倍數(MFP),在這裡就不一一介紹。

從技術角度講,第一部分我們通過訂閱實時行情資訊,開始計算當天從開市到各個時刻點的資金流入、流出的累計值,及衍生指標,並將這些指標計算完成後重新寫回到Kafka進行儲存,方便下游消費。因此第一部分完全是一個大資料量的實時流處理應用,屬於Lambda的speed layer。

2)買賣訊號量的產生及交易

第二部分在業務上屬於模型層,即根據當前實時資金流指標資訊,構建自己的策略模型,輸出買賣訊號。比如以一個簡單的策略模型為例,如果同時滿足以下三個條件產生買的訊號。反之,產生賣的訊號:

  • (大單資金流入-大單資金流出>0) && (中單資金流入-中單資金流出>0)
  • 大單的資金流資訊含量>50%
  • 大單的資金流強度>20%

在技術上,這個應用也屬於Lambda的 speed layer,通過訂閱Kafka中的資金流指標,根據上面簡單的模型,不斷的判斷是否要買或者賣,並呼叫介面發起買賣委託指令,最後根據回報結果操作持倉表或者成交表。(注意此處在業務上只是以簡單的模型舉例,沒有涉及到更多的細節)

3)持倉盈虧實時追蹤及交易

第三部分在業務上主要是準實時的盈虧計算。在技術層面,屬於Lambda 的batch layer。通過訂閱實時行情和載入持倉表/成交表,實時計算使用者的盈虧情況。當然此處還有一些簡單的止損策略,也可以根據盈利情況,發起賣委託指令,並操作持倉表和成交表。最後將盈利情況報給服務層,進行展示或者提供回撥介面。詳細的處理流程如下圖所示:

Lambda架構

總結

正如文章前面強調的一樣,寫這篇文章的初衷是希望大家從大資料豐富的生態中解放出來,與業務深度的跨界融合,從而開發出更多具有價值的大資料應用,真正發揮大資料應有的價值。這和Lambda架構的作者Nathan Marz的理念也是十分吻合的,記得他還在BackType工作的時候,他們的團隊才五個人,卻開發了一個社會化媒體分析產品——在100TB的資料上提供各種豐富的實時分析,同時這個小的團隊還負責上百臺機器的叢集的部署、運維和監控。

當他向別人展示產品的時候,很多人都很震驚他們只有五個人。經常有人問他:“How can so few people do so much?”。他的回答是:“It’s not what we’re doing, but what we’re not doing.”通過使用Lambda架構,他們避免了傳統大資料架構的複雜性,從而產出變得非常顯著。

在五花八門的大資料技術層出不窮的當下,Marz的理念更加重要。我們一方面需要與時俱進關注最新的技術進步 – 因為新技術的出現可能反過來讓以前沒有考慮過或者不敢想的應用場景變成可能,但另一方面更重要的是,大資料技術的合理運用需要建立在對行業領域知識深刻理解的基礎上。大資料是金融科技的核心支撐技術之一,我們將持續關注最前沿的大資料技術與架構理念,持續優化最符合金融行業特點的解決方案,構建能放飛業務專家專業創新能力的技術平臺。

作者介紹

鄧昌甫,畢業於中山大學,廣發證券IT研發工程師,一直從事大資料平臺的架構、及大資料應用的開發、運維和敏捷相關工具的引入和最佳實踐的推廣(Git/Jenkins/Gerrit/Zenoss)。

文章出處:聊聊架構(訂閱號ID:archtime)