JStorm 原始碼解析:worker 的啟動和執行機制
上一篇我們分析了 supervisor 節點的啟動和執行過程,提及到 supervisor 的核心工作就是基於 ZK 從 nimbus 節點領取分配給它的任務,並啟動 worker 執行。一個 worker 就是一個 JVM 程序,執行在 supervisor 節點上,多個 task 可以同時執行在一個 worker 程序之中,每個 task 都對應一個執行緒。
Worker 程序的啟動位於 Worker 類中,前面我們在分析 supervisor 節點的啟動過程時提及到了對於 Worker 類 main 函式的觸發,supervisor 在啟動相應 worker 程序時會指定 topologyId、supervisorId、workerPort、workerId,以及 classpath 等引數,worker 在拿到這些引數之後會先獲取當前機器上埠對應的老程序,並逐一 kill 掉,然後呼叫Worker#mk_worker
方法建立並啟動對應的 worker 例項,該方法的核心實現如下:
Worker w = new Worker(conf, context, topologyId, supervisorId, port, workerId, jarPath); return w.execute();
Worker 類僅包含一個例項屬性 WorkerData,它封裝了所有與 worker 執行相關的屬性,例項化 Worker 物件的過程也是初始化 WorkerData 屬性的過程,該過程主要包含以下工作:
workers/${worker_id}/pids
初始化完成之後會呼叫Worker#execute
方法建立並啟動 worker 程序,該方法主要的執行流程可以概括如下:
- 為當前 worker 建立並啟動一個 socket 連線,用於接收訊息並分發給名下的 task 執行緒
- 啟動一個執行緒用於維護當前 worker 狀態變更時,更新與其它 worker 之間的連線關係
- 啟動一個執行緒用於定期獲取當前 topology 在 ZK 上的基本資訊,當 topology 狀態發生變更時觸發本地相應操作
- 啟動一個執行緒迴圈消費當前 worker 的 tuple 佇列傳送給對應的下游 task 執行緒
- 啟動一個執行緒用於定期更新本地的 worker 心跳資訊
- 建立並啟動當前 worker 下所有的 task 任務
方法實現如下:
public WorkerShutdown execute() throws Exception { List<AsyncLoopThread> threads = new ArrayList<>(); // 1. 為 worker 建立一個 socket 連線,接收和分發訊息給對應的 task AsyncLoopThread controlRvThread = this.startDispatchThread(); threads.add(controlRvThread); // 2. 建立執行緒用於在 worker 關閉或者新啟動時更新與其他 worker 之間的連線資訊 RefreshConnections refreshConn = this.makeRefreshConnections(); AsyncLoopThread refreshConnLoopThread = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true); threads.add(refreshConnLoopThread); // 3. 獲取 topology 在 ZK 上的狀態,當狀態發生變更時更新本地 task 狀態 RefreshActive refreshZkActive = new RefreshActive(workerData); AsyncLoopThread refreshZk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true); threads.add(refreshZk); // 4. 建立一個執行緒迴圈消費 tuple 佇列傳送給對應的下游 task DrainerCtrlRunnable drainerCtrlRunnable = new DrainerCtrlRunnable(workerData, MetricDef.SEND_THREAD); AsyncLoopThread controlSendThread = new AsyncLoopThread(drainerCtrlRunnable, false, Thread.MAX_PRIORITY, true); threads.add(controlSendThread); // Sync heartbeat to Apsara Container AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf()); if (syncContainerHbThread != null) { threads.add(syncContainerHbThread); } JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData); metricReporter.init(); workerData.setMetricsReporter(metricReporter); // 5. 更新本地心跳資訊 RunnableCallback heartbeatFn = new WorkerHeartbeatRunnable(workerData); AsyncLoopThread hb = new AsyncLoopThread(heartbeatFn, false, null, Thread.NORM_PRIORITY, true); threads.add(hb); // 6. 建立並啟動當前 worker 下所有的 task List<TaskShutdownDaemon> shutdownTasks = this.createTasks(); workerData.setShutdownTasks(shutdownTasks); List<AsyncLoopThread> serializeThreads = workerData.setSerializeThreads(); threads.addAll(serializeThreads); List<AsyncLoopThread> deserializeThreads = workerData.setDeserializeThreads(); threads.addAll(deserializeThreads); return new WorkerShutdown(workerData, threads); }
一. 訊息接收與分發
Storm 會為 worker 基於 Netty 建立並返回一個 socket 連線用於接收訊息,同時 worker 與名下所有 task 之間會維持一個傳輸佇列,並啟動一個執行緒迴圈消費接收到的訊息投遞給對應 task 的傳輸佇列中。該過程位於Worker#startDispatchThread
方法中,該方法實現如下(去掉了一些非關鍵程式碼):
private AsyncLoopThread startDispatchThread() { IContext context = workerData.getContext(); // 獲取訊息上下文:NettyContext String topologyId = workerData.getTopologyId(); // 1. 建立一個接收訊息的訊息佇列(disruptor) Map stormConf = workerData.getStormConf(); long timeout = JStormUtils.parseLong(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); // 預設 10ms WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); // 10ms int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256); DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, queueSize, waitStrategy, false, 0, 0); // 2. 為當前 worker 基於 Netty 建立並返回一個 Socket 連線用於接收訊息 IConnection recvConnection = context.bind( topologyId, workerData.getPort(), workerData.getDeserializeQueues(), recvControlQueue, false, workerData.getTaskIds()); workerData.setRecvConnection(recvConnection); // 3. 啟動一個執行緒迴圈消費 worker 接收到的訊息,並應用 DisruptorRunnable.onEvent 方法, //最終呼叫的是 VirtualPortCtrlDispatch.handleEvent 方法,將訊息投遞給指定 task 的訊息佇列 RunnableCallback recvControlDispatcher = new VirtualPortCtrlDispatch( workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD); return new AsyncLoopThread(recvControlDispatcher, false, Thread.MAX_PRIORITY, true); }
這裡的訊息佇列底層都依賴於Exchange/disruptor" target="_blank" rel="nofollow,noindex">disruptor
實現,最終對於接收到的訊息都會呼叫VirtualPortCtrlDispatch#handleEvent
方法進行處理:
public void handleEvent(Object event, boolean endOfBatch) throws Exception { TaskMessage message = (TaskMessage) event; int task = message.task(); // 獲取當前訊息對應的 taskId // 訊息反序列化 Object tuple = null; try { // there might be errors when calling update_topology tuple = this.deserialize(message.message(), task); } catch (Throwable e) { if (Utils.exceptionCauseIsInstanceOf(KryoException.class, e)) { throw new RuntimeException(e); } LOG.warn("serialize msg error", e); } // 獲取 taskId 對應的訊息通道 DisruptorQueue queue = controlQueues.get(task); if (queue == null) { LOG.warn("Received invalid control message for task-{}, Dropping...{} ", task, tuple); return; } if (tuple != null) { // 將訊息投遞給對應的 task 傳輸佇列 queue.publish(tuple); } }
二. 建立並啟動用於維護 worker 之間連線關係的執行緒
在這一步會建立一個 RefreshConnections 物件,它繼承了 RunnableCallback 類,所以同樣是被非同步迴圈執行緒模型接管(按照指定間隔迴圈呼叫其RefreshConnections#run
方法),storm 會定期檢測 ZK 上的 topology 任務分配資訊是否有更新,如果有比本地更新的任務分配(依賴於任務分配時間戳進行判定)則會判斷新任務分配的型別來相應的更新本地的資訊。
如果當前的任務分配型別僅僅是更新叢集上已有的 topology,則 storm 會遍歷通知各個 task 執行相應的更新操作,同時會回撥已註冊的所有更新監聽器以更新配置資訊,實現如下:
// 當前任務分配已經更新且是更新 topology 操作,則通知所有的 task List<TaskShutdownDaemon> taskShutdowns = workerData.getShutdownTasks(); Map newConf = StormConfig.read_supervisor_topology_conf(conf, topologyId); workerData.getStormConf().putAll(newConf); for (TaskShutdownDaemon taskSD : taskShutdowns) { // 通知所有的 task taskSD.update(newConf); } // disable/enable metrics on the fly workerData.getUpdateListener().update(newConf); // 回撥更新監聽器,更新配置 workerData.setAssignmentType(AssignmentType.UpdateTopology);
如果當前是更新以外的任務分配型別(Assign、ScaleTopology),則 storm 會從新的任務分配資訊中分別獲取新增的、待刪除的,以及需要更新的 taskId 列表,並執行相應的建立、刪除,以及更新 task 操作,同時會更新 worker 上所有 task 的下游 task 列表資訊。部分程式碼實現如下:
// 獲取新增的 taskId 列表 Set<Integer> addedTasks = this.getAddedTasks(assignment); // 獲取待刪除的 taskId 列表 Set<Integer> removedTasks = this.getRemovedTasks(assignment); // 獲取待更新的 taskId 列表 Set<Integer> updatedTasks = this.getUpdatedTasks(assignment); // 基於新任務分配資訊更新 workerData workerData.updateWorkerData(assignment); workerData.updateKryoSerializer(); // 關閉需要移除的 task this.shutdownTasks(removedTasks); // 建立新增的 task this.createTasks(addedTasks); // 更新已有需要被更新的 task this.updateTasks(updatedTasks); // 更新當前 worker 上所有 task 的下游 task 列表資訊 Set<Integer> tmpOutboundTasks = Worker.worker_output_tasks(workerData); if (!outboundTasks.equals(tmpOutboundTasks)) { for (int taskId : tmpOutboundTasks) { if (!outboundTasks.contains(taskId)) { workerData.addOutboundTaskStatusIfAbsent(taskId); } } for (int taskId : workerData.getOutboundTaskStatus().keySet()) { if (!tmpOutboundTasks.contains(taskId)) { workerData.removeOutboundTaskStatus(taskId); } } workerData.setOutboundTasks(tmpOutboundTasks); outboundTasks = tmpOutboundTasks; } workerData.setAssignmentType(AssignmentType.Assign);
三. 建立並啟動定期獲取 topology 基本資訊的執行緒
在這一步會建立一個 RefreshActive 物件,它同樣繼承了 RunnableCallback 類,所以同樣也是被非同步迴圈執行緒模型接管(按照指定間隔迴圈呼叫其RefreshActive#run
方法),storm 會定期獲取當前 topology 在 ZK 上的基本資訊,當 topology 狀態發生變更時觸發本地執行相應的操作。
如果 topology 狀態資訊變為 active、upgrading,或者 rollback 時,storm 會依次將本地 task 的狀態設定為TaskStatus.RUN
,如果當前 task 對應的元件是 spout,則會觸發ISpout#activate
方法。如果當前 topology 狀態不為 inactive 時,storm 會依次將本地的 task 狀態設定為TaskStatus.PAUSE
,如果當前 task 對應的元件是 spout,則會觸發ISpout#deactivate
方法。最後更新本地記錄的 topology 狀態。相關實現如下:
if (newTopologyStatus.equals(StatusType.active) // 啟用 || newTopologyStatus.equals(StatusType.upgrading) // 灰度 || newTopologyStatus.equals(StatusType.rollback)) { // 回滾 for (TaskShutdownDaemon task : tasks) { if (task.getTask().getTaskStatus().isInit()) { task.getTask().getTaskStatus().setStatus(TaskStatus.RUN); } else { task.active(); } } } else if (oldTopologyStatus == null || !oldTopologyStatus.equals(StatusType.inactive)) { for (TaskShutdownDaemon task : tasks) { if (task.getTask().getTaskStatus().isInit()) { task.getTask().getTaskStatus().setStatus(TaskStatus.PAUSE); } else { task.deactive(); } } } workerData.setTopologyStatus(newTopologyStatus);
四. 建立並啟動迴圈消費 worker tuple 佇列的執行緒
在這一步會建立一個 DrainerCtrlRunnable 物件,它同樣繼承了 RunnableCallback 類,所以同樣也是被非同步迴圈執行緒模型接管(按照指定間隔迴圈呼叫其DrainerCtrlRunnable#run
方法),storm 會迴圈消費當前 worker 的 tuple 佇列 transferCtrlQueue,並最終呼叫DrainerCtrlRunnable#handleEvent
方法對拿到的訊息進行處理,該方法的實現如下:
public void handleEvent(Object event, boolean endOfBatch) throws Exception { if (event == null) { return; } ITupleExt tuple = (ITupleExt) event; int targetTask = tuple.getTargetTaskId(); // 獲取與下游 task 的連線 IConnection conn = this.getConnection(targetTask); if (conn != null) { byte[] tupleMessage = null; try { // there might be errors when calling update_topology tupleMessage = this.serialize(tuple); // 序列化資料 } catch (Throwable e) { // 省略異常處理 } // 基於 netty 傳送資料 TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage); conn.sendDirect(message); } }
方法的邏輯比較簡單,拿到當前 tuple 對應的下游 taskId,然後與之建立連線(netty)並將 tuple 傳送給它。
五. 建立並啟動當前 worker 下所有的 task 執行緒
方法Worker#createTasks
用於為當前 worker 下的所有 task 任務建立一個 Task 物件,併為每個 task 啟動一個執行緒執行,同時為每個 task 任務建立一個 TaskShutdownDaemon 物件用於管理對應的 task 執行緒,方法的實現如下:
private List<TaskShutdownDaemon> createTasks() throws Exception { List<TaskShutdownDaemon> shutdownTasks = new ArrayList<>(); // 獲取當前 worker 下所有的 taskId Set<Integer> taskIds = workerData.getTaskIds(); Set<Thread> threads = new HashSet<>(); List<Task> taskArrayList = new ArrayList<>(); for (int taskId : taskIds) { // 建立並啟動 task Task task = new Task(workerData, taskId); Thread thread = new Thread(task); threads.add(thread); taskArrayList.add(task); thread.start(); // 啟動 task } for (Thread thread : threads) { thread.join(); } for (Task t : taskArrayList) { shutdownTasks.add(t.getTaskShutdownDameon()); } return shutdownTasks; }
Task 類實現了 Runnable 介面,其 run 方法中簡單呼叫了Task#execute
方法,該方法首先會向系統 bolt 傳送一條“startup”訊息,然後依據當前的元件型別建立對應的任務執行器,建立的過程位於Task#mkExecutor
方法中:
public BaseExecutors mkExecutor() { BaseExecutors baseExecutor = null; if (taskObj instanceof IBolt) { if (taskId == topologyContext.getTopologyMasterId()) { baseExecutor = new TopologyMasterBoltExecutors(this); } else { baseExecutor = new BoltExecutors(this); } } else if (taskObj instanceof ISpout) { if (this.isSingleThread(stormConf)) { baseExecutor = new SingleThreadSpoutExecutors(this); } else { baseExecutor = new MultipleThreadSpoutExecutors(this); } } return baseExecutor; }
BaseExecutors 類是一個 RunnableCallback 類,所以其 run 方法會被非同步迴圈呼叫。繼承自 BaseExecutors 類有 5 個(如下),而Task#mkExecutor
方法基於元件型別分別選擇了相應的實現類進行例項化。
- BoltExecutors
- TopologyMasterBoltExecutors
- SpoutExecutors
- SingleThreadSpoutExecutors
- MultipleThreadSpoutExecutors
先來看一下 BoltExecutors 和 TopologyMasterBoltExecutors,這是 bolt 元件的任務執行器,其中 TopologyMasterBoltExecutors 繼承自 BoltExecutors,所以接下來我們主要來看一下 BoltExecutors 的實現。BoltExecutors 類的 run 方法實現如下:
public void run() { if (!isFinishInit) { // 執行初始化操作,主要是呼叫了 IBolt.prepare 方法 this.initWrapper(); } while (!taskStatus.isShutdown()) { try { // 迴圈消費當前 task 的訊息佇列 this.consumeExecuteQueue(); } catch (Throwable e) { // 省略異常處理邏輯 } } }
方法首先會判定是否完成了初始化操作,如果未完成則會呼叫BaseExecutors#initWrapper
執行初始化,這期間主要是呼叫了IBolt#prepare
方法,這也是我們在實現一個 bolt 時執行初始化的方法。如果當前 task 執行緒沒有被銷燬,則會一直迴圈呼叫BoltExecutors#consumeExecuteQueue
消費當前 task 的訊息佇列。前面的分析我們知道 worker 會對接收到的訊息按照 taskId 投遞給對應 task 的訊息佇列,而訊息佇列的消費過程就在這裡發生。針對接收到訊息會逐條進行處理,這裡最終呼叫的是BoltExecutors#onEvent
方法,處理的訊息就是我們熟悉的 Tuple 物件,而該方法的核心就是呼叫IBolt#execute
方法,也就是呼叫使用者自定義的策略對收到的 tuple 進行處理。
再來看一下 SingleThreadSpoutExecutors 和 MultipleThreadSpoutExecutors,這兩類都繼承自 SpoutExecutors 類,區別僅在於對於訊息的附加處理和正常的業務邏輯是否位於同一個執行緒中,而核心邏輯都是呼叫ISpout#nextTuple
方法,也就是執行使用者自定義的業務邏輯。
針對 worker 的執行機制就分析到這裡,但是 storm 對於訊息的處理並沒有結束,下一篇我們將一起探尋 ack 機制,看看 storm 如何保證訊息至少被執行一次(at least once)。
(本篇完)
轉載宣告 : 版權所有,商業轉載請聯絡作者,非商業轉載請註明出處
本部落格所有文章除特別宣告外,均採用 CC BY-NC-SA 4.0 許可協議