
ES Series (1): Build Preparation and a Walkthrough of Server Startup

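  As a powerful and popular search engine component, ES gives us convenient, high-performance search. It is pleasant to use in practice, but it would be even better to understand it a little more deeply. Plenty of articles online already explain how ES achieves high performance and high availability, and there are many guides to its pitfalls. Those, however, remain largely descriptive.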

  So let's take the source code as our starting point and see what ES really does, even though our view may turn out to be partial.

 

1. Building ES and preparation

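  Honestly, we would rather not build it ourselves. First, there is little to gain from it. Second, ES is written in Java, so the packaged product is essentially a set of jar files. Third, the build has to pull many external jar dependencies, and those are hosted on overseas sites that can be extremely slow to reach.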

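  The simple route is to download an installation package from the ES website: https://www.elastic.co/cn/downloads/elasticsearch . That is the usual path for real-world use, but it is not the path we take for learning.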

  If you do want to build it yourself, that works too. Get the source: https://github.com/elastic/elasticsearch.git ; download Gradle: https://gradle.org/releases/ ; install JDK 11+...

  To set things up, switch to the elasticsearch source directory and run: gradlew idea ; that's it.

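  Of course, you will almost certainly hit problems along the way, mostly network problems, because the build depends on many resources hosted on overseas sites. You can change that: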

# Find every "repositories {" block and point it at the aliyun mirrors, like this:
repositories {
//  jcenter()
  maven { url 'https://maven.aliyun.com/repository/gradle-plugin' }
  maven { url 'https://maven.aliyun.com/repository/google' }
  maven { url 'https://maven.aliyun.com/nexus/content/groups/public/' }
  maven { url 'https://maven.aliyun.com/repository/jcenter'}
}

  Run gradlew idea again and it should work. If it still fails, switch to a faster network and retry until it succeeds.

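  We can also import the source directly into IDEA and let the IDE do the build. The dependency URLs need to be replaced in the same way. Once the import finishes, the whole Gradle project tree is visible in the IDE.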

 

2. ES server startup flow

 

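  With the environment ready, we can get to the main topic. In principle an application's startup flow is never very complicated, so let's take a rough look.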

 

2.1. The role of the Elasticsearch entry class

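  The entry class is Elasticsearch, so it tells us how startup begins, whether the class does all the work itself, and whom it hands the rest to. We therefore go straight to the entry point: Elasticsearch#main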

    // org.elasticsearch.bootstrap.Elasticsearch#main
    /**
     * Main entry point for starting elasticsearch
     */
    public static void main(final String[] args) throws Exception {
        overrideDnsCachePolicyProperties();
        /*
         * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
         * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This
         * forces such policies to take effect immediately.
         */
        System.setSecurityManager(new SecurityManager() {

            @Override
            public void checkPermission(Permission perm) {
                // grant all permissions so that we can later set the security manager to the one that we want
            }

        });
        LogConfigurator.registerErrorListener();
        final Elasticsearch elasticsearch = new Elasticsearch();
        // Looks like this hands off to another main() method
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        // A status other than OK means something went wrong, so the JVM exits. Otherwise the ES process keeps running
        if (status != ExitCodes.OK) {
            final String basePath = System.getProperty("es.logs.base_path");
            // It's possible to fail before logging has been configured, in which case there's no point
            // suggesting that the user look in the log file.
            if (basePath != null) {
                Terminal.DEFAULT.errorPrintln(
                    "ERROR: Elasticsearch did not exit normally - check the logs at "
                        + basePath
                        + System.getProperty("file.separator")
                        + System.getProperty("es.logs.cluster_name") + ".log"
                );
            }
            exit(status);
        }
    }

    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
        return elasticsearch.main(args, terminal);
    }

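  The entry point is fairly simple, and at first glance it reveals little. Roughly, it instantiates an Elasticsearch object and then calls its main() method. What does that buy us? It lets the code use the instance fields and methods defined on Elasticsearch, rather than only static methods and fields.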

  Let's first look at the class hierarchy of Elasticsearch: Elasticsearch extends EnvironmentAwareCommand extends Command implements Closeable

  The Elasticsearch constructor also does some work: it registers the parsing rules for a few command-line options:

    // visible for testing
    Elasticsearch() {
        // beforeMain is an empty (no-op) callback
        super("Starts Elasticsearch", () -> {}); // we configure logging later so we override the base class from configuring logging
        versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
            "Prints Elasticsearch version information and exits");
        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
            "Turns off standard output/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }

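  elasticsearch.main() in turn calls the generic method defined in Command, whose main purpose is to run the command through a common template method. The Elasticsearch class as a whole can be seen as a facade for startup: it does a lot of preparation and validation, such as building the configuration context and validating the command-line arguments. Following its execution shows us roughly which arguments matter and which errors a user may run into.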

    // org.elasticsearch.cli.Command#main
    /** Parses options for this command from args and executes it. */
    public final int main(String[] args, Terminal terminal) throws Exception {
        if (addShutdownHook()) {
            // Register a shutdown hook that closes resources to avoid data corruption or loss; in this startup path it is effectively a no-op
            shutdownHookThread = new Thread(() -> {
                try {
                    this.close();
                } catch (final IOException e) {
                    try (
                        StringWriter sw = new StringWriter();
                        PrintWriter pw = new PrintWriter(sw)) {
                        e.printStackTrace(pw);
                        terminal.errorPrintln(sw.toString());
                    } catch (final IOException impossible) {
                        // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
                        // say that an exception here is impossible
                        throw new AssertionError(impossible);
                    }
                }
            });
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);
        }
        // A no-op here, since beforeMain is empty
        beforeMain.run();

        try {
            // Also implemented inside Command itself
            mainWithoutErrorHandling(args, terminal);
        } catch (OptionException e) {
            // print help to stderr on exceptions
            printHelp(terminal, true);
            terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            return ExitCodes.USAGE;
        } catch (UserException e) {
            if (e.exitCode == ExitCodes.USAGE) {
                printHelp(terminal, true);
            }
            if (e.getMessage() != null) {
                terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            }
            // Return the error exit code
            return e.exitCode;
        }
        // The normal path returns OK
        return ExitCodes.OK;
    }

    /**
     * Executes the command, but all errors are thrown.
     */
    void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
        // Parse the command-line arguments
        final OptionSet options = parser.parse(args);
        // -h prints the help text
        if (options.has(helpOption)) {
            printHelp(terminal, false);
            return;
        }

        if (options.has(silentOption)) {
            terminal.setVerbosity(Terminal.Verbosity.SILENT);
        } else if (options.has(verboseOption)) {
            terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
        } else {
            terminal.setVerbosity(Terminal.Verbosity.NORMAL);
        }
        // Call back toward Elasticsearch, going through EnvironmentAwareCommand first
        execute(terminal, options);
    }
    
    // org.elasticsearch.cli.EnvironmentAwareCommand#execute(org.elasticsearch.cli.Terminal, joptsimple.OptionSet)
    @Override
    protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        for (final KeyValuePair kvp : settingOption.values(options)) {
            // Empty values are not allowed
            if (kvp.value.isEmpty()) {
                throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
            }
            // Duplicate settings are not allowed
            if (settings.containsKey(kvp.key)) {
                final String message = String.format(
                        Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]",
                        kvp.key,
                        settings.get(kvp.key),
                        kvp.value);
                throw new UserException(ExitCodes.USAGE, message);
            }
            settings.put(kvp.key, kvp.value);
        }

        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        // Call back into the execute() implementation in Elasticsearch
        execute(terminal, options, createEnv(settings));
    }

    // org.elasticsearch.bootstrap.Elasticsearch#execute
    @Override
    protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        if (options.nonOptionArguments().isEmpty() == false) {
            throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
        }
        // Print the version
        if (options.has(versionOption)) {
            final String versionOutput = String.format(
                Locale.ROOT,
                "Version: %s, Build: %s/%s/%s/%s, JVM: %s",
                Build.CURRENT.getQualifiedVersion(),
                Build.CURRENT.flavor().displayName(),
                Build.CURRENT.type().displayName(),
                Build.CURRENT.hash(),
                Build.CURRENT.date(),
                JvmInfo.jvmInfo().version()
            );
            terminal.println(versionOutput);
            return;
        }

        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);

        // a misconfigured java.io.tmpdir can cause hard-to-diagnose problems later, so reject it immediately
        try {
            env.validateTmpFile();
        } catch (IOException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }

        try {
            // Initialize
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }
    // Elasticsearch.init
    void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
        throws NodeValidationException, UserException {
        try {
            Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
        } catch (BootstrapException | RuntimeException e) {
            // format exceptions to the console in a special way
            // to avoid 2MB stacktraces from guice, etc.
            throw new StartupException(e);
        }
    }

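  That is the whole job of the Elasticsearch class. Three key points: 1. build the configuration environment context; 2. validate the command-line arguments passed in; 3. hand the startup request over to the Bootstrap class.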
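
  The call chain above (Command#main, then EnvironmentAwareCommand#execute, then Elasticsearch#execute) follows the template-method pattern. The sketch below is only a minimal illustration of that shape, not the ES implementation: the class names SketchCommand and SketchServerCommand, the run() method, and the exit-code constants are invented for this example, and it assumes settings arrive only as -Ekey=value arguments.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical concrete command: plays the role of the Elasticsearch class. */
final class SketchServerCommand extends SketchCommand {
    Map<String, String> started; // settings seen by execute(), kept for inspection

    @Override
    void execute(Map<String, String> settings) {
        // The real class would validate options and call Bootstrap.init() here.
        started = settings;
    }

    public static void main(String[] args) {
        SketchServerCommand command = new SketchServerCommand();
        int status = command.run(new String[] {"-Ecluster.name=demo"});
        System.out.println("status=" + status + " settings=" + command.started);
    }
}

/** Hypothetical base class: plays the role of Command plus EnvironmentAwareCommand. */
abstract class SketchCommand {
    static final int OK = 0;     // exit codes chosen for this sketch
    static final int USAGE = 64;

    /** Template method: fixed parsing and error handling around execute(). */
    final int run(String[] args) {
        try {
            execute(parseSettings(args));
        } catch (IllegalArgumentException e) {
            System.err.println("ERROR: " + e.getMessage());
            return USAGE;
        }
        return OK;
    }

    /** Collects -Ekey=value arguments, rejecting empty values and duplicates. */
    static Map<String, String> parseSettings(String[] args) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (!arg.startsWith("-E") || eq < 0) {
                throw new IllegalArgumentException("unexpected argument [" + arg + "]");
            }
            String key = arg.substring(2, eq);
            String value = arg.substring(eq + 1);
            if (value.isEmpty()) {
                throw new IllegalArgumentException("setting [" + key + "] must not be empty");
            }
            if (settings.containsKey(key)) {
                throw new IllegalArgumentException("setting [" + key + "] already set");
            }
            settings.put(key, value);
        }
        return settings;
    }

    /** The only step a subclass has to supply. */
    abstract void execute(Map<String, String> settings);
}
```

  The point of the design is that every command shares the same parsing and error-to-exit-code mapping, while subclasses only supply execute().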

 

2.2. The Bootstrap startup flow

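  Judging by its name, Bootstrap looks like the class that does the real startup work. Having received the configuration context from the Elasticsearch class, how does Bootstrap carry on? Let's take a look.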

    // The static method init() is the way in
    // org.elasticsearch.bootstrap.Bootstrap#init
    /**
     * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
     */
    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
        // force the class initializer for BootstrapInfo to run before
        // the security manager is installed
        BootstrapInfo.init();

        INSTANCE = new Bootstrap();

        final SecureSettings keystore = loadSecureSettings(initialEnv);
        final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

        // the LogConfigurator will replace System.out and System.err with redirects to our logfile, so we need to capture
        // the stream objects before calling LogConfigurator to be able to close them when appropriate
        final Runnable sysOutCloser = getSysOutCloser();
        final Runnable sysErrorCloser = getSysErrorCloser();

        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
        try {
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }


        try {
            final boolean closeStandardStreams = (foreground == false) || quiet;
            if (closeStandardStreams) {
                final Logger rootLogger = LogManager.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                sysOutCloser.run();
            }

            // fail if somebody replaced the lucene jars
            checkLucene();

            // install the default uncaught exception handler; must be done before security is
            // initialized as we do not want to grant the runtime permission
            // setDefaultUncaughtExceptionHandler
            Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
            // Check and prepare the environment
            INSTANCE.setup(true, environment);

            try {
                // any secure settings must be read during node construction
                IOUtils.close(keystore);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
            // Start the service
            INSTANCE.start();

            // We don't close stderr if `--quiet` is passed, because that
            // hides fatal startup errors. For example, if Elasticsearch is
            // running via systemd, the init script only specifies
            // `--quiet`, not `-d`, so we want users to be able to see
            // startup errors via journalctl.
            if (foreground == false) {
                sysErrorCloser.run();
            }

        } catch (NodeValidationException | RuntimeException e) {
            // disable console logging, so user does not see the exception twice (jvm will show it already)
            final Logger rootLogger = LogManager.getRootLogger();
            final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
            if (foreground && maybeConsoleAppender != null) {
                Loggers.removeAppender(rootLogger, maybeConsoleAppender);
            }
            Logger logger = LogManager.getLogger(Bootstrap.class);
            // HACK, it sucks to do this, but we will run users out of disk space otherwise
            if (e instanceof CreationException) {
                // guice: log the shortened exc to the log file
                ByteArrayOutputStream os = new ByteArrayOutputStream();
                PrintStream ps = null;
                try {
                    ps = new PrintStream(os, false, "UTF-8");
                } catch (UnsupportedEncodingException uee) {
                    assert false;
                    e.addSuppressed(uee);
                }
                new StartupException(e).printStackTrace(ps);
                ps.flush();
                try {
                    logger.error("Guice Exception: {}", os.toString("UTF-8"));
                } catch (UnsupportedEncodingException uee) {
                    assert false;
                    e.addSuppressed(uee);
                }
            } else if (e instanceof NodeValidationException) {
                logger.error("node validation exception\n{}", e.getMessage());
            } else {
                // full exception
                logger.error("Exception", e);
            }
            // re-enable it if appropriate, so they can see any logging during the shutdown process
            if (foreground && maybeConsoleAppender != null) {
                Loggers.addAppender(rootLogger, maybeConsoleAppender);
            }

            throw e;
        }
    }

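  That is the startup skeleton of Bootstrap. It breaks down into the following steps:

    1. Instantiate the Bootstrap class into INSTANCE;
    2. Load secure settings such as passwords;
    3. Re-create its own environment context, mainly to merge in further configuration such as those secure settings;
    4. Configure logging;
    5. Create the pid file;
    6. Check the Lucene version to avoid failures caused by a replaced jar;
    7. Bootstrap performs its setup work;
    8. Bootstrap performs the start;
    9. Startup is complete;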
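
  One detail in Bootstrap#init that is easy to miss is when the standard streams get closed. The sketch below restates only that decision as two pure functions; the class and method names are made up for illustration, and foreground is assumed to be the negation of the daemonize option, as in Elasticsearch#init.

```java
/** Hypothetical helper that mirrors the stream-closing conditions in Bootstrap#init. */
final class StreamClosingSketch {

    /** stdout is closed when running in the background or when quiet is requested. */
    static boolean closeStdout(boolean foreground, boolean quiet) {
        return !foreground || quiet;
    }

    /** stderr is closed only in the background, so quiet alone keeps fatal errors visible. */
    static boolean closeStderr(boolean foreground) {
        return !foreground;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean daemonize : flags) {
            for (boolean quiet : flags) {
                boolean foreground = !daemonize;
                System.out.printf("daemonize=%b quiet=%b -> close stdout=%b, close stderr=%b%n",
                    daemonize, quiet, closeStdout(foreground, quiet), closeStderr(foreground));
            }
        }
    }
}
```

  This matches the comment in the listing: with only the quiet option set, stderr stays open so that startup failures remain visible.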

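  The skeleton is clear enough, yet it leaves you wanting more. Naturally: a skeleton only gives the general idea and does not connect all the dots for you. Setting aside the checks, the core consists of the setup work and the start work. Let's go through them in detail.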

 

2.3. Bootstrap setup in detail

  The first of the two key steps named above is Bootstrap's setup. What exactly needs to be prepared?

    // org.elasticsearch.bootstrap.Bootstrap#setup
    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
        Settings settings = environment.settings();

        try {
            // Spawn the native controllers of the external modules
            spawner.spawnNativeControllers(environment, true);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        // Initialize native resources
        initializeNatives(
                environment.tmpFile(),
                BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
                BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
                BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

        // initialize probes before the security manager is installed
        // Initialize the required probes so that their instances are loaded and usable
        /**
        static void initializeProbes() {
            // Force probes to be loaded
            ProcessProbe.getInstance();
            OsProbe.getInstance();
            JvmInfo.jvmInfo();
        }
        */
        initializeProbes();
        // Shutdown hook
        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        IOUtils.close(node, spawner);
                        LoggerContext context = (LoggerContext) LogManager.getContext(false);
                        Configurator.shutdown(context);
                        if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
                            throw new IllegalStateException("Node didn't stop within 10 seconds. " +
                                    "Any outstanding requests or tasks might get killed.");
                        }
                    } catch (IOException ex) {
                        throw new ElasticsearchException("failed to stop node", ex);
                    } catch (InterruptedException e) {
                        LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        try {
            // look for jar hell
            // Check for duplicate classes
            final Logger logger = LogManager.getLogger(JarHell.class);
            JarHell.checkJarHell(logger::debug);
        } catch (IOException | URISyntaxException e) {
            throw new BootstrapException(e);
        }

        // Log ifconfig output before SecurityManager is installed
        // Log the ifconfig output
        IfConfig.logIfNecessary();

        // install SM after natives, shutdown hooks, etc.
        try {
            // Configure the SecurityManager
            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new BootstrapException(e);
        }
        // Finally, instantiate the node
        node = new Node(environment) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
                // Delegate the node checks to BootstrapChecks
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
    }
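
  The "jar hell" check in setup() guards against the same class appearing in more than one jar. The following is a minimal sketch of that idea only, not the JarHell implementation: it assumes the class names of each jar have already been listed, and the class name DuplicateClassSketch and its check() method are invented for this example.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical duplicate-class detector working on pre-listed jar contents. */
final class DuplicateClassSketch {

    /** Throws if any class name is provided by more than one entry of the map. */
    static void check(Map<String, List<String>> classesByJar) {
        Map<String, String> owner = new HashMap<>(); // class name -> jar that provided it first
        for (Map.Entry<String, List<String>> jar : classesByJar.entrySet()) {
            for (String className : jar.getValue()) {
                String previous = owner.put(className, jar.getKey());
                if (previous != null) {
                    throw new IllegalStateException(
                        "duplicate class [" + className + "] in [" + previous + "] and [" + jar.getKey() + "]");
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> classpath = new LinkedHashMap<>();
        classpath.put("a.jar", List.of("org.demo.Foo", "org.demo.Bar"));
        classpath.put("b.jar", List.of("org.demo.Foo"));
        try {
            check(classpath);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```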

    // org.elasticsearch.bootstrap.Spawner#spawnNativeControllers
    /**
     * Spawns the native controllers for each module.
     *
     * @param environment The node environment
     * @param inheritIo   Should the stdout and stderr of the spawned process inherit the
     *                    stdout and stderr of the JVM spawning it?
     * @throws IOException if an I/O error occurs reading the module or spawning a native process
     */
    void spawnNativeControllers(final Environment environment, final boolean inheritIo) throws IOException {
        if (spawned.compareAndSet(false, true) == false) {
            throw new IllegalStateException("native controllers already spawned");
        }
        if (Files.exists(environment.modulesFile()) == false) {
            throw new IllegalStateException("modules directory [" + environment.modulesFile() + "] not found");
        }
        /*
         * For each module, attempt to spawn the controller daemon. Silently ignore any module that doesn't include a controller for the
         * correct platform.
         */
        // List the plugin directories under the modules directory
        List<Path> paths = PluginsService.findPluginDirs(environment.modulesFile());
        for (final Path modules : paths) {
            // Read the plugin-descriptor.properties file
            final PluginInfo info = PluginInfo.readFromProperties(modules);
            final Path spawnPath = Platforms.nativeControllerPath(modules);
            if (Files.isRegularFile(spawnPath) == false) {
                continue;
            }
            if (info.hasNativeController() == false) {
                final String message = String.format(
                    Locale.ROOT,
                    "module [%s] does not have permission to fork native controller",
                    modules.getFileName());
                throw new IllegalArgumentException(message);
            }
            // Start the plugin's controller process
            final Process process = spawnNativeController(spawnPath, environment.tmpFile(), inheritIo);
            processes.add(process);
        }
    }
    // org.elasticsearch.bootstrap.Spawner#spawnNativeController
    /**
     * Attempt to spawn the controller daemon for a given module. The spawned process will remain connected to this JVM via its stdin,
     * stdout, and stderr streams, but the references to these streams are not available to code outside this package.
     */
    private Process spawnNativeController(final Path spawnPath, final Path tmpPath, final boolean inheritIo) throws IOException {
        final String command;
        if (Constants.WINDOWS) {
            /*
             * We have to get the short path name or starting the process could fail due to max path limitations. The underlying issue here
             * is that starting the process on Windows ultimately involves the use of CreateProcessW. CreateProcessW has a limitation that
             * if its first argument (the application name) is null, then its second argument (the command line for the process to start) is
             * restricted in length to 260 characters (cf. https://msdn.microsoft.com/en-us/library/windows/desktop/ms682425.aspx). Since
             * this is exactly how the JDK starts the process on Windows (cf.
             * http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/windows/native/java/lang/ProcessImpl_md.c#l319), this
             * limitation is in force. As such, we use the short name to avoid any such problems.
             */
            command = Natives.getShortPathName(spawnPath.toString());
        } else {
            command = spawnPath.toString();
        }
        final ProcessBuilder pb = new ProcessBuilder(command);

        // the only environment variable passes on the path to the temporary directory
        pb.environment().clear();
        pb.environment().put("TMPDIR", tmpPath.toString());

        // The process _shouldn't_ write any output via its stdout or stderr, but if it does then
        // it will block if nothing is reading that output. To avoid this we can inherit the
        // JVM's stdout and stderr (which are redirected to files in standard installations).
        if (inheritIo) {
            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
        }

        // the output stream of the process object corresponds to the daemon's stdin
        return pb.start();
    }
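
  The way spawnNativeController prepares the child process can be tried in isolation. This is a minimal sketch assuming made-up paths; ControllerLaunchSketch and prepare() are hypothetical names, and the process is only configured here, never started.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper that configures a ProcessBuilder the way the listing above does. */
final class ControllerLaunchSketch {

    static ProcessBuilder prepare(Path controller, Path tmpDir, boolean inheritIo) {
        ProcessBuilder pb = new ProcessBuilder(controller.toString());

        // Start from an empty environment and pass on only the temporary directory.
        pb.environment().clear();
        pb.environment().put("TMPDIR", tmpDir.toString());

        // Inherit stdout/stderr so the child cannot block on an unread pipe.
        if (inheritIo) {
            pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
        }
        return pb;
    }

    public static void main(String[] args) {
        // Placeholder paths for illustration only.
        ProcessBuilder pb = prepare(Paths.get("controller"), Paths.get("tmp"), true);
        System.out.println("command=" + pb.command()
            + " env=" + pb.environment()
            + " stdout=" + pb.redirectOutput().type());
    }
}
```

  Clearing the environment first means the child sees nothing from the parent JVM except the one variable that is explicitly passed on.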

  Below are the details of reading the descriptor file. Interested readers can dig in; it mostly comes down to which properties get parsed.

    // 1. Parse the plugin-descriptor.properties file
    // org.elasticsearch.plugins.PluginInfo#readFromProperties
    /**
     * Reads the plugin descriptor file.
     *
     * @param path           the path to the root directory for the plugin
     * @return the plugin info
     * @throws IOException if an I/O exception occurred reading the plugin descriptor
     */
    public static PluginInfo readFromProperties(final Path path) throws IOException {
        final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);

        final Map<String, String> propsMap;
        {
            final Properties props = new Properties();
            try (InputStream stream = Files.newInputStream(descriptor)) {
                props.load(stream);
            }
            propsMap = props.stringPropertyNames().stream().collect(Collectors.toMap(Function.identity(), props::getProperty));
        }

        final String name = propsMap.remove("name");
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException(
                    "property [name] is missing in [" + descriptor + "]");
        }
        final String description = propsMap.remove("description");
        if (description == null) {
            throw new IllegalArgumentException(
                    "property [description] is missing for plugin [" + name + "]");
        }
        final String version = propsMap.remove("version");
        if (version == null) {
            throw new IllegalArgumentException(
                    "property [version] is missing for plugin [" + name + "]");
        }

        final String esVersionString = propsMap.remove("elasticsearch.version");
        if (esVersionString == null) {
            throw new IllegalArgumentException(
                    "property [elasticsearch.version] is missing for plugin [" + name + "]");
        }
        final Version esVersion = Version.fromString(esVersionString);
        final String javaVersionString = propsMap.remove("java.version");
        if (javaVersionString == null) {
            throw new IllegalArgumentException(
                    "property [java.version] is missing for plugin [" + name + "]");
        }
        JarHell.checkVersionFormat(javaVersionString);

        final String extendedString = propsMap.remove("extended.plugins");
        final List<String> extendedPlugins;
        if (extendedString == null) {
            extendedPlugins = Collections.emptyList();
        } else {
            extendedPlugins = Arrays.asList(Strings.delimitedListToStringArray(extendedString, ","));
        }

        final boolean hasNativeController = parseBooleanValue(name, "has.native.controller", propsMap.remove("has.native.controller"));

        final PluginType type = getPluginType(name, propsMap.remove("type"));

        final String classname = getClassname(name, type, propsMap.remove("classname"));

        final String javaOpts = propsMap.remove("java.opts");

        if (type != PluginType.BOOTSTRAP && Strings.isNullOrEmpty(javaOpts) == false) {
            throw new IllegalArgumentException(
                "[java.opts] can only have a value when [type] is set to [bootstrap] for plugin [" + name + "]"
            );
        }

        boolean isLicensed = parseBooleanValue(name, "licensed", propsMap.remove("licensed"));

        if (propsMap.isEmpty() == false) {
            throw new IllegalArgumentException("Unknown properties for plugin [" + name + "] in plugin descriptor: " + propsMap.keySet());
        }

        return new PluginInfo(name, description, version, esVersion, javaVersionString,
                              classname, extendedPlugins, hasNativeController, type, javaOpts, isLicensed);
    }
    // 2. Resolve the controller path
    /**
     * The path to the native controller for a plugin with native components.
     */
    public static Path nativeControllerPath(Path plugin) {
        if (Constants.MAC_OS_X) {
            return plugin
                .resolve("platform")
                .resolve(PLATFORM_NAME)
                .resolve(PROGRAM_NAME + ".app")
                .resolve("Contents")
                .resolve("MacOS")
                .resolve(PROGRAM_NAME);
        }
        // The file differs by platform, e.g. controller.exe on Windows and controller elsewhere
        return plugin
                .resolve("platform")
                .resolve(PLATFORM_NAME)
                .resolve("bin")
                .resolve(PROGRAM_NAME);
    }
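
  The descriptor parsing above uses a "remove each known key, then reject whatever is left" idiom. Here is a minimal sketch of it under simplifying assumptions: only three required keys plus has.native.controller are handled, the boolean is parsed leniently, and DescriptorSketch with its parse() method is a made-up name rather than part of ES.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical, heavily reduced version of the descriptor validation. */
final class DescriptorSketch {

    static Map<String, String> parse(String descriptorText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> remaining = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            remaining.put(key, props.getProperty(key));
        }

        Map<String, String> parsed = new TreeMap<>();
        // Required keys: remove each one so that only unknown keys are left at the end.
        for (String required : new String[] {"name", "description", "version"}) {
            String value = remaining.remove(required);
            if (value == null) {
                throw new IllegalArgumentException("property [" + required + "] is missing");
            }
            parsed.put(required, value);
        }
        // Optional key; a missing value is treated as false in this sketch.
        String nativeController = remaining.remove("has.native.controller");
        parsed.put("has.native.controller", String.valueOf(Boolean.parseBoolean(nativeController)));

        if (!remaining.isEmpty()) {
            throw new IllegalArgumentException("Unknown properties in plugin descriptor: " + remaining.keySet());
        }
        return parsed;
    }

    public static void main(String[] args) {
        String text = "name=demo\ndescription=a demo plugin\nversion=1.0\nhas.native.controller=true\n";
        System.out.println(parse(text));
    }
}
```

  Rejecting leftover keys turns a typo in a descriptor into an immediate error instead of a silently ignored setting.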

  Next comes the initialization of native resources:

    // org.elasticsearch.bootstrap.Bootstrap#initializeNatives
    /** initialize native resources */
    public static void initializeNatives(Path tmpFile, boolean mlockAll, boolean systemCallFilter, boolean ctrlHandler) {
        final Logger logger = LogManager.getLogger(Bootstrap.class);

        // check if the user is running as root, and bail
        // Check whether we are running as root: ignored on Windows; on Linux decided via the native call JNACLibrary.geteuid() == 0
        if (Natives.definitelyRunningAsRoot()) {
            throw new RuntimeException("can not run elasticsearch as root");
        }

        // enable system call filter
        // e.g. on Linux this ends up in linuxImpl()
        if (systemCallFilter) {
            Natives.tryInstallSystemCallFilter(tmpFile);
        }

        // mlockall if requested
        if (mlockAll) {
            if (Constants.WINDOWS) {
               Natives.tryVirtualLock();
            } else {
               Natives.tryMlockall();
            }
        }

        // listener for windows close event
        if (ctrlHandler) {
            Natives.addConsoleCtrlHandler(new ConsoleCtrlHandler() {
                @Override
                public boolean handle(int code) {
                    if (CTRL_CLOSE_EVENT == code) {
                        logger.info("running graceful exit on windows");
                        try {
                            Bootstrap.stop();
                        } catch (IOException e) {
                            throw new ElasticsearchException("failed to stop node", e);
                        }
                        return true;
                    }
                    return false;
                }
            });
        }

        // force remainder of JNA to be loaded (if available).
        try {
            /**
             * 
    private static final class Holder {
        private static final JNAKernel32Library instance = new JNAKernel32Library();
    }
             */
            JNAKernel32Library.getInstance();
        } catch (Exception ignored) {
            // we've already logged this.
        }

        Natives.trySetMaxNumberOfThreads();
        Natives.trySetMaxSizeVirtualMemory();
        Natives.trySetMaxFileSize();

        // init lucene random seed. it will use /dev/urandom where available:
        StringHelper.randomId();
    }

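  As the method name suggests, this initializes a series of supporting native resources. In effect it probes whether each native facility is usable on the current platform and warms it up.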

  Below is a quick look at the details of the filter/mlockAll attempts on Linux (JNA support required).

    // org.elasticsearch.bootstrap.SystemCallFilter#init
    /**
     * Attempt to drop the capability to execute for the process.
     * <p>
     * This is best effort and OS and architecture dependent. It may throw any Throwable.
     * @return 0 if we can do this for application threads, 1 for the entire process
     */
    static int init(Path tmpFile) throws Exception {
        if (Constants.LINUX) {
            return linuxImpl();
        } else if (Constants.MAC_OS_X) {
            // try to enable both mechanisms if possible
            bsdImpl();
            macImpl(tmpFile);
            return 1;
        } else if (Constants.SUN_OS) {
            solarisImpl();
            return 1;
        } else if (Constants.FREE_BSD || OPENBSD) {
            bsdImpl();
            return 1;
        } else if (Constants.WINDOWS) {
            windowsImpl();
            return 1;
        } else {
            throw new UnsupportedOperationException("syscall filtering not supported for OS: '" + Constants.OS_NAME + "'");
        }
    }
    // org.elasticsearch.bootstrap.SystemCallFilter#linuxImpl
    /** try to install our BPF filters via seccomp() or prctl() to block execution */
    private static int linuxImpl() {
        // first be defensive: we can give nice errors this way, at the very least.
        // also, some of these security features get backported to old versions, checking kernel version here is a big no-no!
        final Arch arch = ARCHITECTURES.get(Constants.OS_ARCH);
        boolean supported = Constants.LINUX && arch != null;
        if (supported == false) {
            throw new UnsupportedOperationException("seccomp unavailable: '" + Constants.OS_ARCH + "' architecture unsupported");
        }

        // we couldn't link methods, could be some really ancient kernel (e.g. < 2.1.57) or some bug
        if (linux_libc == null) {
            throw new UnsupportedOperationException("seccomp unavailable: could not link methods. requires kernel 3.5+ " +
                "with CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in");
        }

        // try to check system calls really are who they claim
        // you never know (e.g. https://chromium.googlesource.com/chromium/src.git/+/master/sandbox/linux/seccomp-bpf/sandbox_bpf.cc#57)
        final int bogusArg = 0xf7a46a5c;

        // test seccomp(BOGUS)
        long ret = linux_syscall(arch.seccomp, bogusArg);
        if (ret != -1) {
            throw new UnsupportedOperationException("seccomp unavailable: seccomp(BOGUS_OPERATION) returned " + ret);
        } else {
            int errno = Native.getLastError();
            switch (errno) {
                case ENOSYS: break; // ok
                case EINVAL: break; // ok
                default: throw new UnsupportedOperationException("seccomp(BOGUS_OPERATION): " + JNACLibrary.strerror(errno));
            }
        }

        // test seccomp(VALID, BOGUS)
        ret = linux_syscall(arch.seccomp, SECCOMP_SET_MODE_FILTER, bogusArg);
        if (ret != -1) {
            throw new UnsupportedOperationException("seccomp unavailable: seccomp(SECCOMP_SET_MODE_FILTER, BOGUS_FLAG) returned " + ret);
        } else {
            int errno = Native.getLastError();
            switch (errno) {
                case ENOSYS: break; // ok
                case EINVAL: break; // ok
                default: throw new UnsupportedOperationException("seccomp(SECCOMP_SET_MODE_FILTER, BOGUS_FLAG): "
                                                                 + JNACLibrary.strerror(errno));
            }
        }

        // test prctl(BOGUS)
        ret = linux_prctl(bogusArg, 0, 0, 0, 0);
        if (ret != -1) {
            throw new UnsupportedOperationException("seccomp unavailable: prctl(BOGUS_OPTION) returned " + ret);
        } else {
            int errno = Native.getLastError();
            switch (errno) {
                case ENOSYS: break; // ok
                case EINVAL: break; // ok
                default: throw new UnsupportedOperationException("prctl(BOGUS_OPTION): " + JNACLibrary.strerror(errno));
            }
        }

        // now just normal defensive checks

        // check for GET_NO_NEW_PRIVS
        switch (linux_prctl(PR_GET_NO_NEW_PRIVS, 0, 0, 0, 0)) {
            case 0: break; // not yet set
            case 1: break; // already set by caller
            default:
                int errno = Native.getLastError();
                if (errno == EINVAL) {
                    // friendly error, this will be the typical case for an old kernel
                    throw new UnsupportedOperationException("seccomp unavailable: requires kernel 3.5+ with" +
                                                            " CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in");
                } else {
                    throw new UnsupportedOperationException("prctl(PR_GET_NO_NEW_PRIVS): " + JNACLibrary.strerror(errno));
                }
        }
        // check for SECCOMP
        switch (linux_prctl(PR_GET_SECCOMP, 0, 0, 0, 0)) {
            case 0: break; // not yet set
            case 2: break; // already in filter mode by caller
            default:
                int errno = Native.getLastError();
                if (errno == EINVAL) {
                    throw new UnsupportedOperationException("seccomp unavailable: CONFIG_SECCOMP not compiled into kernel," +
                                                            " CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed");
                } else {
                    throw new UnsupportedOperationException("prctl(PR_GET_SECCOMP): " + JNACLibrary.strerror(errno));
                }
        }
        // check for SECCOMP_MODE_FILTER
        if (linux_prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, 0, 0, 0) != 0) {
            int errno = Native.getLastError();
            switch (errno) {
                case EFAULT: break; // available
                case EINVAL: throw new UnsupportedOperationException("seccomp unavailable: CONFIG_SECCOMP_FILTER not" +
                    " compiled into kernel, CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER are needed");
                default: throw new UnsupportedOperationException("prctl(PR_SET_SECCOMP): " + JNACLibrary.strerror(errno));
            }
        }

        // ok, now set PR_SET_NO_NEW_PRIVS, needed to be able to set a seccomp filter as ordinary user
        if (linux_prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) != 0) {
            throw new UnsupportedOperationException("prctl(PR_SET_NO_NEW_PRIVS): " + JNACLibrary.strerror(Native.getLastError()));
        }

        // check it worked
        if (linux_prctl(PR_GET_NO_NEW_PRIVS, 0, 0, 0, 0) != 1) {
            throw new UnsupportedOperationException("seccomp filter did not really succeed: prctl(PR_GET_NO_NEW_PRIVS): " +
                                                    JNACLibrary.strerror(Native.getLastError()));
        }

        // BPF installed to check arch, limit, then syscall.
        // See https://www.kernel.org/doc/Documentation/prctl/seccomp_filter.txt for details.
        SockFilter insns[] = {
          /* 1  */ BPF_STMT(BPF_LD  + BPF_W   + BPF_ABS, SECCOMP_DATA_ARCH_OFFSET),             //
          /* 2  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K,   arch.audit,     0, 7),                 // if (arch != audit) goto fail;
          /* 3  */ BPF_STMT(BPF_LD  + BPF_W   + BPF_ABS, SECCOMP_DATA_NR_OFFSET),               //
          /* 4  */ BPF_JUMP(BPF_JMP + BPF_JGT + BPF_K,   arch.limit,     5, 0),                 // if (syscall > LIMIT) goto fail;
          /* 5  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K,   arch.fork,      4, 0),                 // if (syscall == FORK) goto fail;
          /* 6  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K,   arch.vfork,     3, 0),                 // if (syscall == VFORK) goto fail;
          /* 7  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K,   arch.execve,    2, 0),                 // if (syscall == EXECVE) goto fail;
          /* 8  */ BPF_JUMP(BPF_JMP + BPF_JEQ + BPF_K,   arch.execveat,  1, 0),                 // if (syscall == EXECVEAT) goto fail;
          /* 9  */ BPF_STMT(BPF_RET + BPF_K, SECCOMP_RET_ALLOW),                                // pass: return OK;
          /* 10 */ BPF_STMT(BPF_RET + BPF_K, SECCOMP_RET_ERRNO | (EACCES & SECCOMP_RET_DATA)),  // fail: return EACCES;
        };
        // seccomp takes a long, so we pass it one explicitly to keep the JNA simple
        SockFProg prog = new SockFProg(insns);
        prog.write();
        long pointer = Pointer.nativeValue(prog.getPointer());

        int method = 1;
        // install filter, if this works, after this there is no going back!
        // first try it with seccomp(SECCOMP_SET_MODE_FILTER), falling back to prctl()
        if (linux_syscall(arch.seccomp, SECCOMP_SET_MODE_FILTER, SECCOMP_FILTER_FLAG_TSYNC, new NativeLong(pointer)) != 0) {
            method = 0;
            int errno1 = Native.getLastError();
            if (logger.isDebugEnabled()) {
                logger.debug("seccomp(SECCOMP_SET_MODE_FILTER): {}, falling back to prctl(PR_SET_SECCOMP)...",
                             JNACLibrary.strerror(errno1));
            }
            if (linux_prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, pointer, 0, 0) != 0) {
                int errno2 = Native.getLastError();
                throw new UnsupportedOperationException("seccomp(SECCOMP_SET_MODE_FILTER): " + JNACLibrary.strerror(errno1) +
                                                        ", prctl(PR_SET_SECCOMP): " + JNACLibrary.strerror(errno2));
            }
        }

        // now check that the filter was really installed, we should be in filter mode.
        if (linux_prctl(PR_GET_SECCOMP, 0, 0, 0, 0) != 2) {
            throw new UnsupportedOperationException("seccomp filter installation did not really succeed. seccomp(PR_GET_SECCOMP): "
                                                    + JNACLibrary.strerror(Native.getLastError()));
        }

        logger.debug("Linux seccomp filter installation successful, threads: [{}]", method == 1 ? "all" : "app" );
        return method;
    }
    // org.elasticsearch.bootstrap.Natives#tryMlockall
    static void tryMlockall() {
        if (JNA_AVAILABLE == false) {
            logger.warn("cannot mlockall because JNA is not available");
            return;
        }
        JNANatives.tryMlockall();
    }
    // org.elasticsearch.bootstrap.JNANatives#tryMlockall
    static void tryMlockall() {
        int errno = Integer.MIN_VALUE;
        String errMsg = null;
        boolean rlimitSuccess = false;
        long softLimit = 0;
        long hardLimit = 0;

        try {
            int result = JNACLibrary.mlockall(JNACLibrary.MCL_CURRENT);
            if (result == 0) {
                LOCAL_MLOCKALL = true;
                return;
            }

            errno = Native.getLastError();
            errMsg = JNACLibrary.strerror(errno);
            if (Constants.LINUX || Constants.MAC_OS_X) {
                // we only know RLIMIT_MEMLOCK for these two at the moment.
                JNACLibrary.Rlimit rlimit = new JNACLibrary.Rlimit();
                if (JNACLibrary.getrlimit(JNACLibrary.RLIMIT_MEMLOCK, rlimit) == 0) {
                    rlimitSuccess = true;
                    softLimit = rlimit.rlim_cur.longValue();
                    hardLimit = rlimit.rlim_max.longValue();
                } else {
                    logger.warn("Unable to retrieve resource limits: {}", JNACLibrary.strerror(Native.getLastError()));
                }
            }
        } catch (UnsatisfiedLinkError e) {
            // this will have already been logged by CLibrary, no need to repeat it
            return;
        }

        // mlockall failed for some reason
        logger.warn("Unable to lock JVM Memory: error={}, reason={}", errno , errMsg);
        logger.warn("This can result in part of the JVM being swapped out.");
        if (errno == JNACLibrary.ENOMEM) {
            if (rlimitSuccess) {
                logger.warn("Increase RLIMIT_MEMLOCK, soft limit: {}, hard limit: {}", rlimitToString(softLimit),
                    rlimitToString(hardLimit));
                if (Constants.LINUX) {
                    // give specific instructions for the linux case to make it easy
                    String user = System.getProperty("user.name");
                    logger.warn("These can be adjusted by modifying /etc/security/limits.conf, for example: \n" +
                                "\t# allow user '{}' mlockall\n" +
                                "\t{} soft memlock unlimited\n" +
                                "\t{} hard memlock unlimited",
                                user, user, user
                                );
                    logger.warn("If you are logged in interactively, you will have to re-login for the new limits to take effect.");
                }
            } else {
                logger.warn("Increase RLIMIT_MEMLOCK (ulimit).");
            }
        }
    }
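
  The ten-instruction BPF program in linuxImpl() is easier to follow when written as ordinary control flow. The sketch below only models the decision the filter makes and installs nothing; the constants are assumed x86_64 values hard-coded for illustration, and SeccompFilterSketch is a hypothetical name.

```java
// Plain-Java mirror of the BPF filter's decision logic (illustration only).
final class SeccompFilterSketch {
    // Assumed x86_64 values; the real code looks them up per architecture.
    static final int AUDIT_ARCH = 0xC000003E; // AUDIT_ARCH_X86_64
    static final int LIMIT = 0x3FFFFFFF;      // upper bound on accepted syscall numbers
    static final int FORK = 57;
    static final int VFORK = 58;
    static final int EXECVE = 59;
    static final int EXECVEAT = 322;

    enum Verdict { ALLOW, EACCES }

    // Same order as the BPF instructions: arch check, limit check, blocked syscalls.
    static Verdict decide(int arch, int syscall) {
        if (arch != AUDIT_ARCH) {
            return Verdict.EACCES; // insn 2: unexpected architecture
        }
        if (Integer.compareUnsigned(syscall, LIMIT) > 0) {
            return Verdict.EACCES; // insn 4: BPF compares values as unsigned 32-bit
        }
        if (syscall == FORK || syscall == VFORK || syscall == EXECVE || syscall == EXECVEAT) {
            return Verdict.EACCES; // insns 5-8: process creation is refused
        }
        return Verdict.ALLOW;      // insn 9: everything else passes
    }

    public static void main(String[] args) {
        System.out.println("read:   " + decide(AUDIT_ARCH, 0));
        System.out.println("execve: " + decide(AUDIT_ARCH, EXECVE));
    }
}
```

  In the real filter each failing test jumps forward to instruction 10, which is why the jump offsets in the listing count down from 5 to 1.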

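  The duplicate-class check (jar hell) mainly looks for jars that appear twice on the classpath and for class names that are defined more than once: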

    // org.elasticsearch.bootstrap.JarHell#checkJarHell
    /**
     * Checks the current classpath for duplicate classes
     * @param output A {@link String} {@link Consumer} to which debug output will be sent
     * @throws IllegalStateException if jar hell was found
     */
    public static void checkJarHell(Consumer<String> output) throws IOException, URISyntaxException {
        ClassLoader loader = JarHell.class.getClassLoader();
        output.accept("java.class.path: " + System.getProperty("java.class.path"));
        output.accept("sun.boot.class.path: " + System.getProperty("sun.boot.class.path"));
        if (loader instanceof URLClassLoader) {
            output.accept("classloader urls: " + Arrays.toString(((URLClassLoader)loader).getURLs()));
        }
        checkJarHell(parseClassPath(), output);
    }
    
    /**
     * Checks the set of URLs for duplicate classes
     * @param urls A set of URLs from the classpath to be checked for conflicting jars
     * @param output A {@link String} {@link Consumer} to which debug output will be sent
     * @throws IllegalStateException if jar hell was found
     */
    @SuppressForbidden(reason = "needs JarFile for speed, just reading entries")
    public static void checkJarHell(Set<URL> urls, Consumer<String> output) throws URISyntaxException, IOException {
        // we don't try to be sneaky and use deprecated/internal/not portable stuff
        // like sun.boot.class.path, and with jigsaw we don't yet have a way to get
        // a "list" at all. So just exclude any elements underneath the java home
        String javaHome = System.getProperty("java.home");
        output.accept("java.home: " + javaHome);
        final Map<String,Path> clazzes = new HashMap<>(32768);
        Set<Path> seenJars = new HashSet<>();
        for (final URL url : urls) {
            final Path path = PathUtils.get(url.toURI());
            // exclude system resources
            if (path.startsWith(javaHome)) {
                output.accept("excluding system resource: " + path);
                continue;
            }
            if (path.toString().endsWith(".jar")) {
                // the same jar appears twice on the classpath
                if (seenJars.add(path) == false) {
                    throw new IllegalStateException("jar hell!" + System.lineSeparator() +
                                                    "duplicate jar on classpath: " + path);
                }
                output.accept("examining jar: " + path);
                try (JarFile file = new JarFile(path.toString())) {
                    Manifest manifest = file.getManifest();
                    if (manifest != null) {
                        // check MANIFEST.MF (version information, ...)
                        checkManifest(manifest, path);
                    }
                    // inspect entries
                    Enumeration<JarEntry> elements = file.entries();
                    while (elements.hasMoreElements()) {
                        String entry = elements.nextElement().getName();
                        if (entry.endsWith(".class")) {
                            // for jar format, the separator is defined as /
                            entry = entry.replace('/', '.').substring(0, entry.length() - 6);
                            checkClass(clazzes, entry, path);
                        }
                    }
                }
            } else {
                output.accept("examining directory: " + path);
                // case for tests: where we have class files in the classpath
                final Path root = PathUtils.get(url.toURI());
                final String sep = root.getFileSystem().getSeparator();

                // don't try and walk class or resource directories that don't exist
                // gradle will add these to the classpath even if they never get created
                if (Files.exists(root)) {
                    Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
                        @Override
                        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                            String entry = root.relativize(file).toString();
                            if (entry.endsWith(".class")) {
                                // normalize with the os separator, remove '.class'
                                entry = entry.replace(sep, ".").substring(0, entry.length() - ".class".length());
                                checkClass(clazzes, entry, path);
                            }
                            return super.visitFile(file, attrs);
                        }
                    });
                }
            }
        }
    }
    // per-class duplicate check
    private static void checkClass(Map<String, Path> clazzes, String clazz, Path jarpath) {
        if (clazz.equals("module-info") || clazz.endsWith(".module-info")) {
            // Ignore jigsaw module descriptions
            return;
        }
        Path previous = clazzes.put(clazz, jarpath);
        if (previous != null) {
            if (previous.equals(jarpath)) {
                if (clazz.startsWith("org.apache.xmlbeans")) {
                    return; // https://issues.apache.org/jira/browse/XMLBEANS-499
                }
                // throw a better exception in this ridiculous case.
                // unfortunately the zip file format allows this buggy possibility
                // UweSays: It can, but should be considered as bug :-)
                throw new IllegalStateException("jar hell!" + System.lineSeparator() +
                        "class: " + clazz + System.lineSeparator() +
                        "exists multiple times in jar: " + jarpath + " !!!!!!!!!");
            } else {
                throw new IllegalStateException("jar hell!" + System.lineSeparator() +
                        "class: " + clazz + System.lineSeparator() +
                        "jar1: " + previous + System.lineSeparator() +
                        "jar2: " + jarpath);
            }
        }
    }
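
  The core trick in checkClass() is that Map.put returns the previous value, so a non-null result means the class was already seen. A minimal sketch of the same idea over in-memory data, with no real jar files involved; JarHellSketch and the jar names are made up for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Duplicate-class detection over a fake classpath: jar name -> entry names.
final class JarHellSketch {
    static void check(Map<String, List<String>> jars) {
        Map<String, String> seen = new HashMap<>(); // class name -> jar that defined it
        for (Map.Entry<String, List<String>> jar : jars.entrySet()) {
            for (String entry : jar.getValue()) {
                if (!entry.endsWith(".class")) {
                    continue; // resources are not checked
                }
                // "a/b/C.class" -> "a.b.C"
                String clazz = entry.substring(0, entry.length() - ".class".length()).replace('/', '.');
                String previous = seen.put(clazz, jar.getKey());
                if (previous != null) {
                    throw new IllegalStateException("jar hell! class: " + clazz
                        + ", jar1: " + previous + ", jar2: " + jar.getKey());
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> jars = new LinkedHashMap<>();
        jars.put("a.jar", List.of("org/demo/Foo.class", "META-INF/MANIFEST.MF"));
        jars.put("b.jar", List.of("org/demo/Foo.class"));
        try {
            check(jars);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```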

  The SecurityManager is installed as follows:

    // org.elasticsearch.bootstrap.Security#configure
    /**
     * Initializes SecurityManager for the environment
     * Can only happen once!
     * @param environment configuration for generating dynamic permissions
     * @param filterBadDefaults true if we should filter out bad java defaults in the system policy.
     */
    static void configure(Environment environment, boolean filterBadDefaults) throws IOException, NoSuchAlgorithmException {

        // enable security policy: union of template and environment-based paths, and possibly plugin permissions
        Map<String, URL> codebases = PolicyUtil.getCodebaseJarMap(JarHell.parseClassPath());
        Policy.setPolicy(new ESPolicy(codebases, createPermissions(environment),
            getPluginAndModulePermissions(environment), filterBadDefaults, createRecursiveDataPathPermission(environment)));

        // enable security manager
        final String[] classesThatCanExit =
                new String[]{
                        // SecureSM matches class names as regular expressions so we escape the $ that arises from the nested class name
                        ElasticsearchUncaughtExceptionHandler.PrivilegedHaltAction.class.getName().replace("$", "\\$"),
                        Command.class.getName()};
        System.setSecurityManager(new SecureSM(classesThatCanExit));

        // do some basic tests
        selfTest();
    }
    /** Simple checks that everything is ok */
    @SuppressForbidden(reason = "accesses jvm default tempdir as a self-test")
    static void selfTest() throws IOException {
        // check we can manipulate temporary files
        try {
            // create and delete a temp file to verify that the security manager setup works
            Path p = Files.createTempFile(null, null);
            try {
                Files.delete(p);
            } catch (IOException ignored) {
                // potentially virus scanner
            }
        } catch (SecurityException problem) {
            throw new SecurityException("Security misconfiguration: cannot access java.io.tmpdir", problem);
        }
    }
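
  One detail in configure() is easy to miss: the nested class name is escaped with replace("$", "\\$") because, per the comment in the listing, SecureSM treats the names as regular expressions. The sketch below shows why the raw name would not match itself; NestedClassRegexSketch and its Halt class are hypothetical and only stand in for a nested class like PrivilegedHaltAction.

```java
import java.util.regex.Pattern;

// Shows why a nested class name must be escaped before being used as a regex.
final class NestedClassRegexSketch {
    // Stand-in nested class; its binary name contains a '$'.
    static final class Halt {
    }

    // The binary name of the nested class, e.g. "NestedClassRegexSketch$Halt".
    static String rawName() {
        return Halt.class.getName();
    }

    // Same escaping as in the listing: a literal replace of "$" with "\$".
    static String escapedName() {
        return rawName().replace("$", "\\$");
    }

    public static void main(String[] args) {
        String name = rawName();
        // Unescaped, '$' is an end-of-input anchor, so the pattern cannot match the name.
        System.out.println("raw pattern matches:     " + Pattern.matches(name, name));
        // Escaped, '$' is a literal character again.
        System.out.println("escaped pattern matches: " + Pattern.matches(escapedName(), name));
    }
}
```

  Pattern.quote(name) would be another way to get a literal match; the listing uses the narrower replace, which only touches the dollar sign.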

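  That covers the whole ES setup process: native environment checks, the dependency jar check, shutdown hook installation, and security manager installation. The most important step, creating the node, is covered in the next section.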

 

2.4. Node creation and checks

  Every ES server actually runs as one node of a cluster, and its core work is organized around the Node class, so node creation deserves a section of its own.

  The node is instantiated in setup() as an anonymous subclass of Node, mainly in order to override validateNodeBeforeAcceptingRequests().

    // org.elasticsearch.bootstrap.Bootstrap#setup
    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
        ...
        node = new Node(environment) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
    }
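
  The override in this listing is a classic template-method hook. A minimal sketch of the pattern follows, assuming the base hook is a no-op and is called from the node's start sequence; all class names, method names and check names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Template-method hook: the base class decides when validation runs, the subclass decides what it does.
final class NodeHookSketch {
    static class SketchNode {
        final List<String> log = new ArrayList<>();

        // Assumed no-op by default, so embedders that skip bootstrap checks need no extra code.
        protected void validateBeforeAcceptingRequests(List<String> checks) {
        }

        // Fixed start sequence; the hook runs between binding and serving.
        final void start() {
            log.add("transport bound");
            validateBeforeAcceptingRequests(List.of("heap size", "file descriptors"));
            log.add("accepting requests");
        }
    }

    // Plays the role of Bootstrap#setup: an anonymous subclass plugs in the checks.
    static SketchNode createNode() {
        return new SketchNode() {
            @Override
            protected void validateBeforeAcceptingRequests(List<String> checks) {
                for (String check : checks) {
                    log.add("checked " + check);
                }
            }
        };
    }

    public static void main(String[] args) {
        SketchNode node = createNode();
        node.start();
        node.log.forEach(System.out::println);
    }
}
```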

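  So apart from the validation, which Bootstrap delegates to BootstrapChecks, everything else is done by the stock Node implementation; here that means its constructor.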

    // org.elasticsearch.node.Node#Node
    public Node(Environment environment) {
        this(environment, Collections.emptyList(), true);
    }

    /**
     * Constructs a node
     *
     * @param initialEnvironment         the initial environment for this node, which will be added to by plugins
     * @param classpathPlugins           the plugins to be loaded from the classpath
     * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
     *                                   test framework for tests that rely on being able to set private settings
     */
    protected Node(final Environment initialEnvironment,
                   Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
        final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
        boolean success = false;
        try {
            Settings tmpSettings = Settings.builder().put(initialEnvironment.settings())
                .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

            final JvmInfo jvmInfo = JvmInfo.jvmInfo();
            logger.info(
                "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
                Build.CURRENT.getQualifiedVersion(),
                jvmInfo.pid(),
                Build.CURRENT.flavor().displayName(),
                Build.CURRENT.type().displayName(),
                Build.CURRENT.hash(),
                Build.CURRENT.date(),
                Constants.OS_NAME,
                Constants.OS_VERSION,
                Constants.OS_ARCH,
                Constants.JVM_VENDOR,
                Constants.JVM_NAME,
                Constants.JAVA_VERSION,
                Constants.JVM_VERSION);
            if (jvmInfo.getBundledJdk()) {
                logger.info("JVM home [{}], using bundled JDK [{}]", System.getProperty("java.home"), jvmInfo.getUsingBundledJdk());
            } else {
                logger.info("JVM home [{}]", System.getProperty("java.home"));
                deprecationLogger.deprecate(
                    DeprecationCategory.OTHER,
                    "no-jdk",
                    "no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release");
            }
            logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
            if (Build.CURRENT.isProductionRelease() == false) {
                logger.warn(
                    "version [{}] is a pre-release version of Elasticsearch and is not suitable for production",
                    Build.CURRENT.getQualifiedVersion());
            }

            if (logger.isDebugEnabled()) {
                logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                    initialEnvironment.configFile(), Arrays.toString(initialEnvironment.dataFiles()),
                    initialEnvironment.logsFile(), initialEnvironment.pluginsFile());
            }
            // 1. instantiate the plugin service
            this.pluginsService = new PluginsService(tmpSettings, initialEnvironment.configFile(), initialEnvironment.modulesFile(),
                initialEnvironment.pluginsFile(), classpathPlugins);
            final Settings settings = pluginsService.updatedSettings();

            final Set<DiscoveryNodeRole> additionalRoles = pluginsService.filterPlugins(Plugin.class)
                .stream()
                .map(Plugin::getRoles)
                .flatMap(Set::stream)
                .collect(Collectors.toSet());
            // store the additional roles contributed by plugins
            DiscoveryNode.setAdditionalRoles(additionalRoles);

            /*
             * Create the environment based on the finalized view of the settings. This is to ensure that components get the same setting
             * values, no matter they ask for them from.
             */
            this.environment = new Environment(settings, initialEnvironment.configFile());
            Environment.assertEquivalent(initialEnvironment, this.environment);
            nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
            logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}",
                NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
                DiscoveryNode.getRolesFromSettings(settings).stream()
                    .map(DiscoveryNodeRole::roleName)
                    .collect(Collectors.toCollection(LinkedHashSet::new)));
            resourcesToClose.add(nodeEnvironment);
            localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
            // 2. create the executor thread pools
            final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
            // this ThreadPool holds separate pools for many kinds of requests, e.g. get/search/write...
            final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
            resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
            final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
            resourcesToClose.add(resourceWatcherService);
            // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
            HeaderWarning.setThreadContext(threadPool.getThreadContext());
            resourcesToClose.add(() -> HeaderWarning.removeThreadContext(threadPool.getThreadContext()));

            final List<Setting<?>> additionalSettings = new ArrayList<>();
            // register the node.data, node.ingest, node.master, node.remote_cluster_client settings here so we can mark them private
            additionalSettings.add(NODE_DATA_SETTING);
            additionalSettings.add(NODE_INGE