JStorm 原始碼解析:基礎執行緒模型
在具體開始分析 storm 叢集的啟動和執行機制之前,我們先來看一下基礎的執行緒模型,在整個 storm 的實現中有很多地方用到它,所以將其單獨拎出來先分析說明一下,後面看到相應的類就大致知道其內在的執行過程啦。
在 storm 的實現中,有很多實現了 RunnableCallback 類的子類,這些類例項化之後都被傳遞給了 AsyncLoopThread 物件,示例如下:
public class MyRunnableCallback extends RunnableCallback { private static AtomicInteger count = new AtomicInteger(); @Override public void run() { System.out.println("[" + count.incrementAndGet() + "] thread-" + Thread.currentThread().getId() + " is running."); } @Override public Object getResult() { return 1; } public static void main(String[] args) { MyRunnableCallback callback = new MyRunnableCallback(); new AsyncLoopThread(callback); } }
上面的例子的執行效果是每間隔 1 秒會執行一遍 run 方法,輸出如下:
[1] thread-11 is running. [2] thread-11 is running. [3] thread-11 is running.
所以我們可以簡單的理解其作用是簡單方便的建立一個執行緒用於迴圈執行自定義的業務邏輯,接下來看一下相應的原始碼實現。
RunnableCallback 類實現了 Runnable、Callback,以及 Shutdownable 三個介面,其中 Runnable 是 jdk 自帶的介面,後兩個介面定義如下:
public interface Callback { <T> Object execute(T... args); } public interface Shutdownable { void shutdown(); }
RunnableCallback 類的完整定義如下:
public class RunnableCallback implements Runnable, Callback, Shutdownable { @Override public void run() { } @Override public <T> Object execute(T... args) { return null; } @Override public void shutdown() { } public void preRun() { } public void postRun() { } public Exception error() { return null; } public Object getResult() { return null; } public String getThreadName() { return null; } }
- run :用於實現自定義的需要非同步迴圈執行的邏輯,該方法會依據條件被迴圈排程
- execute :當執行緒異常退出時該方法會被呼叫,用於執行自定義的異常處理邏輯
- shutdown :當任務被銷燬或者正常退出時該方法被呼叫,用於執行一定的銷燬策略
- preRun :執行 run 方法之前的前置模板方法
- postRun :執行 run 方法之後的後置模板方法
- error :用於獲取當前任務的錯誤執行資訊,如果存在錯誤則會中斷當前 run 方法的繼續排程
- getResult :該方法用於控制 run 方法的排程,如果返回值為 null 或者 0 則任務會一直迴圈排程,如果返回值小於 0 則會在執行一次 run 之後退出,如果返回值大於 0 則表示每次排程間隔睡眠的時間(單位:秒)
- getThreadName :設定當前執行緒的名稱
當我們完成例項化自定義的 RunnableCallback 物件之後,我們需要將其傳遞給 AsyncLoopThread 類物件用於啟動執行。AsyncLoopThread 類提供了多個過載版本的建構函式,但最終呼叫的都是AsyncLoopThread#init
方法,該方法的實現如下:
private void init(RunnableCallback afn, boolean daemon, RunnableCallback kill_fn, int priority, boolean start) { if (kill_fn == null) { // 如果沒有設定,則預設建立一個 kill_fn = new AsyncLoopDefaultKill(); } // 採用 AsyncLoopRunnable 對於 afn 和 kfn 進行包裝 Runnable runnable = new AsyncLoopRunnable(afn, kill_fn); thread = new Thread(runnable); String threadName = afn.getThreadName(); if (threadName == null) { // 以 afn 的 simpleName 作為執行緒名稱 threadName = afn.getClass().getSimpleName(); } // 配置執行緒 thread.setName(threadName); thread.setDaemon(daemon); thread.setPriority(priority); thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { LOG.error("UncaughtException", e); ThreadUtils.haltProcess(1); } }); this.afn = afn; if (start) { // 啟動執行緒 thread.start(); } }
該方法的第一個引數 afn 就是我們傳遞的自定義的 RunnableCallback 物件;第二個引數 daemon 用於指定當前執行緒是否以守護執行緒的模式執行;第三個引數 kill_fn 也是一個 RunnableCallback 型別,當任務異常退出時會呼叫其 execute 方法;第四個引數 priority 用於指定執行緒的優先順序;第五個引數 start 用於指定是否立即啟動任務。
上面的方法中有一個名為 AsyncLoopRunnable 的類,實現了 Runnable 介面,並封裝了 afn 和 kill_fn 兩個 RunnableCallback 物件,該類的 run 方法實現了整個執行緒模型的排程機制:
public void run() { if (fn == null) { LOG.error("fn==null"); throw new RuntimeException("AsyncLoopRunnable no core function "); } // 模板方法 fn.preRun(); try { while (!shutdown.get()) { // 執行自定義 callback 邏輯 fn.run(); if (shutdown.get()) { this.shutdown(); return; } Exception e = fn.error(); if (e != null) { throw e; } // 獲取睡眠時間(單位:秒) Object rtn = fn.getResult(); if (this.needQuit(rtn)) { this.shutdown(); return; } } } catch (Throwable e) { if (shutdown.get()) { this.shutdown(); } else { LOG.error("Async loop died!!!" + e.getMessage(), e); killFn.execute(e); } } }
AsyncLoopRunnable 的 run 方法會迴圈排程 RunnableCallback 的 run 方法,並在每次執行完成之後檢測當前任務是否被 shutdown、是否存在錯誤執行資訊,如果都沒有的話則會繼續呼叫AsyncLoopRunnable#needQuit
方法檢查是否需要退出當前任務,該方法會依據RunnableCallback#getResult
的返回結果決策接下去的執行方式,具體決策過程如下:
private boolean needQuit(Object rtn) { if (rtn != null) { long sleepTime = Long.parseLong(String.valueOf(rtn)); if (sleepTime < 0) { // 小於 0 則退出執行 return true; } else if (sleepTime > 0) { // 大於 0 表示要求每次執行中間休息相應的時間(單位:秒) long now = System.currentTimeMillis(); long cost = now - lastTime; long sleepMs = sleepTime * 1000 - cost; // 期望睡眠時間 - 中間消耗的時間 if (sleepMs > 0) { // 還沒有達到期望睡眠時間,繼續睡眠 JStormUtils.sleepMs(sleepMs); lastTime = System.currentTimeMillis(); } else { lastTime = now; } } } // 為 null 或者 0 都繼續執行 return false; }
整個執行緒模型的設計和實現比較簡單,但是卻很實用,推薦大家將其納入自己的工具箱。對於 storm 基礎執行緒模型的分析就到此結束,從下一篇開始我們將分三篇分別介紹 nimbus、supervisor,以及 worker 的啟動和執行機制。
(本篇完)
轉載宣告 : 版權所有,商業轉載請聯絡作者,非商業轉載請註明出處
ofollow,noindex">本部落格所有文章除特別宣告外,均採用 CC BY-NC-SA 4.0 許可協議