storm 工作原理詳解
1.1、概念
l Workers (JVMs): 在一個物理節點上可以執行一個或多個獨立的JVM 程序。一個Topology可以包含一個或多個worker(並行的跑在不同的物理機上), 所以worker process就是執行一個topology的子集, 並且worker只能對應於一個topology
l Executors (threads): 在一個worker JVM程序中執行著多個Java執行緒。一個executor執行緒可以執行一個或多個tasks。但一般預設每個executor只執行一個task。一個worker可以包含一個或多個executor, 每個component (spout
l Tasks(bolt/spout instances):Task就是具體的處理邏輯物件,每一個Spout和Bolt會被當作很多task在整個叢集裡面執行。每一個task對應到一個執行緒,而stream grouping則是定義怎麼從一堆task發射tuple到另外一堆task。你可以呼叫TopologyBuilder.setSpout和TopBuilder.setBolt來設定並行度 — 也就是有多少個task。
1.2、配置並行度
l 對於併發度的配置, 在storm裡面可以在多個地方進行配置, 優先順序為:
defaults.yaml < storm.yaml < topology-specific configuration
< internal component-specific configuration < external component-specific configuration
l worker processes的數目, 可以通過配置檔案和程式碼中配置, worker就是執行程序, 所以考慮併發的效果, 數目至少應該大亍machines的數目
l executor的數目
l tasks的數目, 可以不配置, 預設和executor1:1, 也可以通過setNumTasks()配置
Topology的worker數通過config設定,即執行該topology的worker(java)程序數。它可以通過 storm rebalance 命令任意調整。
Config conf = newConfig(); conf.setNumWorkers(2); //用2個worker topologyBuilder.setSpout("blue-spout", newBlueSpout(), 2); //設定2個併發度 topologyBuilder.setBolt("green-bolt", newGreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout"); //設定2個併發度,4個任務 topologyBuilder.setBolt("yellow-bolt", newYellowBolt(), 6).shuffleGrouping("green-bolt"); //設定6個併發度 StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology()); |
3個元件的併發度加起來是10,就是說拓撲一共有10個executor,一共有2個worker,每個worker產生10 / 2 = 5條執行緒。
綠色的bolt配置成2個executor和4個task。為此每個executor為這個bolt執行2個task。
l 動態的改變並行度
Storm支援在不 restart topology 的情況下, 動態的改變(增減) worker processes 的數目和 executors 的數目, 稱為rebalancing. 通過Storm web UI,或者通過storm rebalance命令實現:
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 |
2、Storm通訊機制
Worker間的通訊經常需要通過網路跨節點進行,Storm使用ZeroMQ或Netty(0.9以後預設使用)作為程序間通訊的訊息框架。
Worker程序內部通訊:不同worker的thread通訊使用LMAX Disruptor來完成。
不同topologey之間的通訊,Storm不負責,需要自己想辦法實現,例如使用kafka等;
2.1、Worker程序間通訊
worker程序間訊息傳遞機制,訊息的接收和處理的大概流程見下圖
ü 對於worker程序來說,為了管理流入和傳出的訊息,每個worker程序有一個獨立的接收執行緒(對配置的TCP埠supervisor.slots.ports進行監聽);
對應Worker接收執行緒,每個worker存在一個獨立的傳送執行緒,它負責從worker的transfer-queue中讀取訊息,並通過網路傳送給其他worker
ü 每個executor有自己的incoming-queue和outgoing-queue。
Worker接收執行緒將收到的訊息通過task編號傳遞給對應的executor(一個或多個)的incoming-queues;
每個executor有單獨的執行緒分別來處理spout/bolt的業務邏輯,業務邏輯輸出的中間資料會存放在outgoing-queue中,當executor的outgoing-queue中的tuple達到一定的閥值,executor的傳送執行緒將批量獲取outgoing-queue中的tuple,併發送到transfer-queue中。
ü 每個worker程序控制一個或多個executor執行緒,使用者可在程式碼中進行配置。其實就是我們在程式碼中設定的併發度個數。
2.2、Worker程序間通訊分析
1、 Worker接受執行緒通過網路接受資料,並根據Tuple中包含的taskId,匹配到對應的executor;然後根據executor找到對應的incoming-queue,將資料存傳送到incoming-queue佇列中。
2、 業務邏輯執行現成消費incoming-queue的資料,通過呼叫Bolt的execute(xxxx)方法,將Tuple作為引數傳輸給使用者自定義的方法
3、 業務邏輯執行完畢之後,將計算的中間資料傳送給outgoing-queue佇列,當outgoing-queue中的tuple達到一定的閥值,executor的傳送執行緒將批量獲取outgoing-queue中的tuple,併發送到Worker的transfer-queue中
4、 Worker傳送執行緒消費transfer-queue中資料,計算Tuple的目的地,連線不同的node+port將資料通過網路傳輸的方式傳送給另一個的Worker。
5、 另一個worker執行以上步驟1的操作。
2.3、Worker程序間技術(Netty、ZeroMQ)
2.3.1、Netty
Netty是一個NIO client-server(客戶端伺服器)框架,使用Netty可以快速開發網路應用,例如伺服器和客戶端協議。Netty提供了一種新的方式來使開發網路應用程式,這種新的方式使得它很容易使用和有很強的擴充套件性。Netty的內部實現時很複雜的,但是Netty提供了簡單易用的api從網路處理程式碼中解耦業務邏輯。Netty是完全基於NIO實現的,所以整個Netty都是非同步的。
書籍:Netty權威指南
2.3.2、ZeroMQ
ZeroMQ是一種基於訊息佇列的多執行緒網路庫,其對套接字型別、連線處理、幀、甚至路由的底層細節進行抽象,提供跨越多種傳輸協議的套接字。ZeroMQ是網路通訊中新的一層,介於應用層和傳輸層之間(按照TCP/IP劃分),其是一個可伸縮層,可並行執行,分散在分散式系統間。
ZeroMQ定位為:一個簡單好用的傳輸層,像框架一樣的一個socket library,他使得Socket程式設計更加簡單、簡潔和效能更高。是一個訊息處理佇列庫,可在多個執行緒、核心和主機盒之間彈性伸縮。ZMQ的明確目標是“成為標準網路協議棧的一部分,之後進入Linux核心”。
2.4、Worker 內部通訊技術(Disruptor)
2.4.1、 Disruptor的來歷
ü 一個公司的業務與技術的關係,一般可以分為三個階段。第一個階段就是跟著業務跑。第二個階段是經歷了幾年的時間,才達到的驅動業務階段。第三個階段,技術引領業務的發展乃至企業的發展。所以我們在學習Disruptor這個技術時,不得不提LMAX這個機構,因為Disruptor這門技術就是由LMAX公司開發並開源的。
ü LMAX是在英國註冊並受到FSA監管(監管號碼為509778)的外匯黃金交易所。LMAX也是歐洲第一家也是唯一一家採用多邊交易設施Multilateral Trading Facility(MTF)擁有交易所牌照和經紀商牌照的歐洲頂級金融公司
ü LAMX擁有最迅捷的交易平臺,頂級技術支援。LMAX交易所使用“(MTF)分裂器Disruptor”技術,可以在極短時間內(一般在3百萬秒之一內)處理訂單,在一個執行緒裡每秒處理6百萬訂單。所有訂單均為撮合成交形式,無一例外。多邊交易設施(MTF)曾經用來設計倫敦證券交易 所(london Stock Exchange)、德國證券及衍生工具交易所(Deutsche Borse)和歐洲證券交易所(Euronext)。
ü 2011年LMAX憑藉該技術獲得了金融行業技術評選大賽的最佳交易系統獎和甲骨文“公爵杯”創新程式設計框架獎。
2.4.2、Disruptor是什麼
1、 簡單理解:Disruptor是一個Queue。Disruptor是實現了“佇列”的功能,而且是一個有界佇列。而佇列的應用場景自然就是“生產者-消費者”模型。
2、 在JDK中Queue有很多實現類,包括不限於ArrayBlockingQueue、LinkBlockingQueue,這兩個底層的資料結構分別是陣列和連結串列。陣列查詢快,連結串列增刪快,能夠適應大多數應用場景。
3、 但是ArrayBlockingQueue、LinkBlockingQueue都是執行緒安全的。涉及到執行緒安全,就會有synchronized、lock等關鍵字,這就意味著CPU會打架。
4、 Disruptor一種執行緒之間資訊無鎖的交換方式(使用CAS(Compare And Swap/Set)操作)。
2.4.2、Disruptor主要特點
1、 沒有競爭=沒有鎖=非常快。
2、 所有訪問者都記錄自己的序號的實現方式,允許多個生產者與多個消費者共享相同的資料結構。
3、 在每個物件中都能跟蹤序列號(ring buffer,claim Strategy,生產者和消費者),加上神奇的cache line padding,就意味著沒有為偽共享和非預期的競爭。
2.4.2、 Disruptor 核心技術點
Disruptor可以看成一個事件監聽或訊息機制,在佇列中一邊生產者放入訊息,另外一邊消費者並行取出處理.
底層是單個數據結構:一個ring buffer。
每個生產者和消費者都有一個次序計算器,以顯示當前緩衝工作方式。
每個生產者消費者能夠操作自己的次序計數器的能夠讀取對方的計數器,生產者能夠讀取消費者的計算器確保其在沒有鎖的情況下是可寫的。
核心元件
ü Ring Buffer 環形的緩衝區,負責對通過 Disruptor 進行交換的資料(事件)進行儲存和更新。
ü Sequence 通過順序遞增的序號來編號管理通過其進行交換的資料(事件),對資料(事件)的處理過程總是沿著序號逐個遞增處理。
ü RingBuffer底層是個陣列,次序計算器是一個64bit long 整數型,平滑增長。
1、 接受資料並寫入到腳標31的位置,之後會沿著序號一直寫入,但是不會繞過消費者所在的腳標。
2、 Joumaler和replicator同時讀到24的位置,他們可以批量讀取資料到30
3、消費邏輯執行緒讀到了14的位置,但是沒法繼續讀下去,因為他的sequence暫停在15的位置上,需要等到他的sequence給他序號。如果sequence能正常工作,就能讀取到30的資料。
3、Storm元件本地目錄樹
4、Storm zookeeper目錄樹
5、Storm 任務提交的過程
TopologyMetricsRunnable.TaskStartEvent[oldAssignment=<null>,newAssignment=Assignment[masterCodeDir=C:\Users\MAOXIA~1\AppData\Local\Temp\\e73862a8-f7e7-41f3-883d-af494618bc9f\nimbus\stormdist\double11-1-1458909887,nodeHost={61ce10a7-1e78-4c47-9fb3-c21f43a331ba=192.168.1.106},taskStartTimeSecs={1=1458909910, 2=1458909910, 3=1458909910, 4=1458909910, 5=1458909910, 6=1458909910, 7=1458909910, 8=1458909910},workers=[ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=<null>,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900]],timeStamp=1458909910633,type=Assign],task2Component=<null>,clusterName=<null>,topologyId=double11-1-1458909887,timestamp=0] |
6、Storm 訊息容錯機制
6.1、總體介紹
l 在storm中,可靠的資訊處理機制是從spout開始的。
l 一個提供了可靠的處理機制的spout需要記錄他發射出去的tuple,當下遊bolt處理tuple或者子tuple失敗時spout能夠重新發射。
l Storm通過呼叫Spout的nextTuple()傳送一個tuple。為實現可靠的訊息處理,首先要給每個發出的tuple帶上唯一的ID,並且將ID作為引數傳遞給SoputOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), msgId); messageid就是用來標示唯一的tupke的,而rootid是隨機生成的
給每個tuple指定ID告訴Storm系統,無論處理成功還是失敗,spout都要接收tuple樹上所有節點返回的通知。如果處理成功,spout的ack()方法將會對編號是msgId的訊息應答確認;如果處理失敗或者超時,會呼叫fail()方法。
6.2、基本實現
Storm 系統中有一組叫做"acker"的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每個訊息。
acker任務儲存了spout id到一對值的對映。第一個值就是spout的任務id,通過這個id,acker就知道訊息處理完成時該通知哪個spout任務。第二個值是一個64bit的數字,我們稱之為"ack val", 它是樹中所有訊息的隨機id的異或計算結果。
ack val表示了整棵樹的的狀態,無論這棵樹多大,只需要這個固定大小的數字就可以跟蹤整棵樹。當訊息被建立和被應答的時候都會有相同的訊息id傳送過來做異或。每當acker發現一棵樹的ack val值為0的時候,它就知道這棵樹已經被完全處理了
6.3、可靠性配置
有三種方法可以去掉訊息的可靠性:
將引數Config.TOPOLOGY_ACKERS設定為0,通過此方法,當Spout傳送一個訊息的時候,它的ack方法將立刻被呼叫;
Spout傳送一個訊息時,不指定此訊息的messageID。當需要關閉特定訊息可靠性的時候,可以使用此方法;
最後,如果你不在意某個訊息派生出來的子孫訊息的可靠性,則此訊息派生出來的子訊息在傳送時不要做錨定,即在emit方法中不指定輸入訊息。因為這些子孫訊息沒有被錨定在任何tuple tree中,因此他們的失敗不會引起任何spout重新發送訊息。