JStorm 原始碼解析:nimbus 的啟動和執行機制
本篇我們一起分析一下 nimbus 節點的啟動和執行機制。Nimbus 節點是 storm 叢集的排程者和管理者,它是叢集與使用者互動的視窗,負責 topology 任務的分配、啟動和執行,也管理著叢集中所有的 supervisor 節點的執行,監控著整個叢集的執行狀態,並將叢集執行資訊彙集給 UI 進行展示。
Nimbus 節點的啟動過程位於 NimbusServer 類中,這是一個驅動類,main 方法中會載入叢集配置檔案,包括 default.yaml 和 storm.yaml,並將配置檔案內容與啟動時的命令列引數一起封裝成 map 物件便於後續使用,真正的啟動邏輯位於NimbusServer#launchServer
方法中:
private void launchServer(final Map conf, INimbus inimbus) { LOG.info("Begin to start nimbus with conf " + conf); try { // 1. 驗證當前為分散式執行模式,不允許以本地模式執行 StormConfig.validate_distributed_mode(conf); // 2. 建立當前 JVM 程序對應的目錄:${storm.local.dir}/nimbus/pids/${pid},如果存在歷史執行記錄,則會進行清除 this.createPid(conf); // 3. 註冊 shutdown hook 方法,用於在 JVM 程序終止時執行清理邏輯 this.initShutdownHook(); // 4. 模板方法 inimbus.prepare(conf, StormConfig.masterInimbus(conf)); // 5. 基於 conf 建立 NimbusData 物件 data = this.createNimbusData(conf, inimbus); // 6. 註冊一個 follower 執行緒 this.initFollowerThread(conf); // 7. 建立並啟動一個後端 HTTP 服務(預設埠為 7621,主要用於檢視和下載 nimbus 的日誌資料) int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf); hs = new Httpserver(port, conf); hs.start(); // 8. 如果叢集執行在 YARN 上,則初始化容器心跳執行緒 this.initContainerHBThread(conf); // 9. 建立 ServiceHandler(實現了 Nimbus.Iface),並啟動 Thrift 服務,用於處理 Nimbus 請求 serviceHandler = new ServiceHandler(data); this.initThrift(conf); } catch (Throwable e) { if (e instanceof OutOfMemoryError) { LOG.error("Halting due to out of memory error..."); } LOG.error("Fail to run nimbus ", e); } finally { this.cleanup(); } LOG.info("Quit nimbus"); }
整個啟動過程可以概括如下:
${storm.local.dir}/nimbus/pids/${pid} INimbus#prepare
整個方法的執行邏輯還是相當清晰的,下面就其中一些關鍵步驟深入分析,主要包含 NimbusData 的例項化過程、HA 機制,以及 thrift 服務的初始化啟動過程。
一. NimbusData 的例項化過程
首先來看一下 NimbusData 的例項化過程,位於NimbusServer#createNimbusData
方法中,該方法基於前面載入的叢集配置資訊建立 NimbusData 類例項,並在構造方法中執行了一系列的初始化邏輯。NimbusData 是 nimbus 端非常重要的一個類,封裝了 nimbus 節點所有的執行資料,這裡挑重點分析一下其構造的初始化過程:
- 建立上傳和下載傳輸通道處理器
- 建立並初始化對應的 blobstore 例項
- 建立 StormZkClusterState 物件,並設定本地快取
建立上傳和下載傳輸通道處理器位於NimbusData#createFileHandler
方法中,前面我們在分析 topology 構建和提交過程時曾分析過 jar 檔案的上傳過程,在開始上傳之前客戶端會先通知 nimbus 節點做一些準備工作,其中就包含建立檔案上傳通道,對於建立完成的通道會記錄到一個 TimeCacheMap 型別的 uploaders 欄位中等待後續取用。在 supervisor 從 nimbus 節點下載對應 topology 的 jar 檔案時會建立相應的下載傳輸通道,並記錄到 TimeCacheMap 型別的 downloaders 欄位中。本方法就是對這兩個欄位執行初始化的過程,實現如下:
public void createFileHandler() { // 註冊一個 callback 方法,基於回撥的方式關閉管道或輸入流 ExpiredCallback<Object, Object> expiredCallback = new ExpiredCallback<Object, Object>() { @Override public void expire(Object key, Object val) { try { LOG.info("Close file " + String.valueOf(key)); if (val != null) { if (val instanceof Channel) { Channel channel = (Channel) val; channel.close(); } else if (val instanceof BufferFileInputStream) { BufferFileInputStream is = (BufferFileInputStream) val; is.close(); } } } catch (IOException e) { LOG.error(e.getMessage(), e); } } }; /* * 獲取檔案上傳和下載的超時時間,預設為 30 秒 * * During upload/download with the master, * how long an upload or download connection is idle before nimbus considers it dead and drops the connection. */ int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30); /* * TimeCacheMap 在例項化時會啟動一個守護執行緒, * 並依據超時時間迴圈從 buckets 中去除物件,並應用執行 callback 的 expire 方法 * 這裡的 expire 邏輯是執行關閉管道或輸入流 */ uploaders = new TimeCacheMap<>(file_copy_expiration_secs, expiredCallback); downloaders = new TimeCacheMap<>(file_copy_expiration_secs, expiredCallback); }
該方法主要完成了 3 件事情,其中 1 和 2 比較直觀,而 3 則在 TimeCacheMap 類例項化時完成,3 件事情分別如下:
- 為通道或流建立回撥策略,用於關閉通道或流
- 例項化 uploaders 和 downloaders 屬性
- 啟動一個守護執行緒,該執行緒會定期對過期的通道應用註冊的回撥策略
我們來看一下步驟 3 的邏輯,TimeCacheMap 是一個自定義的 map 型別,包含 map 型別常用的方法,同時具備超時機制,在例項化物件時會建立並啟動一個後臺執行緒,用於定時的應用超時回撥策略:
public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) { if (numBuckets < 2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } buckets = new LinkedList<>(); for (int i = 0; i < numBuckets; i++) { buckets.add(new HashMap<K, V>()); } // 註冊回撥策略 this.callback = callback; final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets - 1); /* * cleaner 執行緒會一直迴圈的執行, * 間隔指定時間從緩衝區尾部獲取物件,併為該物件應用 callback 的 expire 方法 */ this.cleaner = new Thread(new Runnable() { @Override public void run() { while (!AsyncLoopRunnable.getShutdown().get()) { Map<K, V> dead; JStormUtils.sleepMs(sleepTime); synchronized (lock) { // 從緩衝區隊尾獲取物件 dead = buckets.removeLast(); // 新增一個空的 map 到緩衝區,從而保證執行緒的正常執行 buckets.addFirst(new HashMap<K, V>()); } if (TimeCacheMap.this.callback != null) { for (Entry<K, V> entry : dead.entrySet()) { TimeCacheMap.this.callback.expire(entry.getKey(), entry.getValue()); } } } } }); cleaner.setDaemon(true); cleaner.start(); }
這裡我們以 uploaders 為例,當客戶端請求 nimbus 執行檔案上傳準備時,nimbus 會為本次請求建立一個上傳通道,同時記錄到 uploaders 中,本質上是記錄到了 TimeCacheMap 的 buckets 欄位頭部。在例項化 uploaders 時,方法會建立相應的守護執行緒,每間隔指定時間(預設是 30 秒)從 buckets 尾部移除超時的通道,並應用回撥策略,這裡也就是在 createFileHandler 方法開始時建立的關閉通道回撥策略。NimbusData#mkBlobCacheMap
方法的邏輯與 createFileHandler 基本相同。
下面來看一下 BlobStore 例項的建立和初始化過程,BlobStore 是一個鍵值儲存物件,用於儲存 topology 物件,以及 topology 配置資訊等。Storm 預設提供了兩類儲存實現:本地檔案儲存(LocalFsBlobStore)和 HDFS 檔案儲存(HdfsBlobStore)。如果是本地儲存則需要 ZK 的介入來保證資料一致性,而採用 HDFS 儲存則會使用 HDFS 自帶的備份和一致性保證。在 NimbusData 例項化過程中會呼叫BlobStoreUtils#getNimbusBlobStore
方法建立並初始化 BlobStore 例項,方法會檢查nimbus.blobstore.class
配置,該配置用於指定具體的 BlobStore 實現類全稱類名(包括 HdfsBlobStore),如果沒有指定則預設採用 LocalFsBlobStore 實現,並在例項化後呼叫對應的 prepare 方法執行初始化,這裡以LocalFsBlobStore#prepare
進行說明。對於本地模式而言,會採用${storm.local.dir}/blobs/
作為儲存的基礎路徑,並以 FileBlobStoreImpl 類例項操作本地檔案,同時會建立對應的 ZK 客戶端用於操作 ZK,以維護資料的一致性。
最後來看一下 StormZkClusterState 類物件的建立。StormZkClusterState 類也是一個非常重要的類,它實現了 StormClusterState 介面。Storm 可以看做是基於 ZK 的分散式實時任務排程系統,基於 ZK 實現對整個叢集中任務和節點的協調和排程,而叢集與 ZK 之間的通訊都依賴於 StormZkClusterState 類物件,其例項化過程中所做的主要工作就是在 ZK 上建立相應的一級目錄,並註冊一個數據變更回撥策略,用於觸發監聽相應路徑資料變更時的回撥處理器。建立的路徑包括:
- supervisors - topology - assignments - assignments_bak - tasks - taskbeats - taskerrors - metrics - blobstore - gray_upgrade
這裡我們對上面的路徑進行一個簡單的說明:
+ ${zk_root_dir} | ---- + topology: 記錄叢集中所有正在執行的 topology 資料 | ---- | ---- + ${topology_id}: 指定 topology 的相關資訊(名稱、開始執行時間、執行狀態等) | ---- + supervisors: 記錄叢集中所有 supervisor 節點的心跳資訊 | ---- | ---- + ${supervivor_id}: 指定 supervisor 的心跳資訊(心跳時間、主機名稱、所有 worker 的埠號、執行時間等) | ---- + assignments: 記錄提交給叢集的 topology 任務分配資訊 | ---- | ---- + ${topology_id}: 指定 topology 的任務分配資訊(對應 nimbus 上的程式碼目錄、所有 task 的啟動時間、每個 task 與節點和埠的對映關係等) | ---- + assignments_bak: 記錄提交給叢集的 topology 任務分配資訊的備份 | ---- + tasks: 記錄叢集中所有 topology 的 task 資訊 | ---- | ---- + ${topology_id}: 指定 topology 的所有 task 資訊 | ---- | ---- | ---- + ${task_id}: 指定 task 所屬的元件 ID 和型別(spout/bolt) | ---- + taskbeats: 記錄叢集中所有 task 的心跳資訊 | ---- | ---- + ${topology_id}: 記錄指定 topology 下所有 task 的心跳資訊、topologyId,以及 topologyMasterId 等 | ---- | ---- | ---- + ${task_id}: 指定 task 的心跳資訊(最近一次心跳時間、執行時長、統計資訊等) | ---- + taskerrors: 記錄叢集中所有 topology 的 task 執行錯誤資訊 | ---- | ---- + ${topology_id}: 指定 topology 下所有 task 的執行錯誤資訊 | ---- | ---- | ---- + ${task_id}: 指定 task 的執行錯誤資訊 | ---- + metrics: 記錄叢集中所有 topology 的 metricsId | ---- + blobstore: 記錄叢集對應的 blobstore 資訊,用於協調資料一致性 | ---- + gray_upgrade: 記錄灰度釋出中的 topologyId
Storm 叢集的執行嚴重依賴於 ZK 進行協調,所以在叢集較大的時候 ZK 有可能成為瓶頸,JStorm 在這一塊引入了快取進行優化,因為 ZK 中的資料有相當一部分是很少變更的,採用快取策略可以提升訪問速度,又減小對於 ZK 的讀壓力。快取例項的建立也在 NimbusData 例項化期間完成,相應邏輯位於NimbusData#createCache
方法中,該方法會建立一個 NimbusCache 快取類物件,並將其記錄到 StormZkClusterState 的相應屬性中。NimbusCache 採用了兩級快取設計,即記憶體和檔案,構造方法實現如下:
public NimbusCache(Map conf, StormClusterState zkCluster) { super(); // 獲取本地快取的具體實現類 String dbCacheClass = this.getNimbusCacheClass(conf); LOG.info("NimbusCache db cache will use {}", dbCacheClass); try { dbCache = (JStormCache) Utils.newInstance(dbCacheClass); String dbDir = StormConfig.masterDbDir(conf); // 設定本地快取資料存放目錄 conf.put(RocksDBCache.ROCKSDB_ROOT_DIR, dbDir); // ${storm.local.dir}/nimbus/rocksdb // 是否在 nimbus 啟動時清空資料,預設為 true conf.put(RocksDBCache.ROCKSDB_RESET, ConfigExtension.getNimbusCacheReset(conf)); dbCache.init(conf); if (dbCache instanceof TimeoutMemCache) { memCache = dbCache; } else { memCache = new TimeoutMemCache(); memCache.init(conf); } } // 省略 catch 程式碼塊 this.zkCluster = zkCluster; }
JStormCache 介面聲明瞭快取的基本操作,針對該介面 storm 主要提供了兩類實現:TimeoutMemCache 和 RocksDBCache。對於檔案儲存而言,如果是本地模式,或者 linux 和 mac 以外的平臺均採用 TimeoutMemCache,否則會檢查nimbus.cache.class
配置是否有指定相應的快取實現類,如果沒有指定的話,storm 會採用 RocksDBCache 作為檔案儲存。RocksDBCache 的儲存實現基於ofollow,noindex">rocksdb
,這是一個由 Facebook 開發和維護的嵌入式鍵值資料庫,借用了leveldb
專案的核心程式碼,以及來自 HBase 的設計思想,可以簡單將其理解為本地版本的HBase
。
二. Nimbus 節點的 HA 機制
Nimbus 節點在整個 storm 叢集中地位無可厚非,但是單點的設計對於目前大環境下的高可用來說是欠缺的,雖然 nimbus 本身的執行資料是無狀態的,但是當 nimbus 節點宕機後,我們還是希望有其它 nimbus 節點能夠快速頂替上來,以保證業務 topology 的正常執行。早期的 storm 實現存在單點的問題,所以 jstorm 在改寫的時候引入了 HA 機制來解決這一問題,對於 jstorm 來說一個叢集執行過程中只能有一個 nimbus leader 節點,但是可以啟動多個 nimbus follower 節點,當 leader 節點宕機之後,follower 節點們可以依據優先順序競爭成為 leader 節點。實際上叢集剛剛啟動時所有的 nimbus 節點都是 follower,不過在短時間內就會依賴於 HA 機制從中選出一個 leader 節點。JStorm HA 機制依賴於 ZK 實現,會在 ZK 根節點下建立 nimbus_master 和 nimbus_slave 兩個檔案,顧名思義,nimbus_master 用於儲存 nimbus leader 的相關資訊,其實就是節點對應的 IP 和埠,而 nimbus_slave 主要儲存 nimbus follower 的相關資訊。HA 機制的啟動過程位於NimbusServer#initFollowerThread
方法中:
private void initFollowerThread(Map conf) { // 如果當前 nimbus 成為 leader,則會觸發此回撥執行初始化操作 Callback leaderCallback = new Callback() { @Override public <T> Object execute(T... args) { try { init(data.getConf()); } catch (Exception e) { LOG.error("Nimbus init error after becoming a leader", e); JStormUtils.halt_process(0, "Failed to init nimbus"); } return null; } }; // 建立並啟動 follower 執行緒 follower = new FollowerRunnable(data, 5000, leaderCallback); Thread thread = new Thread(follower); thread.setDaemon(true); thread.start(); LOG.info("Successfully init Follower thread"); }
方法的主要邏輯就是為當前 nimbus 節點建立並啟動一個 follower 執行緒,相應的實現位於 FollowerRunnable 類中,該類例項化的過程中會執行以下幾件事情:
- 判斷當前是否是以叢集模式執行,對於本地模式不適用於 HA 機制
- 將當前節點的 IP 和埠號資訊註冊到 ZK 的 nimbus_slave 和 nimbus_slave_detail 目錄下,表示當前節點是一個 nimbus follower 節點
- 檢查當前節點是否存在 leader,如果不存在則嘗試成為 leader 節點
- 如果使用本地儲存 blobstore 資料則記錄狀態資訊到 ZK,以保證資料的一致性
當 follower 執行緒啟動之後,follower 預設會每間隔 5 秒鐘檢查一次當前叢集是否存在 nimbus leader 節點,如果不存在則會嘗試成為 leader,該過程位於FollowerRunnable#tryToBeLeader
方法中:
private void tryToBeLeader(final Map conf) throws Exception { // 依據候選 nimbus 從節點的優先順序來決定當前 nimbus 從節點是否有資格嘗試成為 leader boolean allowed = this.check_nimbus_priority(); if (allowed) { // 回撥策略再次嘗試 RunnableCallback masterCallback = new RunnableCallback() { @Override public void run() { try { tryToBeLeader(conf); } catch (Exception e) { LOG.error("tryToBeLeader error", e); JStormUtils.halt_process(30, "Cant't be master" + e.getMessage()); } } }; // 嘗試成為 leader 節點 LOG.info("This nimbus can be leader"); data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, hostPort, masterCallback); } else { LOG.info("This nimbus can't be leader"); } }
方法首先會基於所有 follower 的優先順序來決定當前 follower 節點是否有資格嘗試成為 leader,對於有資格的 follower 會呼叫StormZkClusterState#try_to_be_leader
方法嘗試在 ZK 上建立 nimbus_master 節點並寫入自己的 IP 和埠號。如果對應 nimbus_master 節點已經存在,則說明已經有 leader 選舉出來,則當前嘗試失敗,否則如果不存在異常則表示競選成功。
如果叢集已經存在 leader,則方法會判斷對應的 leader 是否是當前 follower 自身,如果是的話且上一次的 leader 不存在或是其它 follower 節點,則會觸發之前在NimbusServer#initFollowerThread
方法中定義的回撥策略,本質上是呼叫了NimbusServer#init
方法,該方法主要執行以下初始化邏輯:
NimbusData#init
+ ${nimbus_local_dir} | ---- + nimbus | ---- | ---- + inbox: 存放客戶端上傳的 jar 包 | ---- | ---- | ---- + stormjar-{uuid}.jar: 對應一個具體的 jar 包 | ---- | ---- + stormdist | ---- | ---- | ---- + ${topology_id} | ---- | ---- | ---- | ---- + stormjar.jar: 包含當前拓撲所有程式碼的 jar 包(從 inbox 那複製過來的) | ---- | ---- | ---- | ---- + stormcode.ser: 當前拓撲物件的序列化檔案 | ---- | ---- | ---- | ---- + stormconf.ser: 當前拓撲的配置資訊檔案
三. Thrift 服務的初始化啟動過程
最後我們來看一下 nimbus 服務的啟動過程。Thrift
是一種介面描述語言和二進位制通訊協議,同時也是一個強大的 RPC 中介軟體,跨語言高效通訊是其主要賣點。Nimbus 啟動起來本質上就是一個 thrift 服務,在介紹 topology 任務提交過程時我們就已經接觸到與 nimbus 節點通訊的過程,本質上也是 RPC 服務呼叫的過程。所有 RPC 介面的實現均位於 ServiceHandler 類中,該類實現了Nimbus.Iface
介面,NimbusServer 主要呼叫NimbusServer#initThrift
方法來啟動 thrift 服務,過程如下:
private void initThrift(Map conf) throws TTransportException { // 獲取 thrift 埠,預設為 8627 Integer thrift_port = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT)); // ${nimbus.thrift.port} TNonblockingServerSocket socket = new TNonblockingServerSocket(thrift_port); // ${nimbus.thrift.max_buffer_size} Integer maxReadBufSize = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE)); // 設定服務執行引數 THsHaServer.Args args = new THsHaServer.Args(socket); args.workerThreads(ServiceHandler.THREAD_NUM); // 64 args.protocolFactory(new TBinaryProtocol.Factory(false, true, maxReadBufSize, -1)); args.processor(new Nimbus.Processor<Iface>(serviceHandler)); args.maxReadBufferBytes = maxReadBufSize; thriftServer = new THsHaServer(args); LOG.info("Successfully started nimbus: started Thrift server..."); thriftServer.serve(); }
方法實現了一個標準的 thrift 服務啟動過程,如果對於 thrift 不熟悉可以參考Thrift: The Missing Guide 。Nimbus 節點啟動後預設監聽 8627 埠,然後等待客戶端的請求。到此,一個 nimbus 節點啟動的主要流程就基本完成了。
(本篇完)
轉載宣告 : 版權所有,商業轉載請聯絡作者,非商業轉載請註明出處
本部落格所有文章除特別宣告外,均採用 CC BY-NC-SA 4.0 許可協議