1. 程式人生 > >Storm雜談之Topology的啟動過程(二)

Storm雜談之Topology的啟動過程(二)

在一中講到了topology提交給nimbus

nimbus

Nimbus可以 說是storm中最核心的部分,它的主要功能有兩個:

  • 對Topology的任務進行分配資源
  • 接收使用者的命令並做相應的處理,如Topology的提交,殺死,啟用等等
Nimbus本身是基於Thrift框架實現的,使用了Thrift的THsHaServer服務,即半同步半非同步服務模式,使用一個單獨的執行緒來處理網路IO,使用一個獨立的執行緒池來處理訊息,大大提高了訊息的併發處理能力。

服務介面的定義都在storm.thrift檔案中定義,貼下部分程式碼:

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);
  
  string beginFileDownload(1: string file);
  //can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();
  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  //returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

當執行命令  nohup ${STORM_HOME}/bin/storm nimbus & 時,會啟動nimbus服務,具體的程式碼執行:

storm python指令碼程式碼,預設啟動backtype.storm.daemon.nimbus程式:

def nimbus(klass="backtype.storm.daemon.nimbus"):
    """Syntax: [storm nimbus]

    Launches the nimbus daemon. This command should be run under 
    supervision with a tool like daemontools or monit. 

    See Setting up a Storm cluster for more information.
    (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
    """
    cppaths = [CLUSTER_CONF_DIR]
    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
        "-Dlogfile.name=nimbus.log",
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
    ]
    exec_storm_class(
        klass, 
        jvmtype="-server", 
        extrajars=cppaths, 
        jvmopts=jvmopts)

然後執行nimbus.clj 指令碼,主要涉及兩個方法——launch-server!(nimbus的啟動入口)和service-handler(真正定義處理邏輯的地方)。

nimbus啟動後,對外提供了一些服務,topology的提交,UI資訊,topology的kill,rebalance等等。在文章一中講到提交topology給nimbus,這些服務的處理邏輯全部在service-handler方法中。以下擷取service-handler裡面處理提交Topology的邏輯

(reify Nimbus$Iface
      (^void submitTopologyWithOpts
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
         ^SubmitOptions submitOptions]
        (try
          (assert (not-nil? submitOptions))
          (validate-topology-name! storm-name)
          (check-storm-active! nimbus storm-name false)
          (let [topo-conf (from-json serializedConf)]
            (try
              (validate-configs-with-schemas topo-conf)
              (catch IllegalArgumentException ex
                (throw (InvalidTopologyException. (.getMessage ex)))))
            (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                       storm-name
                       topo-conf
                       topology))
          (swap! (:submitted-count nimbus) inc)
          (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
                storm-conf (normalize-conf
                            conf
                            (-> serializedConf
                                from-json
                                (assoc STORM-ID storm-id)
                              (assoc TOPOLOGY-NAME storm-name))
                            topology)
                total-storm-conf (merge conf storm-conf)
                topology (normalize-topology total-storm-conf topology)
                storm-cluster-state (:storm-cluster-state nimbus)]
            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
            (log-message "Received topology submission for " storm-name " with conf " storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
            (locking (:submit-lock nimbus)
              (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
              (.setup-heartbeats! storm-cluster-state storm-id)
              (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                              TopologyInitialStatus/ACTIVE :active}]
                (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
              (mk-assignments nimbus)))
          (catch Throwable e
            (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
            (throw e))))
      
      (^void submitTopology
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
        (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                 (SubmitOptions. TopologyInitialStatus/ACTIVE)))
檢查Topology的DAG圖是否是有效連線圖、以及該topology Name是否已經存在,然後分配資源和任務排程(mk-assignments )方法,等分配好資源之後,把資料寫入到zookeeper,watcher發現有資料,就通知supervisor讀取資料啟動新的worker,一個worker就是一個JVM程序,worker啟動後就會按照使用者事先定好的task數來啟動task,一個task就是一個thread

在executor.clj中mk-threads: spout ,mk-threads: bolt方法就是啟動task,而task就是對應的spout或bolt 元件,而且這時Spout的open,nextTuple方法,以及bolt的preapre,execute方法都是在這裡被呼叫的,結合文章一中提到的,對於

Spout 方法呼叫順序:

declareOutputFields-> open -> nextTuple -> fail/ack or other

Bolt 方法呼叫順序:

declareOutputFields-> prepare -> execute

需要的注意的是在Spout中fail、ack方法和nextTuple是在同一執行緒中被順序呼叫的,所以在nextTuple中不要做延遲很大的操作。

至此,一個topology算是可以正式啟動工作了。

相關推薦

Storm雜談Topology啟動過程

在一中講到了topology提交給nimbus nimbus Nimbus可以 說是storm中最核心的部分,它的主要功能有兩個: 對Topology的任務進行分配資源接收使用者的命令並做相應的處理,如Topology的提交,殺死,啟用等等Nimbus本身是基於Thr

elasticsearch原始碼分析啟動過程

最近開始廣泛的使用elasticsearch,也開始寫一些java程式碼了,為了提高java程式碼能力,也為了更加深入一點了解elasticsearch的內部運作機制,所以開始看一些elasticsearch的原始碼了。對於這種廣受追捧的開源專案,細細品讀一定會受益匪淺,

hadoop啟動過程secondNameNode

  作用:定期將namenode的fsimage和edits合併(資料或者操作不多的時候可以關閉 ),可加速hdfs啟動(如果edits很多的話,開啟會很難)   SecondNameNode: 它會定期的和namenode就行通訊來完成整個的備份操作(????更

AndroidO audio系統audioflinger啟動分析

1. audioflinger建立過程 在Android8.0的音訊系統中,AudioFlinger是一個C++的Binder服務,執行在HAL程序中,它是在audioserver.c //frameworks/av/media/audioserver/

Spring啟動流程Spring載入Bean Definition的流程

繼上篇Spring啟動流程(一) prepareRefresh() prepareRefresh();//初始化配置和環境 obtainFreshBeanFactory() // Tell the subclass to refresh the internal bean fac

Node.js在服務端啟動網頁

現在我們學習一下關於檔案路徑的相關服務。 http://127.0.0.1 這是網路路徑 var http = require('http');//建立伺服器的 var fs = require('fs'); var path = require('path'); //引

大資料叢集搭建節點的網路配置過程

緊接著上一章來設定windows的vmnet8的ip地址和虛擬機器中centos的ip地址。 NAT虛擬網路的配置圖如下圖所示: 1、這裡根據VMware中得到的閘道器地址去設定vmnet8的ip地址。 閘道器地址檢視: 2、得到的閘道器地址後去

Android系統啟動流程解析Zygote程序啟動過程

前言 上一篇文章我們分析了init程序的啟動過程,啟動過程中主要做了三件事,其中一件就是建立了Zygote程序,那麼Zygote程序是什麼,它做了哪些事呢?這篇文章會給你這些問題的答案。 1.Zygote簡介 在Android系統中,DVM(D

Linux載入啟動可執行程式的過程直譯器完成動態連結

接著上一篇部落格。前面的工作都是在核心完成的,接下來會回到使用者空間。第一步,直譯器(也可以叫動態連結器)首先檢查可執行程式所依賴的共享庫,並在需要的時候對其進行載入。ELF 檔案有一個特別的節區: .dynamic,它存放了和動態連結相關的很多資訊,例如動態連結器通過它找到

Oracle儲存過程merge into 函式

今天主要說明的是merger into 的各個層次關係,這個對於寫儲存過程非常重要!!!希望對大家有所幫助。 首先貼出的是今天寫的一些錯誤SQL: 執行提示無效的SQL語句!!! 第一個錯誤,單獨寫儲存過程來測試的話,第一句是不需要的,這個大家切

從壹開始微服務 [ DDD ] 十一 ║ 基於原始碼分析,命令分發的過程

緣起 哈嘍小夥伴週三好,老張又來啦,DDD領域驅動設計的第二個D也快說完了,下一個系列我也在考慮之中,是 Id4 還是 Dockers 還沒有想好,甚至昨天我還想,下一步是不是可以寫一個簡單的Angular 入門教程,本來是想來個前後端分離的教學視訊的,簡單試了試,發現自己的聲音不好聽,真心不好聽那種,就作

OpenStack原始碼分析Nova-Compute服務啟動過程icehouse

學習OpenStack有半年多了,一直都停留在使用和trouble shooting的階段,最近有時間來好好研究了一下程式碼,因為以前是C++/Windows出生的,所以對Linux下面的Python開發不是很熟悉,本文適合一些已經使用過OpenStack並且想要初步瞭解程

神奇的 SQL 聯表細節 → MySQL JOIN 的執行過程

開心一刻   一頭母牛在吃草,突然一頭公牛從遠處狂奔而來說:“快跑啊!!樓主來了!”   母牛說:“樓主來了關我屁事啊?”   公牛急忙說:“樓主吹牛逼呀!”   母牛大驚,拔腿就跑,邊跑邊問:“你是公牛你怕什麼啊?&

精盡MyBatis原始碼分析 - SQL執行過程 StatementHandler

> 該系列文件是本人在學習 Mybatis 的原始碼過程中總結下來的,可能對讀者不太友好,請結合我的原始碼註釋([Mybatis原始碼分析 GitHub 地址](https://github.com/liu844869663/mybatis-3)、[Mybatis-Spring 原始碼分析 GitHub 地址

STLset具體解釋

基本操作 二叉樹 mono itl 自己 pair leading 左右子樹 ews 首先來看看set集合容器: set集合容器實現了紅黑樹的平衡二叉樹數據結構。在插入元素時它會自己主動調整二叉樹的排列,把該元素放到適當的位置,而且 保證左右子樹平衡。平衡二

性能測試從0到1的過程

sleep efi python腳本 size sel 應用服務器 完全 展示 服務   本人,從畢業開始接觸測試,但是性能方面一無所知。之前在第一份工作,測過安卓客戶端,當時寫過一個非常簡單的shell腳本,push到手機系統內,用於手機硬件信息。但是在服務端的性能方面,

LIVE555研究五:RTPServer

tpch live555 循環調用 family 每一個 函數 計算 ack close LIVE555研究之五:RTPServer(二) 接上文,main函數的幾行代碼創建了RTSPSe

springboot啟動過程1-初始化

好的 事件監聽 spa 兩個 包括 servlet 實例對象 ice 機制 1 springboot啟動時,[email protected]/* */函數,執行SpringApplication.run(DemoApplication.class, arg

jenkins實戰jenkins安裝部署

自動化運維 上一小節介紹了Jenkins安裝(Linux/uninx平臺),這節我們講講Jenkins界面操作(包括系統設置,工具安裝,插件管理,系統升級,安全設置等等操作); 登錄jenkins首頁,分別有以下選項欄,從左側看起,點擊Jenkins系統管理我們會看到右側list欄,內

JmeterBean shell使用

.get 方法 tro 邏輯 麻煩 str www title jar  上一篇Jmeter之Bean shell使用(一)簡單介紹了下Jmeter中的Bean shell,本文是對上文的一個補充,主要總結下常用的幾種場景和方法,相信這些基本可以涵蓋大部分的需求。本節內容如