1. 程式人生 > >elasticsearch原始碼分析---threadpool模組

elasticsearch原始碼分析---threadpool模組

elasticsearch的執行緒池實現在org.elasticsearch.threadpool下。

初始化過程中會載入以threadpool開頭的配置項的配置資訊,然後確定各個執行緒池的大小,預設情況下,會參照處理器個數進行設定:

        int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
        int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
        int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);

其中像INDEX/BULK/GET/SUGGEST/PERCOLATOR等這些執行緒池的大小都設定成了availableProcessor

SEARCH執行緒池大小設定為availableProcessor*3

而FLUSH/SNAPSHOT/OPTIMIZE等這些執行緒池大小設定成halfProcMaxAt5

REFRESH執行緒池則設定成halpProcMaxAt10.

elasticsearch對threadpool的設定思路是相當合理的,一個核對應一個執行緒,減少context switch。而且,物件FLUSH等這些執行緒池的大小也做了限制。

根據這些設定生成executor了。

執行緒池分為cache fix scaling三類,具體設定不再解釋,可以瞭解下java的執行緒池相關知識。

除了es固定的執行緒池之外,還可以自定義執行緒池。

當然,以上預設值都可以在設定中進行更改,build方法的介面就說明了這一點:

 private ExecutorHolder build(String name, @Nullable Settings settings, Settings defaultSettings) {
        return rebuild(name, null, settings, defaultSettings);
    }
其中defaultSetting是上邊執行緒池的預設設定,而settinng則是從配置檔案中載入的資訊,最終會以setting的設定為準。

build會呼叫rebuild函式去建立ExecutorHolder,其中SAME的thread是不能更改的,然後依次對cache fix scaling進行對應的設定。

從rebuild的函式介面就可以看出,rebuild不僅僅用於建立(第二個引數設定為null),還用於更新執行緒池的資訊。

stat會得到當前各個執行緒池的統計資訊。

除了以上固有執行緒,還註冊了一個定期執行器:

 this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
可以通過以下介面在其他模組註冊執行的物件:
 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
        return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
    }
比如在SearchService模組中,有如下注冊程式碼:
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);

總體來看,elasticsearch提供的執行緒池模組是對java threadpool的進一步封裝,如果對java的threadpool比較熟悉的話,這個模組還是比較容易理解。