JStorm 原始碼解析:supervisor 的啟動和執行機制
Supervisor 節點可以理解為單機任務排程器,它負責監聽 nimbus 節點的任務資源分配,啟動相應的 worker 程序執行 nimbus 分配給當前節點的任務,同時監測 worker 的執行狀態,一旦發現有 worker 執行異常,就會殺死該 worker 程序,並將原先分配給 worker 的任務交還給 nimbus 節點進行重新分配。
Supervisor 節點的啟動過程位於 Supervisor 類中,main 方法的實現比較簡單,主要就是建立了一個 Supervisor 類物件,並呼叫例項方法Supervisor#run
,該方法的實現如下:
public void run() { try { /* * 解析配置檔案: * 1. 解析 default.yaml * 2. 解析 storm.yaml * 3. 解析 -Dstorm.options 指定的命令列引數 * 4. 替換所有配置項中的 JSTORM_HOME 佔位符 */ Map<Object, Object> conf = Utils.readStormConfig(); // 確保當前為叢集執行模式 StormConfig.validate_distributed_mode(conf); // 建立程序檔案: ${storm.local.dir}/supervisor/pids/${pid} this.createPid(conf); // 建立並啟動 supervisor SupervisorManger supervisorManager = this.mkSupervisor(conf, null); JStormUtils.redirectOutput("/dev/null"); // 註冊 SupervisorManger,當 JVM 程序停止時執行 shutdown 邏輯 this.initShutdownHook(supervisorManager); // 迴圈監測 shutdown 方法是否執行完畢 while (!supervisorManager.isFinishShutdown()) { try { Thread.sleep(1000); } catch (InterruptedException ignored) { } } } // 省略 catch 程式碼塊 }
整個方法的邏輯比較清晰(如程式碼註釋),核心實現位於Supervisor#mkSupervisor
方法中,該方法主要用於建立和啟動 supervisor 節點,基本執行流程如下:
SyncSupervisorEvent#run()
Supervisor 節點在啟動時首先會在本地建立並清空臨時目錄(路徑:supervisor/tmp
),Supervisor 從 nimbus 節點下載下來的檔案會臨時存放在這裡,包括 stormcode.cer、stormconf.cer、stormjar.jar,以及 lib 目錄下面的檔案等,經過簡單處理之後會將其複製到stormdist/${topology_id}
本地目錄中,supervisor 本地檔案說明如下:
+ ${supervisor_local_dir} | ---- + supervisor | ---- | ---- + stormdist | ---- | ---- | ---- + ${topology_id} | ---- | ---- | ---- | ---- + resources: 指定 topology 程式包 resources 目錄下面的所有檔案 | ---- | ---- | ---- | ---- + stormjar.jar: 包含指定 topology 所有程式碼的 jar 檔案 | ---- | ---- | ---- | ---- + stormcode.ser: 包含指定 topology 物件的序列化檔案 | ---- | ---- | ---- | ---- + stormconf.ser: 包含指定 topology 的配置資訊檔案 | ---- | ---- + localstate: 本地狀態資訊 | ---- | ---- + tmp: 臨時目錄,從 nimbus 下載的檔案的臨時儲存目錄,簡單處理之後複製到 stormdist/${topology_id} | ---- | ---- | ---- + ${uuid} | ---- | ---- | ---- | ---- + stormjar.jar: 從 nimbus 節點下載下來的 jar 檔案 | ---- | ---- | ---- + ${topology_id} | ---- | ---- | ---- | ---- + stormjar.jar: 包含指定 topology 所有程式碼的 jar 檔案(從 inbox 目錄複製過來) | ---- | ---- | ---- | ---- + stormcode.ser: 包含指定 topology 物件的序列化檔案 | ---- | ---- | ---- | ---- + stormconf.ser: 包含指定 topology 的配置資訊檔案 | ---- + workers | ---- | ---- + ${worker_id} | ---- | ---- | ---- + pids | ---- | ---- | ---- | ---- + ${pid}: 指定 worker 程序 ID | ---- | ---- | ---- + heartbeats | ---- | ---- | ---- | ---- + ${worker_id}: 指定 worker 心跳資訊(心跳時間、worker 的程序 ID)
接下來 supervisor 會建立 StormClusterState 物件,用於操作 ZK 叢集,同時還會建立一個 WorkerReportError 類物件,用於上報 worker 的執行錯誤資料到 ZK,該類僅包含一個例項方法 report,用於執行上報邏輯。然後 supervisor 節點會建立一個 LocalState 物件用於儲存節點的狀態資訊,這是一個簡單、低效的鍵值儲存資料庫,每一次操作都會落盤,在這裡對應的落盤目錄是supervisor/localstate
。Supervisor 的 ID(UUID 字串) 就儲存在該資料庫中,supervisor 啟動時會先嚐試從本地狀態資訊物件中獲取 ID 值,如果不存在的話就會建立一個新的 UUID 字串作為 ID。
Supervisor 節點在啟動的過程中會初始化心跳機制,間隔指定時間將當前節點的相關資訊上報給 ZK(路徑:supervisors/${supervisor_id}
),包含當前 supervisor 節點的主機名、ID、最近一次上報時間、截止上次上報節點的執行時間,以及 worker 埠列表資訊。相關資訊的初始化在 Heartbeat 類物件例項化時進行設定,期間會依據當前機器 CPU 核心數和實體記憶體大小計算允許的 worker 埠數目,並預設從 6800 埠號開始分配 worker 埠。Supervisor 節點會啟動一個執行緒,預設每間隔 60 秒呼叫Heartbeat#update
方法同步心跳資訊到 ZK,該方法的實現如下:
public void update() { // 更新本次上報時間為當前時間(單位:秒) supervisorInfo.setTimeSecs(TimeUtils.current_time_secs()); // 更新截止目前節點的執行時間(單位:秒) supervisorInfo.setUptimeSecs(TimeUtils.current_time_secs() - startTime); // 依據具體配置和資源佔用,調整埠號列表 this.updateSupervisorInfo(); try { // 將 supervisor 資訊寫入 ZK:supervisors/${supervisor_id} stormClusterState.supervisor_heartbeat(supervisorId, supervisorInfo); } catch (Exception e) { LOG.error("Failed to update SupervisorInfo to ZK", e); } }
具體過程如程式碼註釋,下面是一個實際的心跳資訊示例:
{ "hostName": "10.38.164.192", "supervisorId": "980bbcfd-5438-4e25-aee9-bf411304a446", "timeSecs": 1533373753, "uptimeSecs": 2879598, "workerPorts": [ 6912, 6900, 6901, 6902, 6903, 6904, 6905, 6906, 6907, 6908, 6909, 6910, 6911 ] }
下面來重點看一下 supervisor 節點領取分配給當前節點的任務並啟動執行的過程。該過程的實現程式碼塊如下:
/* * 5. 啟動並定期執行 SyncSupervisorEvent#run() 方法(預設間隔 10 秒),從 nimbus 節點領取分配給當前節點的任務並啟動執行 */ ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<>(); SyncProcessEvent syncProcessEvent = new SyncProcessEvent( supervisorId, conf, localState, workerThreadPids, sharedContext, workerReportError, stormClusterState); EventManagerImp syncSupEventManager = new EventManagerImp(); AsyncLoopThread syncSupEventThread = new AsyncLoopThread(syncSupEventManager); threads.add(syncSupEventThread); SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent( supervisorId, conf, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb); // ${supervisor.monitor.frequency.secs},預設為 10 秒 int syncFrequency = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)); EventManagerPusher syncSupervisorPusher = new EventManagerPusher(syncSupEventManager, syncSupervisorEvent, syncFrequency); /* * 每間隔一段時間(預設為 10 秒)呼叫 EventManagerPusher#run(), * 本質上是呼叫 EventManagerImp#add(RunnableCallback) 將 syncSupervisorEvent 記錄到自己的阻塞佇列中, * 同時 EventManagerImp 也會迴圈消費阻塞佇列,取出其中的 syncSupervisorEvent,並應用其 run 方法:SyncSupervisorEvent#run() */ AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher); threads.add(syncSupervisorThread);
要理解該過程的執行機制,我們應該倒著來看相應的原始碼實現,首先看一下程式碼塊的倒數第二行:
AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher);
由前面我們對 storm 基本執行緒模型的分析可以知道,這行程式碼會啟動一個執行緒去迴圈執行入參回撥的 run 方法,這裡也就是EventManagerPusher#run
方法,該方法的實現比較簡單:
@Override public void run() { eventManager.add(event); }
也就是不斷的呼叫EventManager#add
方法(預設間隔時間為 10 秒),繼續往前看我們知道這裡的 EventManager 類實際實現是 EventManagerImp,而不斷的呼叫其 add 方法新增的 event 本質上就是一個 SyncSupervisorEvent 例項物件。EventManagerImp 維護了一個阻塞佇列來不斷記錄加入的 event,它本身也是一個回撥,再往前看我們就可以看到它在例項化時也被 AsyncLoopThread 啟動,EventManagerImp#run
方法實現如下:
public void run() { try { RunnableCallback r = queue.take(); if (r == null) { return; } r.run(); e = r.error(); this.processInc(); } catch (InterruptedException e) { LOG.info("Interrupted when processing event."); } }
該方法就是不斷的從阻塞佇列中取出相應的回撥並應用其 run 方法,也就是不斷應用SyncSupervisorEvent#run
方法。
以上就是步驟五的整體邏輯,簡單描述就是定期的往阻塞佇列中新增 SyncSupervisorEvent 事件,而執行緒會迴圈的消費佇列,取出事件並應用事件的 run 方法。下面來深入分析一下 SyncSupervisorEvent 的 run 方法,該方法所做的工作也就是 supervisor 的核心邏輯,主要可以概括為 3 點:
- 從 ZK 上下載任務分配資訊,並更新到本地
- 從 nimbus 節點上下載 topology 對應的 jar 和配置檔案
- 啟動 worker 執行分配給當前 supervisor 的 topology 任務
SyncSupervisorEvent#run
方法的實現比較長,下面按照執行步驟逐步拆分進行分析,首先來看一下從 ZK 上下載任務分配資訊,並更新到本地的過程,相應實現如下:
/* * 1.1. 同步所有 topology 的任務分配資訊及其版本資訊到本地 */ if (healthStatus.isMoreSeriousThan(HealthStatus.ERROR)) { // 檢查當前 supervisor 的狀態資訊,如果是 PANIC 或 ERROR,則清除所有本地的任務分配相關資訊 assignmentVersion.clear(); assignments.clear(); LOG.warn("Supervisor machine check status: " + healthStatus + ", killing all workers."); } else { // 同步所有 topology 的任務分配資訊及其版本(即更新 assignmentVersion 和 assignments 引數) this.getAllAssignments(assignmentVersion, assignments, syncCallback); } LOG.debug("Get all assignments " + assignments); /* * 1.2. 從 supervisor 本地(supervisor/stormdist/)獲取已經下載的所有的 topologyId */ List<String> downloadedTopologyIds = StormConfig.get_supervisor_toplogy_list(conf); LOG.debug("Downloaded storm ids: " + downloadedTopologyIds); /* * 1.3. 獲取分配給當前 supervisor 的任務資訊:<port, LocalAssignments> */ Map<Integer, LocalAssignment> zkAssignment = this.getLocalAssign(stormClusterState, supervisorId, assignments); /* * 1.4. 更新 supervisor 本地的任務分配資訊 */ Map<Integer, LocalAssignment> localAssignment; try { LOG.debug("Writing local assignment " + zkAssignment); localAssignment = (Map<Integer, LocalAssignment>) localState.get(Common.LS_LOCAL_ASSIGNMENTS); // local-assignments if (localAssignment == null) { localAssignment = new HashMap<>(); } localState.put(Common.LS_LOCAL_ASSIGNMENTS, zkAssignment); } catch (IOException e) { LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment + " to localState failed"); throw e; }
Supervisor 節點在本地會快取任務分配資訊,同時會定期從 ZK 同步最新的任務分配資訊到本地,從 ZK 上獲取任務分配資訊的邏輯位於SyncSupervisorEvent#getAllAssignments
方法中,方法會從 ZK 的 assignments 路徑下獲取所有的 topologyId,並與本地比較對應 topology 的任務分配資訊版本,如果版本有更新則更新本地快取的任務分配資訊。
接下來 supervisor 會計算所有需要下載的 topology,包括需要更新的、需要重新下載的(之前下載有失敗),以及在當前節點進行灰度的,並從 nimbus 節點下載各個 topology 對應的檔案,包括 stormjar.jar、stormcode.ser、stormconf.ser,以及 lib 目錄下面的依賴檔案(如果存在的話),最後從本地刪除那些之前下載過但是本次未分配給當前 supervisor 節點的 topology 檔案,相應實現如下:
/* * 2.1. 獲取所有需要執行下載操作的 topology_id 集合(包括需要更新的、需要重新下載,以及在當前節點灰度的) */ Set<String> updateTopologies = this.getUpdateTopologies(localAssignment, zkAssignment, assignments); Set<String> reDownloadTopologies = this.getNeedReDownloadTopologies(localAssignment); if (reDownloadTopologies != null) { updateTopologies.addAll(reDownloadTopologies); } // 獲取灰度釋出且指定在當前 supervisor 的 topology:[topology_id, Pair(host, port)] Map<String, Set<Pair<String, Integer>>> upgradeTopologyPorts = this.getUpgradeTopologies(stormClusterState, localAssignment, zkAssignment); if (upgradeTopologyPorts.size() > 0) { LOG.info("upgrade topology ports:{}", upgradeTopologyPorts); updateTopologies.addAll(upgradeTopologyPorts.keySet()); } /* * 2.2. 從 nimbus 下載對應的 topology 任務程式碼 */ // 從 ZK 上獲取分配給當前 supervisor 的 [topologyId, master-code-dir] 資訊 Map<String, String> topologyCodes = getTopologyCodeLocations(assignments, supervisorId); // downloadFailedTopologyIds which can't finished download binary from nimbus Set<String> downloadFailedTopologyIds = new HashSet<>(); // 記錄所有下載失敗的 topologyId // 從 nimbus 下載相應的 topology jar 檔案到 supervisor 本地 this.downloadTopology(topologyCodes, downloadedTopologyIds, updateTopologies, assignments, downloadFailedTopologyIds); /* * 2.3. 刪除無用的 topology 相關檔案(之前下載過,但是本次未分配給當前 supervisor) */ this.removeUselessTopology(topologyCodes, downloadedTopologyIds);
檔案下載的邏輯位於SyncSupervisorEvent#downloadTopology
方法中,檔案下載的過程可以概括為以下 5 個步驟:
${storm.local.dir}/supervisor/tmp/${uuid} ${storm.local.dir}/supervisor/stormdist/${topology_id} ${storm.local.dir}/supervisor/stormdist/${topology_id}/timestamp
最後 supervisor 節點會呼叫SyncProcessEvent#run
方法殺死狀態異常的 worker,同時啟動新的 worker 執行分配的任務:
/* * 3. kill bad workers, start new workers */ syncProcesses.run(zkAssignment, downloadFailedTopologyIds, upgradeTopologyPorts); // SyncProcessEvent#run public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds, Map<String, Set<Pair<String, Integer>>> upgradeTopologyPorts) { LOG.debug("Syncing processes, interval (sec): " + TimeUtils.time_delta(lastTime)); lastTime = TimeUtils.current_time_secs(); try { if (localAssignments == null) { localAssignments = new HashMap<>(); } LOG.debug("Assigned tasks: " + localAssignments); /* * 3.1 獲取本地所有 worker 的狀態資訊:Map<worker_id [WorkerHeartbeat, state]> */ Map<String, StateHeartbeat> localWorkerStats; try { // Map[workerId, [worker heartbeat, state]] localWorkerStats = this.getLocalWorkerStats(conf, localState, localAssignments); } catch (Exception e) { LOG.error("Failed to get local worker stats"); throw e; } LOG.debug("Allocated: " + localWorkerStats); /* * 3.2 殺死無用的 worker,並從 localWorkerStats 中移除 */ Map<String, Integer> taskCleanupTimeoutMap; Set<Integer> keepPorts = null; try { // [topology_id, cleanup_second] taskCleanupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT); // task-cleanup-timeout // 對於一些狀態為 disallowed/timedOut 的 worker 進行 kill,並清空相應的資料,同時返可用的 worker port keepPorts = this.killUselessWorkers(localWorkerStats, localAssignments, taskCleanupTimeoutMap); localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleanupTimeoutMap); } catch (IOException e) { LOG.error("Failed to kill workers", e); } // 3.3 檢測 worker 是否正在啟動中,清空處於執行態和啟動失敗 worker 的相應資料(workerIdToStartTimeAndPort 和 portToWorkerId) this.checkNewWorkers(conf); // 3.4 標記需要重新下載的 topology(沒有啟動成功,同時下載時間已經超過 2 分鐘) this.checkNeedUpdateTopologies(localWorkerStats, localAssignments); // 3.5 啟動新的 worker 執行 topology 任務 this.startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds); // 3.6 啟動相應的 worker 執行在當前節點 灰度的 topology 任務 this.restartUpgradingWorkers(localAssignments, localWorkerStats, upgradeTopologyPorts); } catch (Exception e) { LOG.error("Failed to init SyncProcessEvent", e); } }
無論是新任務分配,還是灰度更新,啟動 worker 的過程都是呼叫了SyncProcessEvent#startWorkers
方法,該方法為每個新的 worker 基於 UUID 建立一個 workerId,以及程序目錄${storm.local.dir}/workers/${worker_id}/pids
,並呼叫SyncProcessEvent#doLaunchWorker
方法啟動 worker,同時更新 worker 在本地的相應資料。Worker 程序的啟動和執行機制將在下一篇中進行詳細說明。
在分析 nimbus 節點啟動過程中有一步會啟動一個 HTTP 服務,用於接收查詢 nimbus 節點本地日誌和配置等資料的需求,supervisor 節點的啟動過程也同樣包含這樣一個過程。Supervisor 的 HTTP 服務預設會監聽在 7622 埠,用於接收來自 UI 的請求。
最後對於叢集模式,如果配置了supervisor.enable.check=true
則 supervisor 節點在啟動時會建立一個執行緒用於定期檢查 supervisor 的執行狀況,另外還會啟動一個執行緒用於同步 nimbus 的配置資訊到本地節點。最後會建立並返回一個 SupervisorManger 類物件,用於對於當前 supervisor 節點進行管理。
到此,supervisor 節點基本啟動完成了,supervisor 會定期基於 ZK 從 nimbus 節點領取任務,然後啟動 worker 去執行任務,而啟動 worker 的過程我們將在下一篇中進行詳細分析。
(本篇完)
轉載宣告 : 版權所有,商業轉載請聯絡作者,非商業轉載請註明出處
ofollow,noindex">本部落格所有文章除特別宣告外,均採用 CC BY-NC-SA 4.0 許可協議