1. 程式人生 > >Storm Topology的生命週期——淺析(一)

Storm Topology的生命週期——淺析(一)

作為一名storm的初學者,首先應該瞭解的就是storm是如何部署提交一個topology的。也就是說,當我們運行了命令:”storm jar myjarpath mytopologyclass args”之後,storm又是如何做的呢。我查閱了Apache Storm的官方文件,在此翻譯整理一下,並寫一點自己的理解。
官方連結:Lifecycle of a Storm Topology

[譯]Storm Topology的生命週期

(NOTE:這篇文章基於storm0.7.1,之後的版本有了許多變化,比如tasks 和 executors直接的劃分,原始碼的路徑由之前的src/變為storm-core/src等。)

這篇文章詳細介紹了我們執行storm jar命令之後一個Topology的生命週期:上傳Topology到Nimbus,Supervisor啟動/停止workers,workers和tasks的自我建立。還有Nimbus是如何監控Topology的,當執行kill命令時Topology是如何關閉的。

幾個重要nodes:
1、真正執行的topology與使用者自定義的topology不同,因為其加入了acker bolt 和 Streams。
2、真正的topology由system-topology!方法建立。該方法在Nimbus為topology建立tasks時 和worker 路由訊息時使用。

1、啟動一個Topology

  • “storm jar”命令根據特定的引數執行你所提交的class。我們只確定”storm jar”命令通過StormSubmitter方法建立並設定了環境變數。
def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]
    Runs the main method of class with the specified arguments. 
    The storm jars and configs in ~/.storm are put on the classpath. 
    The process is configured so that StormSubmitter 
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
exec_storm_class( klass, jvmtype="-client", extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"], args=args, childopts="-Dstorm.jar=" + jarfile)
  • 當你使用StormSubmitter.submitTopology提交Topology時,StormSubmitter做了如下幾件事:
    • 首先,StormSubmitter通過Nimbus Thrift介面將jar包上傳到Nimbus(在之前沒有上傳過jar包的情況下)。
      • beginFileUPload方法返回Nimbus inbox的一個路徑。(這個路徑應該就是jar包上傳到nimbus的路徑。)
      • uploadChunk方法保證上傳速度為每次15KB。
      • finishFileUpload方法在上傳結束時執行。
    • 其次,StormSubmitter呼叫Nimbus Thrift介面的submitTopology方法。
      • Topology的配置使用JSON進行序列化。
      • submitTopology使用的Nimbus inbox path 就是jar被上傳到nimbus的位置。
//submitter upload the jar 
public static String submitJar(Map conf, String localJar) {
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            String uploadLocation = client.getClient().beginFileUpload();
            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
            BufferFileInputStream is = new BufferFileInputStream(localJar);
            while(true) {
                byte[] toSubmit = is.read();
                if(toSubmit.length==0) break;
                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
            }
            client.getClient().finishFileUpload(uploadLocation);
            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
            return uploadLocation;
        } catch(Exception e) {
            throw new RuntimeException(e);            
        } finally {
            client.close();
        }
    }
  • Nimbus接收提交的Topology。code
  • Nimbus規範化Topology的配置,其主要目的是確保每個單獨的task都有相同的序列化註冊碼,這對於獲得正確的序列化工作至關重要。code
  • Nimbus為topology設定靜態狀態。code
    • Jars和配置檔案configs都儲存在Nimbus本地檔案系統上(對於zookeeper來說太大了)。Jars和configs被拷貝到 {nimbus local dir}/stormdist/{topology id}路徑下。
    • setup-storm-static方法將 task -> component 的對映寫入ZK。
    • setup-heartbeats在ZK上建立一個目錄用於task的心跳反應。
  • Nimbus呼叫mk-assignment方法將tasks分配給節點。該方法包含:
  • master-code-dir:supervisor呼叫,以從Nimbus下載到Topology的jars/configs。
  • task->node+port:首先它是一個Map,從 task id 到 worker(該task應該執行)。一個worker由node/port定義,node指定哪臺節點,port指定哪個程序。
  • node->host:一個Map,從 node id 到 hostname。使得worker知道需要通訊的其他worker在哪臺節點上。Node id 用於區分supervisor,因此一個或多個supervisor可以執行在一臺節點上。
  • task->start-time-secs:包含了一個Map,從 task id 到 timestamp(該時間戳是Nimbus釋出這個task的時間)。這個被Nimbus用來監控topologies,在tasks第一次被啟動時,它們被指定了較長的超時時間,該超時時間用於心跳檢測,並且可以由 “nimbus.task.launch.secs”定義。
;;mk-assignment方法
(defnk mk-assignments [nimbus storm-id :scratch? false]
  (log-debug "Determining assignment for " storm-id)
  (let [conf (:conf nimbus)
        storm-cluster-state (:storm-cluster-state nimbus)
        callback (fn [& ignored] (transition! nimbus storm-id :monitor))
        node->host (get-node->host storm-cluster-state callback)

        existing-assignment (.assignment-info storm-cluster-state storm-id nil)
        task->node+port (compute-new-task->node+port conf storm-id existing-assignment
                                                     storm-cluster-state callback
                                                     (:task-heartbeats-cache nimbus)
                                                     scratch?)

        all-node->host (merge (:node->host existing-assignment) node->host)
        reassign-ids (changed-ids (:task->node+port existing-assignment) task->node+port)
        now-secs (current-time-secs)
        start-times (merge (:task->start-time-secs existing-assignment)
                           (into {}
                             (for [id reassign-ids]
                               [id now-secs]
                               )))

        assignment (Assignment.
                    (master-stormdist-root conf storm-id)
                    (select-keys all-node->host (map first (vals task->node+port)))
                    task->node+port
                    start-times
                    )
        ]
    ;; tasks figure out what tasks to talk to by looking at topology at runtime
    ;; only log/set when there's been a change to the assignment
    (if (= existing-assignment assignment)
      (log-debug "Assignment for " storm-id " hasn't changed")
      (do
        (log-message "Setting new assignment for storm id " storm-id ": " (pr-str assignment))
        (.set-assignment! storm-cluster-state storm-id assignment)
        ))
    ))
  • 一旦Topology被分配好了,它們便處於deactivated模式。start-storm將資料(什麼資料我還沒看)寫到ZK上,從而叢集知道該Topology已經處於active狀態了,便開始從spout連續不斷的發射tuples。

(defn- start-storm [storm-name storm-cluster-state storm-id]
  (log-message "Activating " storm-name ": " storm-id)
  (.activate-storm! storm-cluster-state
                    storm-id
                    (StormBase. storm-name
                                (current-time-secs)
                                {:type :active})))
  • TODO叢集狀態圖(展示了所有的節點和它所包含的元件)。
  • Supervisor在後臺運行了兩個方法:
    • synchronize-supervisor:這個方法在ZK上的任務分配改變時呼叫,或者沒10s呼叫一次。code
      • 從Nimbus下載分配給該節點的topology的程式碼(這些程式碼之前沒有)。code
      • 將這臺節點應該執行的任務寫到本地檔案系統中。它維護了一個map:port -> LocalAssignment。LocalAssignment 包含了 一個topology id 和該 worker 的 task id 列表。code
    • sync-processes:從LFS中讀取synchronize-supervisor寫入的東西並進行比較,得到真正執行在這臺節點上的任務。然後,必要時它將啟動/關閉worker程序來進行同步。code
  • Worker 通過mk-worker方法啟動。code

    • worker連線其他worker 並啟動一個監護執行緒來檢測變化。因此如果一個worker被重新部署,該worker可以自動重連到其他worker新的位置。code
    • Monitors不論topology是否活躍都將其狀態儲存到storm-active-atom變數。tasks使用該變數來判斷是否呼叫spout的nextTuple方法。code
    • worker(程序)啟動其實際tasks(執行緒)。code
  • Tasks通過mk-task方法啟動。code

    • Tasks啟動路由方法,接收一個stream和(上游)發出的tuple,並且返回一個task id 列表(該列表是發出的tuple的目的tasks)。code
    • Tasks建立真正的spout或者bolt。code

2、Topology的監控

  • Nimbus在topology的整個生命週期內都對其進行監控。
    • 建立一個迴圈執行的計時器執行緒(schedule-recurring)來監控topology。
    • Nimbus的行為表示成一個有限狀態機。code
    • Topology中的”monitor”事件每隔”nimbus.monitor.freq.secs”呼叫一次,該監控器通過reassign-transition方法呼叫reassign-topology方法。code
    • reassign-topology呼叫 mk-assignments方法來(逐步遞增地)更新topology。
      • mk-assignments檢測heartbeats 並且在必要時重新部署workers。
      • 任何重新部署後的改變都將改變ZK中的狀態資訊,從而觸發supervisor的同步工作,並啟動/停止workers。
 (schedule-recurring (:timer nimbus)
                        0
                        (conf NIMBUS-MONITOR-FREQ-SECS)
                        (fn []
                          (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
                            (transition! nimbus storm-id :monitor))
                          (do-cleanup nimbus)
                          ))

3、(Kill)結束一個Topology

  • “strom kill”命令通過Nimbus Thrift介面呼叫方法來kill一個topology。
(defn -main [& args]
  (with-command-line args
    "Kill a topology"
    [[wait w "Override the amount of time to wait after deactivating before killing" nil]
     posargs]    
    (let [name (first posargs)
          opts (KillOptions.)]
      (if wait (.set_wait_secs opts (Integer/parseInt wait)))
      (with-configured-nimbus-connection nimbus
        (.killTopologyWithOpts nimbus name opts)
        (log-message "Killed topology: " name)
        ))))
  • Nimbus 接收 kill命令。code
  • Nimbus將該topology標記為kill transition狀態(個人理解)。code
  • kill-transition方法將topology的狀態改為killed,並且呼叫remove事件執行”wait time seconds”。code
  • wait time 預設和 topology message timeout一樣,也可以通過storm kill 命令中的引數 -w來修改。
  • 該方法使得topology處於未啟用狀態(在wait time 之前),wait time結束後才真正關閉。這使得topology在真正結束之前有一段時間來處理目前正在進行的工作。
  • 在 kill transition過程中改變狀態保證來kill協議是容錯的。在啟動時,如果一個topology的狀態是”killed”,那麼nimbus將會呼叫remove event來清除該topology。code

  • 移除一個Topology時會清除儲存在ZK上的分配和狀態資訊。code

  • 一個單獨cleanup的執行緒執行 do-cleanup方法,該方法會清除本地的 heartbeat目錄 和 儲存jars/configs目錄。code