1. 程式人生 > >深度解析 Twitter Heron 大資料實時分析系統

深度解析 Twitter Heron 大資料實時分析系統

所有的老的生產環境的topology已經執行在Heron上, 每天大概處理幾十T的資料, billions of訊息

為什麼要重新設計Heron:

【題外話】這裡完全引用作者吐槽的問題, 不少問題,其實JStorm已經解決

(1)debug-ability 很差, 出現問題,很難發現

1.1 多個task執行在一個系統程序中, 很難定位問題。需要一個清晰的邏輯計算單元到物理計算單元的關係

(2)需要一種更高階的資源池管理系統

2.1 可以和其他程式設計框架共享資源, 說白了,就是類似yarn/mesos, 而在Twitter就是Aurora 2.2 更簡單的彈性擴容和縮容 叢集 2.3 因為不同task,對資源需求是不一樣的, 而storm會公平對待每個worker, 因此會存在worker浪費記憶體問題。當worker記憶體特別大時, 進行jstack或heap dump時,特別容易引起gc,導致被supervisor幹掉 2.4 經常為了避免效能故障,常常進行超量資源分配, 原本100個core,分配了200個

(3)認為Storm設計不合理的地方

3.1 一個executor 存在2個執行緒, 一個執行執行緒, 一個傳送執行緒, 並且一個executor執行多個task, task的排程完全依賴來源的tuple, 很不方便確認哪個task出了問題。 3.2 因為多種task執行在一個worker中, 無法明確出每種task使用的資源, 也很難定位出問題的task,當出現效能問題或其他行為時, 常用就是重啟topology, 重啟後就好了,因為task進行了重新排程 3.3 日誌打到同一個檔案中,也很難查詢問題,尤其是當某個task瘋狂的列印日誌時 3.4 當一個task掛掉了,直接會幹掉worker,並強迫其他執行好的task被kill掉 3.5 最大的問題是,當topology某個部分出現問題時, 會影響到topology其他的環節 3.6 gc引起了大量的問題 3.7 一條訊息至少經過4個執行緒, 4個佇列, 這會觸發執行緒切換和佇列競爭問題 3.8 nimbus功能太多, 排程/監控/分發jar/metric report, 經常會成為系統的bottleneck 3.9 storm的worker沒有做到資源保留和資源隔離, 因此存在一個worker會影響到另外的worker。 而現有的isolation排程會帶來資源浪費問題。 Storm on Yarn也沒有完全解決這個問題。 3.10 zookeeper成為系統的瓶頸, 當叢集規模增大時。 有些系統為了降低zk心態,新增了tracker,但tracker增加了系統運維難度。 3.11 nimbus是系統單點 3.12 缺乏反壓機制 3.12.1 當receiver忙不過來時, sender就直接扔棄掉tuple, 3.12.2 如果關掉acker機制, 那無法量化drop掉的tuple 3.12.3 因為上游worker執行的計算就被扔棄掉。 3.12.4. 系統會變的難以預測(less predictable.) 3.13 常常出現效能問題, 導致tuple fail, tuple replay, 執行變慢 3.13.1 不良的replay, 任意一個tuple失敗了,都會導致整個tuple tree fail, 不良的設計時(比如不重要的tuple失敗),會導致tuple輕易被重發 3.13.2 當記憶體很大時,長時間的gc,導致處理延時,甚至被誤殺 3.13.3 佇列競爭

Heron設計原則:

(1)相容老的storm api

(2)實現2種策略, At most once/At least once

架構:

排程器

Aurora是一個基於mesos的通用service scheduler, Hero基於Aurora 實現了一套Topology Scheduler, 並且這個排程器已經提供了一定的抽象,可以移植到yarn/mesos/ec2(我的理解應該稍加修改就可以執行在其他通用型排程器上) 2/ 第一個container 執行 Topology Manager(TM), 其他的container 內部會執行一個Stream manager/Metrics Manager 和多個Heron Instance。 這裡一個container類似一個docker感念,表示一個資源集合,是Aurora的排程單元, 多個container可以執行在一臺機器上, 分配多少container由Aurora根據現有資源情況進行分配, 另外一個container設定了cgroup

Topology Manager

1. tm伴隨整個topology生命週期, 提供topology狀態的唯一contact (類似yarn的app master) 2. 可以一主多備, 大家搶佔zk 節點, 誰勝出,誰為master, 其他為standby

Stream manager(SM)

最大的改變就是源自Stream manager, Stream manager就相當於一個container的tuple的匯流排(hub)。 所有的Hero Instance(HI)都連線SM進行send/receive 如果container內部一個HI 傳送資料到另外一個HI,走的是本地快速通道。

Backpressure 反壓機制

當下遊處理速度變慢後,通過反壓機制,可以通知上游進行減速, 避免資料因buffer被塞滿而丟失,並因此帶來資源浪費。

TCP 反壓:

當一個HI 處理慢了後,則該HI的接收buffer會被填滿, 緊接著本地SM的sending buffer被填滿, ? 然後會傳播到其他的SM和上游HI。 這個機制很容易實現,但在實際使用中,存在很多問題。因為多個HI 共用SM, 不僅將上游的HI 降速了,也把下游的HI 降速。從而整個topology速度全部下架,並且長時間的降級。

Spout 反壓。

這個機制是結合TCP 反壓機制, 一旦SM 發現一個或多個HI 速度變慢,立刻對本地spout進行降級, 停止從這些spout讀取資料。並且受影響的SM 會發送一個特殊的start backpressure message 給其他的sm,要求他們對spout進行本地降級。一旦出問題的HI 恢復速度後,本地的SM 會發送 stop backpressure message 解除降級。

Stage-by-Stage 反壓

這個類似spout反壓,但是一級一級向上反壓。 Heron最後採用的是spout反壓, 因為實現比較簡單,而且降級響應非常迅速。 並且可以很快定位到那個HI 處理速度慢了。 每個socket channel都綁定了一個buffer, 當buffer 的 queue size超過警戒水位時,觸發反壓,減少時,接觸反壓。 這種機制,不會丟棄tuple,除了機器宕機。 topology可以設定開啟或關閉。

Heron Instance

(1) 一個task 一個程序, (2) 所有的程序之間通訊都是使用protocol buffer (3) 一個gateway執行緒, 一個執行執行緒。 gateway執行緒負責和外圍通訊, sm/mm。 執行執行緒和現有storm的執行執行緒非常類似。執行執行緒會收集所有的metrics,然後傳送給gateway執行緒。 (4)這個data-in/data-out佇列會限定大小, 當data-in 佇列滿了的時候, gateway執行緒停止從local SM 讀取資料。同理如果data-out佇列滿,gateway會認為local SM不想接受更多的資料。 執行執行緒就不再emit或執行更多的tuple。 (5)data-in/data-out佇列大小不是固定, 如果是固定時, 當網路顛簸時,會導致記憶體中大量資料堆積無法傳送出去,並觸發GC, 並導致進一步的降級。因此是動態調整, 定期調整佇列大小。 如果佇列的capacity超過閥值時, 對其進行減半。這個操作持續進行指導佇列的capacity維持在一個穩定的水位或0。這種方式有利避免GC的影響。 當佇列的capcity小於某個閥值時, 會緩慢增長到配置大小或最大capacity值。

Metrics manager(mm)

收集所有的metrics,包括系統的和使用者的metrics, 也包含SM的。 mm會發送metrics 給monitor系統(類似ganglia系統),同樣也會給TM.

流程:

(1)提交任務, Aurora分配必要的資源和在一些機器上排程container (2)TM 在一個container上執行起來,並註冊到ZK (3)每個container的SM 查詢ZK 找到TM, 向TM 傳送心跳。 (4)當所有的SM 連上TM後, TM 執行分配演算法, 不同的compoent到不同的container。 這個階段叫物理執行計劃(類似SQL解析和執行過程)。並將執行計劃放到ZK。 (5)SM 下載執行計劃,並開始相互之間進行連線, 與此同時, 啟動HI, hi開始發現container,下載他們的執行計劃,並開始執行 (6)整個topology完成初始化,開始正式的傳送和接收資料。

三種failure case

1. 程序掛了 1.1 如果TM 掛了, container會重啟TM, TM 會從ZK 上重新下載執行計劃。如果有一主多備,則備機會被promotion。 所有SM 會切到新的TM 1.2 如果SM 掛了, container依舊會重啟TM, 並從ZK 下載執行計劃, 並檢查是否有變化。其他的SM 會連到新的SM 1.3 如果HI 掛了, 重啟並下載執行計劃,並重新執行。

外圍系統

外圍系統就介紹一下Heron Tracker

Heron Tracker

負責收集topology的資訊, 類似一個gateway的角色。 通過watch zk,發現新的TM, 並獲取topology的一些原資料。是一種Aurora service, 提供load balance在多個instance之間。 可以提供REST API。可以獲取 (1) 邏輯和物理執行計劃 (2) 各種metrics, 系統的和使用者的 (3)日誌link

Heron UI/VIZ

UI 提供傳統的UI 方式。 VIZ 提供全新的UI, 可以看到更多的metrics, 曲線和健康檢查。比UI 炫酷很多。

效能報告和測試過程:

瞭解整個系統架構和工作流程後, 後面的效能測試報告, 沒有看了, 也差不多有個概念了。

個人思考和總結:

(1) 相對於JStorm, Heron把角色剝離的更清晰明瞭。

(1.1)排程器

scheduler 負責container的排程,這個排程非常的純粹,可以直接複用yarn/mesos/, 現有的TM 其實就是nimbus,唯一一點變化就是這個TM 只負責自己topology的資訊, 不是負責所有topology。這個TM 就相當於yarn下的app master, 非常適合目前主流的排程系統。 當叢集規模非常大的時候, 並且每個應用都比較大的時候, 這個架構會非避免nimbus成為瓶頸。 不過storm-on-yarn模式下, 可能通過一個nimbus管理一個小的邏輯叢集,也可以解決這個問題, 並且當topology 比較小的時候, 可以通過大家公用一個nimbus,節省一些資源。

(1.2) container

這裡特別要把container拿出來仔細說一下, 這個container是Auron的一個資源單元。如果將Auron類似JStorm的worker, 你就會發現角色和架構是多麼的類似。 (1.2.1) container和jstorm的worker都可以設定cgroup,達到一定的資源隔離 (1.2.2)container內部的SM/MM 其實就類似jstorm worker內部drainer/dispatcher/metricsreport執行緒。 但container 相對jstorm 的worker 還有一些其他的優缺點: 優點: (1.2.3)這個粒度可以控制的更自由, 這個container 可以控制cpu 到更多的核,更多的記憶體上限。 但jstorm的worker 基本上最多10個核, 而且當記憶體太大,在core dump和gc的時候壓力會比較大。 (1.2.4)container還帶一定的supervisor的功能,當container內部任何程序掛了, container都會負責把它重啟, 因此整個系統的心態邏輯會非常的簡單。 ?Auron <–> container, ? ?Container <– > tm/sm/mm/hi. ?整個系統的心跳壓力模型會更簡單, 心跳壓力(對ZK)也更小

效能:

ppt和文件裡面說效能有15倍以上的提升, 這個在某些設定下是可以達到這種效果, 但通常情況效能應該比JStorm還要差一點點。 如何達到這種效果呢, (1)前提條件是, grouping方式不是選擇localOrShuffle或者localFirst ?就是把container設定的儘可能的大, 最好是獨佔一臺機器。這樣SM和SM 之間的通訊就會大幅減少, 而一個container內部的HI 通訊走內部通道。因此會有更多的HI走內部通道。而jstorm/storm, worker比較多的時候, worker和worker之間會建立netty connection, 更多的netty connection會帶來更多的記憶體消耗和執行緒切換。 尤其是worker數超過200個以上時。 但為什麼說通常情況下,效能應該還要比JStorm差一點點呢。 因為在生產環境, container 是不可能佔有這麼多資源, 否則Auron的排程太粗粒度,一臺機器只跑一個大container, 會導致更嚴重的資源浪費。正常情況下, 一個container繫結2 ~ 4個核, 這個時候,和一個普通的jstorm worker沒有什麼區別, 但jstorm worker內部task之間資料傳輸的效率會遠遠高於Heron, 因為Heron的HI 之間即使是走程序間通訊方式, 也逃脫不了序列化和反序化的動作, 這個動作肯定會耗時, 更不用說IPC 之間的通訊效率和程序內的通訊效率。

資源利用率:

Heron 可以非常精準的控制資源使用情況, 能夠保證, 申請多少資源,就會用多少資源。 在大叢集這個級別會節省資源,在topology級別浪費資源。 如果JStorm-on-yarn這種系統下, 因為每個邏輯叢集會超量申請一些資源, 因此資源可能會多有少量浪費。無法做到像Heron一樣精準。 如果改造nimbus成為topology level 類似TM(騰訊在jstorm基礎上實現了這個功能), 這個問題就可以很好的解決。在普通standalone的JStorm模式下, jstorm不會浪費資源, 但因為Standalone,導致這些機器不能被其他程式設計框架使用, 因此也可以說浪費一定的資源。 但這種情況就是 資源隔離性– 資源利用率的一種平衡, 現在這種根據線上執行情況,浪費程度可以接受。 在topology這個粒度進行比較時, Heron應該會消耗掉更多的資源。 最大的問題在於, Heron中一個task就是一個process, 論文中沒有描敘這個process的公共執行緒, 可以肯定的是, 這個process比如還有大量的公共執行緒, 比如ZK-client/network-thread/container-heartbeat-thread, 一個task一個process,這種設計,相對於一個worker跑更多的task而言,肯定浪費了更多的CPU 和記憶體。 至於吐槽在Storm和JStorm,超量申請資源問題, 比如一個topology只要100 個cpu core能完成, 申請了600個core, 這個問題,在jstorm中是絕對不存在的, jstorm的cgroup設定是share + limit方式, 也就是上限是600 core,但topology如果用不到600個core, 別的topology可以搶佔到cpu core。 在記憶體方面, jstorm的worker 記憶體申請量,是按照worker最大記憶體申請, 但現代作業系統早就做到了, 給你一個上限, 當你用不了這麼多的時候, 其他程序可以搶佔。

在穩定性和debug-ability這點上:

Heron 優勢非常大, 主要就是通過2點: (1) 自動降級策略, 也就是論文說的backpressure, 這個對於大型應用是非常有效的, 也很顯著提高穩定性。 (2) 一個task一個process, 這個結合降級策略,可以非常快速定位到出錯的task, 另外因為一個task 一個process, task之間的影響會非常快, 另外也避免了一個程序使用過大的記憶體,從而觸發嚴重的GC 問題。

最後總結:

Heron更適合超大規模的機器, 超過1000臺機器以上的叢集。 在穩定性上有更優異的表現, 在效能上,表現一般甚至稍弱一些,在資源使用上,可以和其他程式設計框架共享資源,但topology級別會更浪費一些資源。 另外應用更偏向於大應用,小應用的話,會多一點點資源浪費, 對於大應用,debug-ability的重要性逐漸提升。 另外對於task的設計, task會走向更重更復雜, 而JStorm的task是向更小更輕量去走。 未來JStorm可以把自動降級策略引入, 通過實現阿里媽媽的ASM, debug-ability應該遠超過storm, 不會遜色於Heron, 甚至更強。

其他流式程式設計框架

1.S4 Distributed Stream Computing Platform.?http://incubator.apache.org/s4/ 2. Spark Streaming. https://spark.apache.org/streaming/? 3. Apache Samza. http://samza.incubator.apache.org 4. Tyler Akidau, Alex Balikov, Kaya Bekiroglu, Slava Chernyak, Josh?Haberman, Reuven Lax, Sam McVeety, Daniel Mills, Paul?Nordstrom, Sam Whittle: MillWheel: Fault-Tolerant Stream?Processing at Internet Scale.?PVLDB 6(11): 1033-1044 (2013) 5.?Mohamed H. Ali, Badrish Chandramouli, Jonathan Goldstein,Roman Schindlauer: The Extensibility Framework in Microsoft?StreamInsight.?ICDE?2011: 1242-1253 6. Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, Ashish?Gupta, Haifeng Jiang, Tianhao Qiu, Alexey Reznichenko, Deomid?Ryabkov, Manpreet Singh, Shivakumar Venkataraman: Photon:?Fault-tolerant and Scalable Joining of Continuous Data Streams.?SIGMOD?2013: 577-588 8. Simon Loesing, Martin Hentschel, Tim Kraska, Donald Kossmann:?Stormy: An Elastic and Highly Available Streaming Service in the?Cloud. EDBT/ICDT Workshops 2012: 55-60