1. 程式人生 > >分散式流式計算平臺-S4

分散式流式計算平臺-S4

本文是作者在充分閱讀和理解Yahoo!最新發布的技術論文《S4:Distributed Stream Computing Platform》的基礎上,所做出的知識分享。

S4是Yahoo!在2010年10月開源的一套通用、分散式、可擴充套件、部分容錯、具備可插拔功能的平臺。這套平臺主要是為了方便開發者開發處理流式資料(continuous unbounded streams of data)的應用。專案官方網站為:http://s4.io/。同時,S4的開發者也發表了一篇技術論文《S4:Distributed Stream Computing Platform》來介紹S4的設計。下面我們就來學習這篇論文。

開發動機

“We designed this engine to solve real-world problems in the context of search applications that use data mining and machine learning algorithms.” … “To process user feedback, we developed S4, a low latency, scalable stream processing engine.”

Yahoo!之所以開發S4系統,主要是為了解決它現實的問題:搜尋廣告的展現。搜尋廣告是當前各大搜索引擎的主要收入來源,使用者發出查詢請求,搜尋引擎在返回正常結果的同時也會返回相關廣告,而廣告是按照點選付費。為了在最好的位置,放置最相關(也就是使用者最有可能點選)的廣告,各大搜索引擎使用了大量的資料探勘和機器學習演算法來進行相關性計算,以便提高收入,滿足使用者需求。其中很重要的一點就是要不斷分析使用者的點選反饋,以便捕獲使用者的行為。S4最初主要還只是用來處理使用者的點選反饋。

“The streaming paradigm dictates a very different architecture than the one used in batch processing. Attempting to build a general- purpose platform for both batch and stream computing would result in a highly complex system that may end up not being optimal for either task.”

那麼Yahoo!為什麼沒有選擇Hadoop來處理呢? MapReduce系統主要解決的是對靜態資料的批量處理,即當前的MapReduce系統實現啟動計算時,一般資料已經到位了(比如儲存到了分散式檔案系統上)。

而流式計算系統在啟動時,一般資料並沒有完全到位,而是源源不斷地流入,並且不像批處理系統重視的是總資料處理的吞吐,而是對資料處理的latency,即希望進入的資料越快處理越好。

當然,現在也有很多基於Hadoop系統來處理流式資料。一般有以下幾種方式。

  • Micro-batchinMapReduce:就是把流式的資料按照時間或者大小形成小的靜態資料,然後定期啟動MapReduce來計算。
  • Continuous MapReduce:Hadoop Online(http://www.eecs.berkeley.edu/Pubs/TechRpts/2009/EECS-2009-136.html)通過實現作業內的資料傳輸Pipeline和作業間的資料傳輸Pipeline,可以實現online aggregation和continuous queries。當前MapReduce模型中,只有Map中間結果完全產生後,Reduce才會過來拖資料,等所有Map資料都拖成功後,才能計算。Hadoop Online實現了Map到Reduce間的資料Pipeline,使得可以在Map產生部分資料後,就可以送到Reduce端,以便Reduce可以提前或者定期計算。
  • Dynamic add input:百度的一種實現,用來解決計算時資料還沒有到位的問題。作業可以在資料還沒有完全到位的情況下啟動,當新資料累積到一定量時,通過一個命令列介面,向執行中的作業動態增加新的輸入。通過這種方式,大大減少了處理大資料作業時等待資料到位的時間,在依次執行多個作業時,也會有時間收益。

在論文中,對類似於第一種的方式,分析了它的缺點。如果將資料流切成較小的data segment,就會增加啟動作業的overhead,同時使得維護segment之間的依賴關係變得更加複雜;但如果切得較大,那麼處理的latency就會比較長。

隨著大量實時應用的發展,比如實時搜尋、實時交易系統、實時欺騙分析、實時監控、社交網路等,都需要一個高度可擴充套件的流式計算解決方案。不同於原來的流式計算系統,S4主要解決的是高資料率和大資料量的流式處理。

設計假設和目標

為了簡化設計,S4給出了下面的假設。

Lossy failover is acceptable,即一旦一個節點失敗,會failover到另一個standby節點,但是會丟失原節點的記憶體狀態。這也是為什麼說S4是一個部分容錯的系統。

節點不能動態增加和減少。

設計目標包括以下幾個方面。

  • 簡單的程式設計介面。
  • 高可用+高可擴充套件。
  • 盡力避免Disk IO,而要儘量使用Local Memory,以便減少處理latency。
  • 使用去中心化和對稱架構,所有的節點的責任相同,方便部署和維護。
  • 功能可插拔,使得平臺通用化的同時,做到可以定製化。
  • 設計要科學、易用和靈活。

S4的設計大量借鑑了I BM的Stream Processing Core(SPC)中介軟體的設計。只是 SPC採用的是Subscription Model,而S4結合了 MapReduce和Actors Model。

Event Stream

一個Stream是Events的序列流。每個Event是一個(K,A)資料,通過EventType來標示其型別。K、A分別表示這種型別的Event的keys和attributes。key和attribute都是tuple-valued,即key=value這種元組值。下面給出一個event的例子:

EV:ClickLog                         → event type

KEY:product=“search”, type=”online”         → keys

VAL: userid=”123”, ip=”10.0.0.0”, cookieid=”3”                            → attributes

Processing Elements

Processing Element(PE)是S4中的基本運算單元。一個PE通過下面四個元件來表示。

  • functionality:實現PE的Java類和相關配置來定義。
  • types of events:處理的event type。
  • key:關心哪種key。
  • Key的值:關心的key值是多少。

每個PE只負責處理自己所關心的eventtype,並且只處理自己所對應的key值的event。PE處理後可能輸出一個或多個event。當平臺處理一個key值時,會先檢查相應的PE是否已經存在,如果不存在,會先初始化相應的PE,然後交由這個PE進行處理。舉例如圖1所示。

在圖1中,PE2負責處理相應的單詞事件(WordEvent),主要邏輯是統計所關心單詞的個數,然後輸出給下游的PE。PE2所關心的eventtype為WorkEvent,所關心的key為word,所關心的key值為“said”。假如又來了一個WordEvent,key為word=“listen”,那麼這個事件就不是PE2所關心的,所以平臺可能會為“listen”值啟動一個新的PE來處理。

有一類特殊的PE,即keylessPE(沒有key和key值),這些PE會接收相應eventtype的所有event進行處理。這類PE主要用來作為S4cluster的輸入層(InputLayer),即外圍應用會產生相應的event(keylessevent),將這些event發到任何一個節點。而S4cluster中的每個節點都會啟動一個keylessPE,這些PE做簡單的輸入處理後,轉化為keyedevent,交給叢集中的其他PE型別進行處理。

PE的邏輯主要由應用程式設計師來開發。

Processing Node

Processing Node是一個邏輯節點,負責監聽訊息的到來,對訊息進行處理,然後通過Communication Layer將event在叢集中分發。S4主要依據上面提到的eventtype和key/key值,對key值求hash,在叢集中進行分發。關注的key集合通過配置檔案來得到。對於需要處理的event,會交給PN中的Processing Element Container(PEC),然後PEC呼叫相應的PE進行處理。PN功能框如圖2所示。

                                                                   

通過圖2的設計,可以保證,對應於相同event type,key和key值的event一定會被路由到對應的PN。

底下的Communication Layer和Zookeeper共同完成了叢集管理和自動failover功能。

程式設計模型

應用的主要任務就是實現一些相應的PE。PE一般提供如下介面供應用實現。

  • processEvent():用來處理每一個event,然後修改相應的內部狀態。
  • output():框架會按照應用的配置定期的呼叫,以便向下遊輸出其他event。應用可以使用兩種輸出配置,一個是隔多長時間輸出一次;另一個是隔多少event個數輸出一次。

其他

論文中給出了一個Word Count的例子,大家可以仔細研究一下。在效能測試部分,論文總結了將S4應用到實際的CTR(Click-Through Rate)預估中的效果。在應用舉例中,給出了S4在線上引數優化的應用。

隨著大量實時計算需求的增加,分散式流式計算將會成為分散式計算的下一個主要研究重點,將會成為類似Hadoop這類MapReduce框架的有力補充。這一方向的工作還處在初級發展階段,大家需要多加關注。

作者簡介

馬如悅,百度基礎架構部高階工程師,自2007年加入百度,一直從事分散式儲存系統和分散式計算系統的設計和開發工作。對Hadoop有較深入的研究,一直積極活動在Hadoop開源社群。