JStorm 原始碼解析:拓撲任務的資源分配過程
上一篇我們分析了 topology 構建和提交過程在客戶端的邏輯,並最終通過submitTopology
方法向 storm 叢集的 nimbus 節點提交任務。Nimbus 以 Thrift RPC 服務的方式執行,相應 thrift 介面方法實現位於 ServiceHandler 類中,下面我們從ServiceHandler#submitTopology
方法切入,分析 nimbus 節點之於客戶端提交任務的資源分配過程,該方法包裝了ServiceHandler#submitTopologyWithOpts
方法。
Storm 叢集的任務提交主要分為三種類型:新任務提交、熱部署,以及灰度釋出。ServiceHandler#submitTopologyWithOpts
方法統一處理這三種情況,但是不管哪種提交方式都會首先驗證 topology 名稱和配置的合法性,然後基於具體提交型別分而治之。
灰度釋出 & 熱部署
首先來看灰度釋出的情況,當客戶端請求灰度釋出時,nimbus 節點會檢查對應 topology 在服務端的執行情況,只有狀態為 ACTIVE 時才允許執行灰度釋出。灰度釋出的相關實現如下:
// 獲取指定 topology 的執行資料 TopologyInfo topologyInfo = this.getTopologyInfo(topologyId); if (topologyInfo == null) { throw new TException("Failed to get topology info"); } // 獲取指定的 worker 數目:${topology.upgrade.worker.num} int workerNum = ConfigExtension.getUpgradeWorkerNum(serializedConf); // 獲取指定的元件名稱:${topology.upgrade.component} String component = ConfigExtension.getUpgradeComponent(serializedConf); // 獲取指定的 worker 列表:${topology.upgrade.workers} Set<String> workers = ConfigExtension.getUpgradeWorkers(serializedConf); // 判定 topology master 是不是使用獨立的 worker if (!ConfigExtension.isTmSingleWorker(serializedConf, topologyInfo.get_topology().get_numWorkers())) { throw new TException("Gray upgrade requires that topology master to be a single worker, cannot perform the upgrade!"); } // 灰度釋出 return this.grayUpgrade(topologyId, uploadedJarLocation, topology, serializedConf, component, workers, workerNum);
對於允許灰度釋出的場景,storm 會基於當前提交 topology 的配置首先會嘗試獲取以下 3 個引數用於挑選 worker 進行釋出:
- topology.upgrade.worker.num
- topology.upgrade.component
- topology.upgrade.workers
如果同時指定了多個引數,方法會基於一定的優先順序進行決策,具體如下:
-
如果引數
topology.upgrade.workers
不為空則忽略其他引數,挑選指定的 worker 進行釋出,需要注意的是這些 worker 釋出完之後,這個引數就自動置空 -
否則檢視引數
topology.upgrade.component
是否為空,如果不為空還需要檢視引數topology.upgrade.worker.num
是否為 0, 如果不為 0 則挑選指定工作元件下topology.upgrade.worker.num
指定數目的 worker 進行釋出,否則對這些工作元件下所有 worker 進行釋出 -
如果上面兩個都為空,則隨機挑選
topology.upgrade.worker.num
個 worker 進行釋出
灰度釋出的具體執行流程位於ServiceHandler#grayUpgrade
方法中,該方法實現比較冗長,故不在此貼出,下面參考原始碼和官網文件對釋出的過程進行說明:
-
方法首先嚐試從 ZK 獲取當前 topology 對應的基本資訊(路徑:
/topology/${topology_id}
)和灰度釋出資訊(路徑:/gray_upgrade/${topology_id}
),以及任務分配資訊(路徑:assignments/${topology_id}
)。 - 如果存在灰度釋出資訊,則判斷對應的灰度狀態(已過期 / 已完成 / 進行中),如果正在灰度中則拒絕本次灰度請求,否則(包含不存在灰度釋出資訊的情況)繼續執行灰度釋出。
-
方法利用 GrayUpgradeConfig 物件封裝灰度釋出資訊,並寫入到 ZK 的
/gray_upgrade/${topology_id}
路徑下,同時設定config.continueUpgrading=true
。 -
Topology Master 有一個執行緒 GrayUpgradeHandler 會定時讀取該節點的配置,檢測到有灰度釋出配置且
continueUpgrading=true
時,將分配指定數目的 worker,新增到 ZK 的/gray_upgrade/${topology_id}/upgrading_workers
路徑下,並設定continueUpgrading=false
(防止自動進行後續的灰度釋出)。 - SyncSupervisorEvent 會定時檢查每個拓撲的 upgrading_workers 節點,一旦有資料就和自身的 IP 和埠列表進行對比,如果有屬於該 supervisor 節點的灰度釋出就下載最新的 storm-code 和 storm-jar,然後重啟 worker,同時將 worker 新增到 ZK 的 upgraded_workers 節點下。
- GrayUpgradeHandler 檢測 ZK,如果 upgraded_workers 的 worker 數大於等於當前總 worker 數減 1(topology master 元件佔用),則認為此次灰度釋出已經完成,刪除 ZK 上的灰度釋出配置、upgrading_workers,以及 upgraded_workers。
如果只想升級部分 worker 或特定元件,可以用 complete_upgrade 強制完成升級。灰度釋出過程中使用單獨的 upgrading_workers 和 upgraded_workers 的設計主要是為了避免同步問題。如果將這些資訊寫在 GrayUpgradeConfig 類中可能會涉及到多個 supervisor 節點同時更新 workers 的情況,而使用單獨的節點則只需要在這個節點下新增和刪除子節點,不會有同步問題。
熱部署和灰度釋出從形式上來看都是對執行中 topology 的更新替換操作,但是對於 nimbus 來說,在處理上卻是兩條不同的分支,實際上熱部署與新任務提交在處理過程上更加形似,畢竟熱部署的過程就是殺死處於執行中的 topology 然後執行新任務提交的過程,所以接下來我們主要分析新任務的排程細節。
新任務提交
對於新提交的任務來說,storm 會為該 topology 執行一些準備和驗證工作,並在 ZK 上建立相應的結點記錄該 topology 的元資料和任務分配資訊,然後為該 topology 生成一個事件提交給任務分配佇列等待 nimbus 節點為當前 topology 制定執行方案,並在執行成功後傳送相應的通知事件。相關實現如下:
// 對當前 topology 配置進行規範化,並附加一些必要的配置 Map<Object, Object> stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology); LOG.info("Normalized configuration:" + stormConf); // 合併叢集配置和拓撲配置 Map<Object, Object> totalStormConf = new HashMap<>(conf); totalStormConf.putAll(stormConf); // 確定 topology 中各個元件的並行度,保證不超過當前 topology 允許的最大值 StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true); /* * 驗證 topology 的基本結構資訊: * 1. 驗證 topologyName,元件 ID 是否合法 * 2. 驗證是否存在缺失 input 宣告的 spout * 3. 驗證 woker 和 acker 數目引數配置 */ Common.validate_basic(normalizedTopology, totalStormConf, topologyId); StormClusterState stormClusterState = data.getStormClusterState(); // 建立 /local-dir/nimbus/${topology_id}/xxxx 檔案,並將元資料同步到 ZK this.setupStormCode(topologyId, uploadedJarLocation, stormConf, normalizedTopology, false); // wait for blob replication before activate topology this.waitForDesiredCodeReplication(conf, topologyId); // generate TaskInfo for every bolt or spout in ZK : /ZK/tasks/topoologyId/xxx // 為當前 topology 在 ZK 上生成 task 資訊:/tasks/${topology_id} this.setupZkTaskInfo(conf, topologyId, stormClusterState); // mkdir topology error directory : taskerrors/${topology_id} String path = Cluster.taskerror_storm_root(topologyId); stormClusterState.mkdir(path); String grayUpgradeBasePath = Cluster.gray_upgrade_base_path(topologyId); // gray_upgrade/${topology_id} stormClusterState.mkdir(grayUpgradeBasePath); // gray_upgrade/${topology_id}/upgraded_workers stormClusterState.mkdir(Cluster.gray_upgrade_upgraded_workers_path(topologyId)); // gray_upgrade/${topology_id}/upgrading_workers stormClusterState.mkdir(Cluster.gray_upgrade_upgrading_workers_path(topologyId)); // 為當前 topology 執行任務分配 LOG.info("Submit topology {} with conf {}", topologyName, serializedConf); this.makeAssignment(topologyName, topologyId, options.get_initial_status()); // push start event after startup double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf); // ${topology.metric.sample.rate},預設是 0.05 StartTopologyEvent.pushEvent(topologyId, metricsSampleRate); this.notifyTopologyActionListener(topologyName, "submitTopology");
下面對原始碼中涉及到的相關流程進行進一步分析。對於新提交的任務首先會執行一些準備工作,包括:
- 規範化 topology 的配置資訊
- 確定 topology 各個元件的並行度,保證不超過允許的最大值
- 驗證 topology 的基本結構資訊(topology 名稱和元件 ID 的合法性、spout 是否缺失 input 宣告,以及驗證 worker 和 acker 的引數配置等)
然後 storm 會建立或更新當前 topology 物件的序列化檔案(stormcode.ser)和配置資訊檔案(stormconf.ser)到 blobstore 中,如果是採用 nimbus 本地模式儲存,還需要將對應的元資料寫入 ZK 來保證資料一致性。
接下來會為當前 topology 生成 task 資訊,並記錄到 ZK 上(路徑:/tasks/${topology_id}
),對於一個 topology 的同一個元件來說,如果並行度大於 1,那麼 storm 會為其建立對應數量的 task,並保證 taskId 是連續的,相應實現位於ServiceHandler#setupZkTaskInfo
方法中:
public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception { // 為當前 topology 追加系統元件,同時基於並行度建立元件對應的 task 資訊,同一個元件的多個 task 資訊具備連續的 ID Map<Integer, TaskInfo> taskToTaskInfo = this.mkTaskComponentAssignments(conf, topologyId); // 獲取 topology master 的 ID (這裡使用的是其對應的 task ID) int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo); TopologyTaskHbInfo topoTaskHbInfo = new TopologyTaskHbInfo(topologyId, masterId); data.getTasksHeartbeat().put(topologyId, topoTaskHbInfo); // 建立 /ZK/taskbeats/${topology_id},並寫入 topologyId 和 topologyMasterId stormClusterState.topology_heartbeat(topologyId, topoTaskHbInfo); if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) { throw new InvalidTopologyException("Failed to generate TaskIDs map"); } // key is task id, value is task info // 記錄 task 資訊到 ZK : /ZK/tasks/${topology_id} stormClusterState.set_task(topologyId, taskToTaskInfo); }
該方法主要做了 3 件事情:
- 為當前 topology 追加系統元件(acker-bolt、master-bolt,以及 system-bolt)
- 為當前 topology 生成 task 分配資訊,並記錄到 ZK 相應結點
- 為當前 topology 在 ZK 上建立對應的 task 心跳記錄檔案
其中 1 和 2 位於ServiceHandler#mkTaskComponentAssignments
方法中:
public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyId) throws IOException, InvalidTopologyException, KeyNotFoundException { // 從 blobstore 中獲取當前 topology 的配置資訊 Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(topologyId, data.getBlobStore()); // 從 blobstore 中獲取當前 topology 的 StormTopology 物件 StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, data.getBlobStore()); // 追加一些系統元件到當前 topology 中 StormTopology topology = Common.system_topology(stormConf, rawTopology); // 為當前 topology 生成 task 資訊,key 是 taskId return Common.mkTaskInfo(stormConf, topology, topologyId); }
方法首先會從 blobstore 中獲取 topology 的配置資訊和 StormTopology 物件,然後呼叫Common#system_topology
方法新增一些系統元件,包括 acker-bolt、master-bolt,以及 system-bolt 等。
然後呼叫Common#mkTaskInfo
方法為當前 topology 中的各個元件生成 task 分配資訊。方法實現比較簡單,返回的結果是一個 map 型別,其中 key 是 taskId,對於同一個元件來說為其分配的 taskId 是連續的,value 是對應的 TaskInfo 物件,包含兩個欄位:componentId 和 componentType。前者對應系統元件 ID 和使用者自定義元件 ID,後者對應元件型別,也就是 bolt 和 spout。
完成了 topology 元件 task 分配資訊的建立,接下來方法為當前任務建立對應的 TopologyAssignEvent 事件物件,並將事件新增到佇列中,等待叢集為其分配資源。這一過程位於ServiceHandler#makeAssignment
方法中,等待的過程採用了 CountDownLatch 機制,count 值設定為 1,並設定 5 分鐘上限等待叢集分配資源,超時則返回 false 表示本次任務提交失敗。
佇列的維護和消費過程位於 TopologyAssign 類中,該類實現了 Runnable 介面,並以單例的形式對外提供服務。Nimbus 節點在啟動的時候會建立並初始化 TopologyAssign 物件,並以守護執行緒的方式啟動佇列的消費過程。執行緒的 run 方法會迴圈的從佇列頭部以阻塞的方式獲取對應的 TopologyAssignEvent 事件物件,並呼叫TopologyAssign#doTopologyAssignment
方法為相應的 topology 建立任務分配資訊(Assignment 物件)和基本執行資訊(StormBase 物件),並將任務分配資訊和基本執行資訊寫入 ZK,其中關鍵的資源分配過程位於TopologyAssign#mkAssignment
方法中,實現如下:
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception { String topologyId = event.getTopologyId(); LOG.info("Determining assignment for " + topologyId); // 1. 基於配置和當前叢集執行狀態建立 topology 任務分配的上下文資訊 TopologyAssignContext context = this.prepareTopologyAssign(event); // 2. 依據當前的執行模式基於對應節點負載為當前 topology 中的 task 分配 worker Set<ResourceWorkerSlot> assignments; if (!StormConfig.local_mode(nimbusData.getConf())) { // 叢集模式,獲取模式的排程器 ITopologyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); // DefaultTopologyScheduler // 為當前 topology 中的 task 分配 worker assignments = scheduler.assignTasks(context); } else { // 本地模式 assignments = mkLocalAssignment(context); } // 3. 記錄任務分配資訊到 ZK: assignments/${topology_id} Assignment assignment = null; if (assignments != null && assignments.size() > 0) { // 獲取服務中的 supervisorId 及其 hostname 對映資訊 Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments); // 獲取 task 的啟動時間:<taskId, start_second> Map<Integer, Integer> startTimes = getTaskStartTimes( context, nimbusData, topologyId, context.getOldAssignment(), assignments); String codeDir = (String) nimbusData.getConf().get(Config.STORM_LOCAL_DIR); assignment = new Assignment(codeDir, assignments, nodeHost, startTimes); //the topology binary changed. if (event.isScaleTopology()) { assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology); } StormClusterState stormClusterState = nimbusData.getStormClusterState(); // 寫入 assignment 資訊到 ZK: assignments/${topology_id} stormClusterState.set_assignment(topologyId, assignment); // update task heartbeat's start time NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId); NimbusUtils.updateTopologyTaskTimeout(nimbusData, topologyId); LOG.info("Successfully make assignment for topology id " + topologyId + ": " + assignment); } return assignment; }
TopologyAssign#mkAssignment
方法主要做了下面三件事情:
- 基於配置和當前叢集執行狀態為當前 topology 建立任務分配的上下文資訊
- 依據當前的執行模式基於對應節點負載為當前 topology 中的 task 分配 worker
- 將 topology 的任務分配資訊 Assignment 物件記錄到 ZK 相應節點上
方法一開始會為當前 topology 建立任務分配的上下文資訊 TopologyAssignContext 物件,該物件主要包含一下資訊:
- 當前 topology 的 topologyId 和 topologyMasterId
- 當前 topology 對應的 StormTopology 物件
- 配置資訊,包括 nimbus 節點配置和 topology 配置
- 當前叢集可用的 supervisor 節點資訊(不包含位於黑名單中的和已經死亡的)
- 當前 topology 範圍內所有 taskId 與其對應的元件 ID 之間的對映關係
- 當前 topology 範圍內所有 task 的狀態資訊
- 其他資訊,包括任務分配型別、老的任務分配資訊、是否是 reassign,以及未停止的 worker 列表等
完成任務分配的上下文資訊建立之後,storm 會基於該資訊為當前 topology 分配 worker,叢集模式下該過程的實現位於DefaultTopologyScheduler#assignTasks
方法中,該方法會先計算需要分配的 worker 數目,然後分別為每個 worker 分配對應的 supervisor 節點,最後為 topology 範圍內所有元件(包括系統元件)的 task 分配對應的 worker 程序。下面先來看一下 為 worker 分配 supervisor 節點的過程:
public List<ResourceWorkerSlot> getAvailableWorkers( DefaultTopologyAssignContext context, Set<Integer> needAssign, int allocWorkerNum) { // 1. 計算需要分配的 worker 數目 int reserveWorkers = context.getReserveWorkerNum(); // 需要保留的 worker 數目 int workersNum = this.getAvailableWorkersNum(context); // 當前叢集總的可用的 worker 數目 if ((workersNum - reserveWorkers) < allocWorkerNum) { // 沒有足夠的 worker 可以分配:可用 worker 數目 - 保留的 worker 數目 < 需要分配的數目 throw new FailedAssignTopologyException("there's no enough worker. allocWorkerNum=" + allocWorkerNum + ", availableWorkerNum=" + workersNum + ",reserveWorkerNum=" + reserveWorkers); } workersNum = allocWorkerNum; // 記錄分配到的 worker List<ResourceWorkerSlot> assignedWorkers = new ArrayList<>(); // 2. 分配 worker // 2.1 處理使用者自定義分配的情況 // 從 needAssign 中移除已經分配的 task,並記錄分配的 worker 到 assignedWorkers 中 this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, // 獲取使用者自定義分配 worker slot 資訊,排除狀態為 unstopped 的 worker this.getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf()))); if (ConfigExtension.isUseOldAssignment(context.getStormConf())) { // 2.2 如果配置指定要複用舊的分配,則優先從舊的分配中選出合適的 worker this.getRightWorkers(context, needAssign, assignedWorkers, workersNum, context.getOldWorkers()); } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && !context.isReassign()) { // 2.3 如果是 rebalance 任務分配型別,且可以複用原來的 worker 則將原來分配的 worker 記錄下來 int cnt = 0; for (ResourceWorkerSlot worker : context.getOldWorkers()) { if (cnt < workersNum) { ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot(); resFreeWorker.setPort(worker.getPort()); resFreeWorker.setHostname(worker.getHostname()); resFreeWorker.setNodeId(worker.getNodeId()); assignedWorkers.add(resFreeWorker); cnt++; } else { break; } } } LOG.info("Get workers from user define and old assignments: " + assignedWorkers); int restWorkerNum = workersNum - assignedWorkers.size(); // 還需要分配的 worker 數目 if (restWorkerNum < 0) { throw new FailedAssignTopologyException( "Too many workers are required for user define or old assignments. " + "workersNum=" + workersNum + ", assignedWorkersNum=" + assignedWorkers.size()); } // 2.4 對於剩下需要的 worker,直接新增 ResourceWorkerSlot 例項物件 for (int i = 0; i < restWorkerNum; i++) { assignedWorkers.add(new ResourceWorkerSlot()); } /* * 3. 遍歷將 worker 分配給相應的 supervisor * - 如果 worker 指定了 supervisor,則優先分配給指定 supervisor * - 依據 supervisor 的負載情況優先選擇負載較低的進行分配 */ List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context); if (isolationSupervisors.size() != 0) { this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(isolationSupervisors)); } else { // 為 worker 分配對應的 supervisor this.putAllWorkerToSupervisor(assignedWorkers, this.getResAvailSupervisors(context.getCluster())); } this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers); LOG.info("Assigned workers=" + assignedWorkers); return assignedWorkers; }
為 worker 分配 supervisor 節點的過程可以概括為:
- 計算需要分配的 worker 數目,如果可用的 worker 數目不滿足要求則會丟擲異常
-
為需要分配的 worker 建立 ResourceWorkerSlot 分配單元資訊,主要分為四種情況:
- 使用者自定義 worker slot 分配
- 配置指定複用舊的分配資訊則優先從舊的分配中選出合適的 worker slot
- 對於 rebalance 任務分配型別,如果允許則複用原來的 worker slot
- 剩餘情況,建立新的 work slot
- 為 worker 分配相應的 supervisor 節點
下面主要來看一下步驟 3,相應實現位於WorkerScheduler#putAllWorkerToSupervisor
方法中:
private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) { // 遍歷處理 worker,如果指定了 supervisor,且 supervisor 存在空閒埠,則將其分配給該 supervisor for (ResourceWorkerSlot worker : assignedWorkers) { if (worker.getHostname() != null) { for (SupervisorInfo supervisor : supervisors) { // 如果當前 worker 對應的 hostname 是該 supervisor,且 supervisor 存在空閒的 worker if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0) { /* * 基於當前 supervisor 資訊更新對應的 worker 資訊: *1. 保證 worker 對應的埠號是當前 supervisor 空閒的,否則選一個 supervisor 空閒的給 worker *2. 設定 worker 對應的 nodeId 為當前 supervisor 的 ID */ this.putWorkerToSupervisor(supervisor, worker); break; } } } } // 更新 supervisor 列表,移除沒有空閒埠的 supervisor supervisors = this.getResAvailSupervisors(supervisors); // 對 supervisor 按照空閒埠數由大到小排序 Collections.sort(supervisors, new Comparator<SupervisorInfo>() { @Override public int compare(SupervisorInfo o1, SupervisorInfo o2) { return -NumberUtils.compare(o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size()); } }); /* * 按照 supervisor 的負載對 worker 進行分配: * 1. 優先選擇負載較低的 supervisor 進分配 * 2. 如果 supervisor 都已經過載但還有未分配的 worker,則從過載 supervisor 優先選擇空閒埠較多的進行分配 */ this.putWorkerToSupervisor(assignedWorkers, supervisors); }
如果 worker 指定了 supervisor 節點,則會將其分配給對應的 supervisor,對於剩餘的 worker 來說會考慮 supervisor 節點的負載進行分配,以保證叢集中 supervisor 負載的均衡性。Storm 依據叢集中 supervisor 節點的平均空閒埠數作為標準來衡量 supervisor 節點的負載,如果一個 supervisor 節點的空閒埠數小於該值則認為該 supervisor 過載。叢集負載均衡性的保證主要參考以下兩個規則:
- 優先選擇負載較低的 supervisor 節點進分配
- 如果 supervisor 節點都處於過載狀態,但還有未分配的 worker,則從過載 supervisor 節點中優先選擇空閒埠較多的節點進行分配
再來看一下為 task 分配 worker 程序的過程,實現位於TaskScheduler#assign
方法中,該方法按照元件的類別分先後對 task 進行 worker 分配,順序如下:
task.on.differ.node=true
方法實現如下:
public List<ResourceWorkerSlot> assign() { if (tasks.size() == 0) { // 沒有需要再分配的任務 assignments.addAll(this.getRestAssignedWorkers()); return assignments; } // 1. 處理設定了 task.on.differ.node=true 的元件,為其在不同 supervisor 節點上分配 worker Set<Integer> assignedTasks = this.assignForDifferNodeTask(); // 2. 為剩餘 task 分配 worker,不包含系統元件 tasks.removeAll(assignedTasks); Map<Integer, String> systemTasks = new HashMap<>(); for (Integer task : tasks) { String name = context.getTaskToComponent().get(task); if (Common.isSystemComponent(name)) { systemTasks.put(task, name); continue; } this.assignForTask(name, task); } // 3. 為系統元件 task 分配 worker, e.g. acker, topology master... for (Entry<Integer, String> entry : systemTasks.entrySet()) { this.assignForTask(entry.getValue(), entry.getKey()); } // 記錄所有分配了 task 的 worker 集合 assignments.addAll(this.getRestAssignedWorkers()); return assignments; }
對於設定了task.on.differ.node=true
的元件,要求名下的 task 需要執行在不同的 supervisor 節點上,所以需要優先進行分配,否則如果一些 supervisor 因為配額已滿從資源池移除之後,很可能導致沒有足夠多的 supervisor 節點來滿足此類元件的 task 分配需求。
對於這三類元件的 task 分配過程基本過程類似,基本流程可以概括如下:
- 基於多重選擇器為當前 task 選擇最優 worker 進行分配
- 將 task 加入到被分配 worker 的 task 列表,並更新 worker 持有的 task 數目
- 檢查當前 worker 分配的 task 數目,如果配額已滿則將其從資源池移除,不再分配新的 task
- 更新 task 所屬元件分配在指定 worker 上的 task 數目
完成了 task 到 worker,以及 worker 到 supervisor 的配置關係,也就相當於完成了對當前 topology 的任務分配過程,緊接著 storm 會將任務分配資訊記錄到 ZK 對應的任務分配路徑下面。需要清楚的一點是當前的分配還只是一個方案,storm 叢集並沒有開始真正執行當前 topology,如果需要真正啟動方案的執行,storm 還需要排程各個 supervisor 節點按照方案啟動相應的 worker 程序,並在每個 worker 程序上啟動相應數量的執行緒來執行 task,相應過程我們後面會逐一進行分析。
(本篇完)