1. 程式人生 > >Elasticsearch原始碼分析 | 單節點的啟動和關閉

Elasticsearch原始碼分析 | 單節點的啟動和關閉

本文主要簡要介紹Elasticsearch單節點的啟動和關閉流程。Elasticsearch版本:6.3.2

相關文章

1、Google Guice 快速入門
2、Elasticsearch 中的 Guice
3、教你編譯除錯Elasticsearch 6.3.2原始碼
4、Elasticsearch 6.3.2 啟動過程

建立節點

Elasticsearch的啟動引導類為 Bootstrap 類,在建立節點 Node 物件之前,Bootstrap 會解析配置和進行一些安全檢查等

建立節點物件

environment 物件主要是解析出來的配置資訊

environment 物件

建立節點過程的主要工作是建立各個模組物件和服務物件,完成 Guice 依賴繫結

,獲取並初始化探測器。

ModulesBuilder 用於統一管理 Module

ModulesBuilder modules = new ModulesBuilder();
ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
modules.add(clusterModule);     // 將模組加入管理
//....
// 例項繫結
modules.add(b -> {
        b.bind(Node.class).toInstance(this);
        b.bind(NodeService.class).toInstance(nodeService);
        b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
        b.bind(PluginsService.class).toInstance(pluginsService);
        b.bind(Client.class).toInstance(client);
        b.bind(NodeClient.class).toInstance(client);
        b.bind(Environment.class).toInstance(this.environment);
        b.bind(ThreadPool.class).toInstance(threadPool);
        b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
        // ....
    }
);
injector = modules.createInjector();    // 生成注入器

主要的服務類簡介如下:

服務 簡介
ResourceWatcherService 通用資源監視服務
HttpServerTransport HTTP傳輸服務,提供Rest介面服務
SnapshotsService 快照服務
SnapshotShardsService 負責啟動和停止shard級快照
IndicesClusterStateService 根據收到的叢集狀態資訊,處理相關索引
Discovery 叢集拓撲管理
RoutingService 處理路由(節點之間遷移shard)
ClusterService 叢集管理服務,主要處理叢集任務,釋出叢集狀態
NodeConnectionsService 節點連線管理服務
MonitorService 提供程序級、系統級、檔案系統和JVM的監控服務
GatewayService 負責叢集元資料持久化與恢復
SearchService 處理搜尋請求
TransportService 底層傳輸服務
plugins 外掛
IndicesService 負責建立、刪除索引等索引操作

啟動節點

啟動節點的主要工作是啟動各個模組的服務物件,服務物件從注入器 injector 中取出來,然後呼叫它們的 start 方法,服務物件的 start 方法的工作基本是初始化內部資料、建立執行緒池、啟動執行緒池等,詳細的流程留到後面的文章中再介紹。

injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();

在啟動 Discovery 和 ClusterService 之前,還會呼叫 validateNodeBeforeAcceptingRequests 方法來檢測環境外部,外部環境主要是JVM、作業系統相關引數,將一些影響效能的配置標記為錯誤以引起使用者的重視。

環境檢測

節點的環境檢測程式碼都封裝在 BootstrapChecks 類中,BootstrapChecks 類通過責任鏈模式對十幾個檢測項進行檢測,關於責任鏈模式可以翻看這篇文章《設計模式之責任鏈模式及典型應用

這裡的責任鏈模式中的抽象處理者由 BootstrapCheck 介面扮演,它定義了一個處理方法 check,而每個檢查項則是具體處理者,都有對應的一個靜態類,具體的檢查則在 check 介面中完成

以第一個檢查項 “堆大小檢查” 為例,從 JvmInfo 類中獲取配置的堆的初始值和最大值進行比較,不相等則格式化提示資訊,最後返回檢查結果

    static class HeapSizeCheck implements BootstrapCheck {
        @Override
        public BootstrapCheckResult check(BootstrapContext context) {
            final long initialHeapSize = getInitialHeapSize();
            final long maxHeapSize = getMaxHeapSize();
            if (initialHeapSize != 0 && maxHeapSize != 0 && initialHeapSize != maxHeapSize) {
                final String message = String.format(Locale.ROOT,
                        "initial heap size [%d] not equal to maximum heap size [%d]; " +
                                "this can cause resize pauses and prevents mlockall from locking the entire heap",
                        getInitialHeapSize(), getMaxHeapSize());
                return BootstrapCheckResult.failure(message);
            } else {
                return BootstrapCheckResult.success();
            }
        }
        long getInitialHeapSize() { 
            return JvmInfo.jvmInfo().getConfiguredInitialHeapSize();
        }
        long getMaxHeapSize() {
            return JvmInfo.jvmInfo().getConfiguredMaxHeapSize();
        }
    }

把所有檢查項的物件新增到一個 List 鏈中

    static List<BootstrapCheck> checks() {
        final List<BootstrapCheck> checks = new ArrayList<>();
        checks.add(new HeapSizeCheck());
        final FileDescriptorCheck fileDescriptorCheck
            = Constants.MAC_OS_X ? new OsXFileDescriptorCheck() : new FileDescriptorCheck();
        checks.add(fileDescriptorCheck);
        checks.add(new MlockallCheck());
        if (Constants.LINUX) {
            checks.add(new MaxNumberOfThreadsCheck());
        }
        if (Constants.LINUX || Constants.MAC_OS_X) {
            checks.add(new MaxSizeVirtualMemoryCheck());
        }
        if (Constants.LINUX || Constants.MAC_OS_X) {
            checks.add(new MaxFileSizeCheck());
        }
        if (Constants.LINUX) {
            checks.add(new MaxMapCountCheck());
        }
        checks.add(new ClientJvmCheck());
        checks.add(new UseSerialGCCheck());
        checks.add(new SystemCallFilterCheck());
        checks.add(new OnErrorCheck());
        checks.add(new OnOutOfMemoryErrorCheck());
        checks.add(new EarlyAccessCheck());
        checks.add(new G1GCCheck());
        checks.add(new AllPermissionCheck());
        return Collections.unmodifiableList(checks);
    }

for 迴圈分別呼叫 check 方法進行檢查,有些檢查項檢查不通過是可以忽略的,如果有不能忽略的錯誤則會丟擲異常

for (final BootstrapCheck check : checks) {
    final BootstrapCheck.BootstrapCheckResult result = check.check(context);
    if (result.isFailure()) {
        if (!(enforceLimits || enforceBootstrapChecks) && !check.alwaysEnforce()) {
            ignoredErrors.add(result.getMessage());
        } else {
            errors.add(result.getMessage());
        }
    }
}

那麼檢查項有哪些呢?

  • 堆大小檢查:如果開啟了bootstrap.memory_lock,則JVM在啟動時將鎖定堆的初始大小,若配置的初始值與最大值不等,堆變化後無法保證堆都鎖定在記憶體中
  • 檔案描述符檢查:ES程序需要非常多的檔案描述符,所以須配置系統的檔案描述符的最大數量 ulimit -n 65535
  • 記憶體鎖定檢查:ES允許程序只使用實體記憶體,若使用交換分割槽可能會帶來很多問題,所以最好讓ES鎖定記憶體
  • 最大執行緒數檢查:ES程序會建立很多執行緒,這個數最少需2048
  • 最大虛擬記憶體檢查
  • 最大檔案大小檢查:段檔案和事務日誌檔案可能會非常大,建議這個數設定為無限
  • 虛擬記憶體區域最大數量檢查
  • JVM Client模式檢查
  • 序列收集檢查:ES預設使用 CMS 垃圾回收器,而不是 Serial 收集器
  • 系統呼叫過濾器檢查
  • OnError與OnOutOfMemoryError檢查
  • Early-access檢查:ES最好執行在JVM的穩定版本上
  • G1GC檢查

順便一提,JvmInfo 則是利用了 JavaSDK 自帶的 ManagementFactory 類來獲取JVM資訊的,獲取的 JVM 屬性如下所示

long pid;   // 程序ID
String version; // Java版本
String vmName;  // JVM名稱
String vmVersion;   // JVM版本
String vmVendor;    // JVM開發商
long startTime;     // 啟動時間
long configuredInitialHeapSize; // 配置的堆的初始值
long configuredMaxHeapSize;     // 配置的堆的最大值
Mem mem;            // 記憶體資訊
String[] inputArguments;    // JVM啟動時輸入的引數
String bootClassPath;
String classPath;   
Map<String, String> systemProperties;   // 系統環境變數
String[] gcCollectors;
String[] memoryPools;
String onError;
String onOutOfMemoryError;
String useCompressedOops;
String useG1GC;     // 是否使用 G1 垃圾回收器
String useSerialGC; // 是否使用 Serial 垃圾回收器

keepAlive 執行緒

在啟動引導類 Bootstrap 的 start 方法中,啟動節點之後還會啟動一個 keepAlive 執行緒

private void start() throws NodeValidationException {
    node.start();
    keepAliveThread.start();
}

// CountDownLatch 初始值為 1
private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
Bootstrap() {
    keepAliveThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                keepAliveLatch.await(); // 一直等待直到 CountDownLatch 減為 0
            } catch (InterruptedException e) {
                // bail out
            }
        }
    }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
    keepAliveThread.setDaemon(false);   // false 使用者執行緒
    // keep this thread alive (non daemon thread) until we shutdown
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            // 當程序收到關閉 SIGTERM 或 SIGINT 訊號時,CountDownLatch 減1 
            keepAliveLatch.countDown();
        }
    });
}

if (addShutdownHook) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            try {
                IOUtils.close(node, spawner);
                LoggerContext context = (LoggerContext) LogManager.getContext(false);
                Configurator.shutdown(context);
            } catch (IOException ex) {
                throw new ElasticsearchException("failed to stop node", ex);
            }
        }
    });
}

keepAliveThread 執行緒本身不做具體的工作。主執行緒執行完啟動流程後會退出,keepAliveThread 執行緒是唯一的使用者執行緒,作用是保持程序執行。在Java程式中,一個程序至少需要有一個使用者執行緒,當用戶執行緒為零時將退出程序。

做個試驗,將 keepAliveThread.setDaemon(false); 中的 false 改為 true,會發現Elasticsearch啟動後馬上就停止了

[2019-01-08T01:28:47,522][INFO ][o.e.n.Node               ] [1yGidog] started
[2019-01-08T01:28:47,525][INFO ][o.e.n.Node               ] [1yGidog] stopping ...

關閉節點

關閉的順序大致為:

  • 關閉快照和HTTPServer,不再響應使用者REST請求
  • 關閉叢集拓撲管理,不再響應ping請求
  • 關閉網路模組,讓節點離線
  • 執行各個外掛的關閉流程
  • 關閉IndicesService,這期間需要等待釋放的資源最多,時間最長
public static void close(final Exception ex, final Iterable<? extends Closeable> objects) throws IOException {
    Exception firstException = ex;
    for (final Closeable object : objects) {
        try {
            if (object != null) {
                object.close();
            }
        } catch (final IOException | RuntimeException e) {
            if (firstException == null) {
                firstException = e;
            } else {
                firstException.addSuppressed(e);
            }
        }
    }
    // ...
}

private Node stop() {
    if (!lifecycle.moveToStopped()) {
        return this;
    }
    Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
    logger.info("stopping ...");

    injector.getInstance(ResourceWatcherService.class).stop();
    if (NetworkModule.HTTP_ENABLED.get(settings)) {
        injector.getInstance(HttpServerTransport.class).stop();
    }

    injector.getInstance(SnapshotsService.class).stop();
    injector.getInstance(SnapshotShardsService.class).stop();
    // stop any changes happening as a result of cluster state changes
    injector.getInstance(IndicesClusterStateService.class).stop();
    // close discovery early to not react to pings anymore.
    // This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
    injector.getInstance(Discovery.class).stop();
    // we close indices first, so operations won't be allowed on it
    injector.getInstance(RoutingService.class).stop();
    injector.getInstance(ClusterService.class).stop();
    injector.getInstance(NodeConnectionsService.class).stop();
    nodeService.getMonitorService().stop();
    injector.getInstance(GatewayService.class).stop();
    injector.getInstance(SearchService.class).stop();
    injector.getInstance(TransportService.class).stop();

    pluginLifecycleComponents.forEach(LifecycleComponent::stop);
    // we should stop this last since it waits for resources to get released
    // if we had scroll searchers etc or recovery going on we wait for to finish.
    injector.getInstance(IndicesService.class).stop();
    logger.info("stopped");

    return this;
}

節點的關閉當然沒那麼簡單。更多細節敬請期待。

參考:
張超.Elasticsearch原始碼解析與優化實戰

後記

歡迎評論、轉發、分享,您的支援是我最大的動力

更多內容可訪問我的個人部落格:http://laijianfeng.org

關注【小旋鋒】微信公眾號,及時接收博文推送

關注_小旋鋒_微信公眾號