1. 程式人生 > >聊聊Elasticsearch的OsProbe

聊聊Elasticsearch的OsProbe

本文主要研究一下Elasticsearch的OsProbe

OsProbe

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java

public class OsProbe {

    private static final OperatingSystemMXBean osMxBean = ManagementFactory.getOperatingSystemMXBean();

    private static final Method getFreePhysicalMemorySize;
    private static final Method getTotalPhysicalMemorySize;
    private static final Method getFreeSwapSpaceSize;
    private static final Method getTotalSwapSpaceSize;
    private static final Method getSystemLoadAverage;
    private static final Method getSystemCpuLoad;

    static {
        getFreePhysicalMemorySize = getMethod("getFreePhysicalMemorySize");
        getTotalPhysicalMemorySize = getMethod("getTotalPhysicalMemorySize");
        getFreeSwapSpaceSize = getMethod("getFreeSwapSpaceSize");
        getTotalSwapSpaceSize = getMethod("getTotalSwapSpaceSize");
        getSystemLoadAverage = getMethod("getSystemLoadAverage");
        getSystemCpuLoad = getMethod("getSystemCpuLoad");
    }

    /**
     * Reports the amount of free physical memory in bytes by reflectively invoking the resolved method on the OS MXBean.
     *
     * @return the reported value, or {@code -1} if the method handle was not resolved or the invocation failed
     */
    public long getFreePhysicalMemorySize() {
        long freeBytes = -1;
        if (getFreePhysicalMemorySize != null) {
            try {
                freeBytes = (long) getFreePhysicalMemorySize.invoke(osMxBean);
            } catch (Exception ignored) {
                // best effort: keep the -1 sentinel
            }
        }
        return freeBytes;
    }

    /**
     * Reports the total amount of physical memory in bytes by reflectively invoking the resolved method on the OS MXBean.
     *
     * @return the reported value, or {@code -1} if the method handle was not resolved or the invocation failed
     */
    public long getTotalPhysicalMemorySize() {
        long totalBytes = -1;
        if (getTotalPhysicalMemorySize != null) {
            try {
                totalBytes = (long) getTotalPhysicalMemorySize.invoke(osMxBean);
            } catch (Exception ignored) {
                // best effort: keep the -1 sentinel
            }
        }
        return totalBytes;
    }

    /**
     * Reports the amount of free swap space in bytes by reflectively invoking the resolved method on the OS MXBean.
     *
     * @return the reported value, or {@code -1} if the method handle was not resolved or the invocation failed
     */
    public long getFreeSwapSpaceSize() {
        long freeSwapBytes = -1;
        if (getFreeSwapSpaceSize != null) {
            try {
                freeSwapBytes = (long) getFreeSwapSpaceSize.invoke(osMxBean);
            } catch (Exception ignored) {
                // best effort: keep the -1 sentinel
            }
        }
        return freeSwapBytes;
    }

    /**
     * Reports the total amount of swap space in bytes by reflectively invoking the resolved method on the OS MXBean.
     *
     * @return the reported value, or {@code -1} if the method handle was not resolved or the invocation failed
     */
    public long getTotalSwapSpaceSize() {
        long totalSwapBytes = -1;
        if (getTotalSwapSpaceSize != null) {
            try {
                totalSwapBytes = (long) getTotalSwapSpaceSize.invoke(osMxBean);
            } catch (Exception ignored) {
                // best effort: keep the -1 sentinel
            }
        }
        return totalSwapBytes;
    }

    /**
     * Collects the system load averages into an array.
     *
     * <ul>
     * <li>Windows: always {@code null}.</li>
     * <li>Linux: the first three whitespace-separated fields of {@code /proc/loadavg} (the 1, 5, and 15-minute load averages).</li>
     * <li>Otherwise (asserted to be macOS): the one-minute load average reported by the OS MXBean in slot 0, with {@code -1}
     * in the two remaining slots; a negative reported value is also replaced by {@code -1}.</li>
     * </ul>
     *
     * @return the available system load averages, or {@code null} if they are unavailable or could not be read
     */
    final double[] getSystemLoadAverage() {
        if (Constants.WINDOWS) {
            return null;
        }

        if (Constants.LINUX) {
            try {
                final String loadavgLine = readProcLoadavg();
                assert loadavgLine.matches("(\\d+\\.\\d+\\s+){3}\\d+/\\d+\\s+\\d+");
                final String[] tokens = loadavgLine.split("\\s+");
                // only the three leading load averages are of interest
                final double[] averages = new double[3];
                for (int i = 0; i < averages.length; i++) {
                    averages[i] = Double.parseDouble(tokens[i]);
                }
                return averages;
            } catch (final IOException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("error reading /proc/loadavg", e);
                }
                return null;
            }
        }

        assert Constants.MAC_OS_X;
        if (getSystemLoadAverage == null) {
            return null;
        }
        try {
            final double reported = (double) getSystemLoadAverage.invoke(osMxBean);
            final double oneMinute = reported >= 0 ? reported : -1;
            return new double[]{oneMinute, -1, -1};
        } catch (IllegalAccessException | InvocationTargetException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("error reading one minute load average from operating system", e);
            }
            return null;
        }
    }

    /**
     * Delegates to {@code Probes.getLoadAndScaleToPercent}, passing the reflective {@code getSystemCpuLoad} handle and the OS MXBean.
     *
     * @return the value produced by that helper (presumably a CPU usage percentage; the exact semantics live in {@code Probes})
     */
    public short getSystemCpuPercent() {
        final short cpuPercent = Probes.getLoadAndScaleToPercent(getSystemCpuLoad, osMxBean);
        return cpuPercent;
    }

    /**
     * Takes a snapshot of the operating system statistics: CPU, physical memory, swap and, on Linux only, cgroup stats.
     *
     * @return a new {@code OsStats} stamped with the current wall-clock time; its cgroup part is {@code null} off Linux
     */
    public OsStats osStats() {
        final short cpuPercent = getSystemCpuPercent();
        final double[] loadAverage = getSystemLoadAverage();
        final OsStats.Cpu cpu = new OsStats.Cpu(cpuPercent, loadAverage);

        final long totalMemory = getTotalPhysicalMemorySize();
        final long freeMemory = getFreePhysicalMemorySize();
        final OsStats.Mem mem = new OsStats.Mem(totalMemory, freeMemory);

        final long totalSwap = getTotalSwapSpaceSize();
        final long freeSwap = getFreeSwapSpaceSize();
        final OsStats.Swap swap = new OsStats.Swap(totalSwap, freeSwap);

        final OsStats.Cgroup cgroup;
        if (Constants.LINUX) {
            cgroup = getCgroup();
        } else {
            cgroup = null;
        }

        final long timestamp = System.currentTimeMillis();
        return new OsStats(timestamp, cpu, mem, swap, cgroup);
    }

    /**
     * Looks up a public no-argument method of {@code com.sun.management.OperatingSystemMXBean} by name.
     *
     * @param methodName the name of the method to resolve
     * @return the resolved method, or {@code null} if the class or the method cannot be found or is otherwise unavailable
     */
    private static Method getMethod(String methodName) {
        Method resolved = null;
        try {
            final Class<?> sunOsMxBean = Class.forName("com.sun.management.OperatingSystemMXBean");
            resolved = sunOsMxBean.getMethod(methodName);
        } catch (Exception e) {
            // not available
        }
        return resolved;
    }

    //......
}
  • OsProbe使用static程式碼塊反射獲取了getFreePhysicalMemorySize、getTotalPhysicalMemorySize、getFreeSwapSpaceSize、getTotalSwapSpaceSize、getSystemLoadAverage、getSystemCpuLoad這幾個method;它們是從com.sun.management.OperatingSystemMXBean獲取的;osStats方法返回OsStats,它由OsStats.Cpu、OsStats.Mem、OsStats.Swap、OsStats.Cgroup這幾部分組成

OsProbe.getCgroup

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/os/OsProbe.java

public class OsProbe {
	//......

    /**
     * Basic cgroup stats.
     *
     * A missing {@code cpuacct}, {@code cpu} or {@code memory} entry in the control group map is handled explicitly rather than
     * only asserted, so that a process that is not a member of one of these controllers yields {@code null} instead of passing a
     * {@code null} control group on to the readers when assertions are disabled.
     *
     * @return basic cgroup stats, or {@code null} if cgroup stats are not available, if no control group was found for the
     *         {@code cpuacct}, {@code cpu} or {@code memory} controller, or if an I/O exception occurred reading the cgroup stats
     */
    private OsStats.Cgroup getCgroup() {
        try {
            if (!areCgroupStatsAvailable()) {
                return null;
            }

            final Map<String, String> controllerMap = getControlGroups();
            assert !controllerMap.isEmpty();

            final String cpuAcctControlGroup = controllerMap.get("cpuacct");
            if (cpuAcctControlGroup == null) {
                logger.debug("no [cpuacct] data found in cgroup stats");
                return null;
            }
            final long cgroupCpuAcctUsageNanos = getCgroupCpuAcctUsageNanos(cpuAcctControlGroup);

            final String cpuControlGroup = controllerMap.get("cpu");
            if (cpuControlGroup == null) {
                logger.debug("no [cpu] data found in cgroup stats");
                return null;
            }
            final long cgroupCpuAcctCpuCfsPeriodMicros = getCgroupCpuAcctCpuCfsPeriodMicros(cpuControlGroup);
            final long cgroupCpuAcctCpuCfsQuotaMicros = getCgroupCpuAcctCpuCfsQuotaMicros(cpuControlGroup);
            final OsStats.Cgroup.CpuStat cpuStat = getCgroupCpuAcctCpuStat(cpuControlGroup);

            final String memoryControlGroup = controllerMap.get("memory");
            if (memoryControlGroup == null) {
                logger.debug("no [memory] data found in cgroup stats");
                return null;
            }
            final String cgroupMemoryLimitInBytes = getCgroupMemoryLimitInBytes(memoryControlGroup);
            final String cgroupMemoryUsageInBytes = getCgroupMemoryUsageInBytes(memoryControlGroup);

            return new OsStats.Cgroup(
                cpuAcctControlGroup,
                cgroupCpuAcctUsageNanos,
                cpuControlGroup,
                cgroupCpuAcctCpuCfsPeriodMicros,
                cgroupCpuAcctCpuCfsQuotaMicros,
                cpuStat,
                memoryControlGroup,
                cgroupMemoryLimitInBytes,
                cgroupMemoryUsageInBytes);
        } catch (final IOException e) {
            logger.debug("error reading control group stats", e);
            return null;
        }
    }

    /**
     * Checks whether every path needed for cgroup stats exists: {@code /proc/self/cgroup}, {@code /sys/fs/cgroup/cpu},
     * {@code /sys/fs/cgroup/cpuacct} and {@code /sys/fs/cgroup/memory}. The paths are probed in that order and probing stops at
     * the first one that is missing.
     *
     * @return {@code true} only if all four paths exist
     */
    @SuppressForbidden(reason = "access /proc/self/cgroup, /sys/fs/cgroup/cpu, /sys/fs/cgroup/cpuacct and /sys/fs/cgroup/memory")
    boolean areCgroupStatsAvailable() {
        return Files.exists(PathUtils.get("/proc/self/cgroup"))
            && Files.exists(PathUtils.get("/sys/fs/cgroup/cpu"))
            && Files.exists(PathUtils.get("/sys/fs/cgroup/cpuacct"))
            && Files.exists(PathUtils.get("/sys/fs/cgroup/memory"));
    }

    /**
     * Builds a map from controller (subsystem) name to control group path from the lines of {@code /proc/self/cgroup}.
     *
     * @return a map with one entry per controller listed in {@code /proc/self/cgroup}; when
     *         {@code CONTROL_GROUPS_HIERARCHY_OVERRIDE} is set, every controller maps to that override instead of the listed path
     * @throws IOException if reading {@code /proc/self/cgroup} fails
     */
    private Map<String, String> getControlGroups() throws IOException {
        final List<String> lines = readProcSelfCgroup();
        final Map<String, String> controllerMap = new HashMap<>();
        for (final String line : lines) {
            /*
             * The virtual file /proc/self/cgroup lists the control groups that the Elasticsearch process is a member of. Each line contains
             * three colon-separated fields of the form hierarchy-ID:subsystem-list:cgroup-path. For cgroups version 1 hierarchies, the
             * subsystem-list is a comma-separated list of subsystems. The subsystem-list can be empty if the hierarchy represents a cgroups
             * version 2 hierarchy.
             *
             * Only the first two colons are separators: the split is limited to three fields so that a cgroup-path which itself
             * contains a colon is kept intact instead of being cut at that colon.
             */
            final String[] fields = line.split(":", 3);
            assert fields.length == 3;
            final String[] controllers = fields[1].split(",");
            for (final String controller : controllers) {
                final String controlGroupPath;
                if (CONTROL_GROUPS_HIERARCHY_OVERRIDE != null) {
                    /*
                     * Docker violates the relationship between /proc/self/cgroup and the /sys/fs/cgroup hierarchy. It's possible that this
                     * will be fixed in future versions of Docker with cgroup namespaces, but this requires modern kernels. Thus, we provide
                     * an undocumented hack for overriding the control group path. Do not rely on this hack, it will be removed.
                     */
                    controlGroupPath = CONTROL_GROUPS_HIERARCHY_OVERRIDE;
                } else {
                    controlGroupPath = fields[2];
                }
                final String previous = controllerMap.put(controller, controlGroupPath);
                // each controller is expected to be listed at most once
                assert previous == null;
            }
        }
        return controllerMap;
    }

    /**
     * Parses the line returned by {@code readSysFsCgroupCpuAcctCpuAcctUsage} for the given control group as a {@code long}.
     *
     * @param controlGroup the control group to read the usage of
     * @return the parsed usage value (nanoseconds, going by the method name)
     * @throws IOException if the underlying read fails
     */
    private long getCgroupCpuAcctUsageNanos(final String controlGroup) throws IOException {
        final String rawUsage = readSysFsCgroupCpuAcctCpuAcctUsage(controlGroup);
        return Long.parseLong(rawUsage);
    }

    /**
     * Reads the {@code cpuacct.usage} file of the given control group beneath {@code /sys/fs/cgroup/cpuacct}.
     *
     * @param controlGroup the control group path, resolved beneath {@code /sys/fs/cgroup/cpuacct}
     * @return the value returned by {@code readSingleLine} for that file
     * @throws IOException declared by this method; presumably raised when the file cannot be read
     */
    @SuppressForbidden(reason = "access /sys/fs/cgroup/cpuacct")
    String readSysFsCgroupCpuAcctCpuAcctUsage(final String controlGroup) throws IOException {
        return readSingleLine(PathUtils.get("/sys/fs/cgroup/cpuacct", controlGroup, "cpuacct.usage"));
    }

    /**
     * Parses the line returned by {@code readSysFsCgroupCpuAcctCpuCfsPeriod} for the given control group as a {@code long}.
     *
     * @param controlGroup the control group to read the CFS period of
     * @return the parsed period value (microseconds, going by the method name)
     * @throws IOException if the underlying read fails
     */
    private long getCgroupCpuAcctCpuCfsPeriodMicros(final String controlGroup) throws IOException {
        final String rawPeriod = readSysFsCgroupCpuAcctCpuCfsPeriod(controlGroup);
        return Long.parseLong(rawPeriod);
    }

    /**
     * Reads the {@code cpu.cfs_period_us} file of the given control group beneath {@code /sys/fs/cgroup/cpu}.
     *
     * @param controlGroup the control group path, resolved beneath {@code /sys/fs/cgroup/cpu}
     * @return the value returned by {@code readSingleLine} for that file
     * @throws IOException declared by this method; presumably raised when the file cannot be read
     */
    @SuppressForbidden(reason = "access /sys/fs/cgroup/cpu")
    String readSysFsCgroupCpuAcctCpuCfsPeriod(final String controlGroup) throws IOException {
        return readSingleLine(PathUtils.get("/sys/fs/cgroup/cpu", controlGroup, "cpu.cfs_period_us"));
    }

    /**
     * Parses the line returned by {@code readSysFsCgroupCpuAcctCpuAcctCfsQuota} for the given control group as a {@code long}.
     *
     * @param controlGroup the control group to read the CFS quota of
     * @return the parsed quota value (microseconds, going by the method name)
     * @throws IOException if the underlying read fails
     */
    private long getCgroupCpuAcctCpuCfsQuotaMicros(final String controlGroup) throws IOException {
        final String rawQuota = readSysFsCgroupCpuAcctCpuAcctCfsQuota(controlGroup);
        return Long.parseLong(rawQuota);
    }

    /**
     * Reads the {@code cpu.cfs_quota_us} file of the given control group beneath {@code /sys/fs/cgroup/cpu}.
     *
     * @param controlGroup the control group path, resolved beneath {@code /sys/fs/cgroup/cpu}
     * @return the value returned by {@code readSingleLine} for that file
     * @throws IOException declared by this method; presumably raised when the file cannot be read
     */
    @SuppressForbidden(reason = "access /sys/fs/cgroup/cpu")
    String readSysFsCgroupCpuAcctCpuAcctCfsQuota(final String controlGroup) throws IOException {
        return readSingleLine(PathUtils.get("/sys/fs/cgroup/cpu", controlGroup, "cpu.cfs_quota_us"));
    }

    /**
     * Extracts the {@code nr_periods}, {@code nr_throttled} and {@code throttled_time} values from the lines returned by
     * {@code readSysFsCgroupCpuAcctCpuStat} for the given control group. Lines with any other leading key are ignored.
     *
     * @param controlGroup the control group to read the CPU stat lines of
     * @return a {@code CpuStat} built from the three values; a value stays {@code -1} if its key never appeared
     *         (asserted not to happen)
     * @throws IOException if the underlying read fails
     */
    private OsStats.Cgroup.CpuStat getCgroupCpuAcctCpuStat(final String controlGroup) throws IOException {
        long periods = -1;
        long throttledCount = -1;
        long throttledNanos = -1;

        final List<String> statLines = readSysFsCgroupCpuAcctCpuStat(controlGroup);
        for (final String statLine : statLines) {
            // each line of interest is "<key> <value>", separated by whitespace
            final String[] parts = statLine.split("\\s+");
            final String key = parts[0];
            if ("nr_periods".equals(key)) {
                periods = Long.parseLong(parts[1]);
            } else if ("nr_throttled".equals(key)) {
                throttledCount = Long.parseLong(parts[1]);
            } else if ("throttled_time".equals(key)) {
                throttledNanos = Long.parseLong(parts[1]);
            }
        }

        assert periods != -1;
        assert throttledCount != -1;
        assert throttledNanos != -1;
        return new OsStats.Cgroup.CpuStat(periods, throttledCount, throttledNanos);
    }

    /**
     * Returns, unparsed, the string produced by {@code readSysFsCgroupMemoryLimitInBytes} for the given control group.
     *
     * @param controlGroup the control group to read the memory limit of
     * @return the memory limit as the raw string that was read
     * @throws IOException if the underlying read fails
     */
    private String getCgroupMemoryLimitInBytes(final String controlGroup) throws IOException {
        final String rawLimit = readSysFsCgroupMemoryLimitInBytes(controlGroup);
        return rawLimit;
    }

    /**
     * Reads the {@code memory.limit_in_bytes} file of the given control group beneath {@code /sys/fs/cgroup/memory}.
     *
     * @param controlGroup the control group path, resolved beneath {@code /sys/fs/cgroup/memory}
     * @return the value returned by {@code readSingleLine} for that file
     * @throws IOException declared by this method; presumably raised when the file cannot be read
     */
    @SuppressForbidden(reason = "access /sys/fs/cgroup/memory")
    String readSysFsCgroupMemoryLimitInBytes(final String controlGroup) throws IOException {
        return readSingleLine(PathUtils.get("/sys/fs/cgroup/memory", controlGroup, "memory.limit_in_bytes"));
    }

    /**
     * Returns, unparsed, the string produced by {@code readSysFsCgroupMemoryUsageInBytes} for the given control group.
     *
     * @param controlGroup the control group to read the memory usage of
     * @return the memory usage as the raw string that was read
     * @throws IOException if the underlying read fails
     */
    private String getCgroupMemoryUsageInBytes(final String controlGroup) throws IOException {
        final String rawUsage = readSysFsCgroupMemoryUsageInBytes(controlGroup);
        return rawUsage;
    }

            
	//......
}
  • OsProbe.getCgroup方法首先通過areCgroupStatsAvailable方法確定是否有cgroup的統計資訊,不存在則直接返回null;存在則通過getControlGroups讀取相關資訊構建OsStats.Cgroup
  • areCgroupStatsAvailable方法判斷/proc/self/cgroup、/sys/fs/cgroup/cpu、/sys/fs/cgroup/cpuacct、/sys/fs/cgroup/memory這幾個檔案/目錄是否存在,只要有一個不存在就返回false,都存在則返回true
  • getControlGroups()方法從/proc/self/cgroup讀取資訊,它包括cpu、cpuacct、memory三部分;其中cpu資訊從/sys/fs/cgroup/cpu讀取;cpuacct資訊從/sys/fs/cgroup/cpuacct讀取;memory資訊從/sys/fs/cgroup/memory讀取

cgroup資訊例項

/proc/self/cgroup

cat /proc/self/cgroup
12:pids:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
11:hugetlb:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
10:net_prio:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
9:perf_event:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
8:net_cls:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
7:freezer:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
6:devices:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
5:memory:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
4:blkio:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
3:cpuacct:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
2:cpu:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845
1:cpuset:/docker/d8942cd630dbfddeb5696bd2dc134405c11433458d2aaa1845218086a2015845

/sys/fs/cgroup/cpu

/sys/fs/cgroup/cpu # ls
cgroup.clone_children  cpu.rt_period_us       notify_on_release
cgroup.procs           cpu.rt_runtime_us      tasks
cpu.cfs_period_us      cpu.shares
cpu.cfs_quota_us       cpu.stat

cat /sys/fs/cgroup/cpu/cpu.cfs_period_us
100000

cat /sys/fs/cgroup/cpu/cpu.cfs_quota_us
-1

/sys/fs/cgroup/cpuacct

/sys/fs/cgroup/cpuacct # ls
cgroup.clone_children      cpuacct.usage_percpu_sys
cgroup.procs               cpuacct.usage_percpu_user
cpuacct.stat               cpuacct.usage_sys
cpuacct.usage              cpuacct.usage_user
cpuacct.usage_all          notify_on_release
cpuacct.usage_percpu       tasks

cat /sys/fs/cgroup/cpuacct/cpuacct.usage
31322196847

cat /sys/fs/cgroup/cpuacct/cpuacct.stat
user 2759
system 179

/sys/fs/cgroup/memory

/sys/fs/cgroup/memory # ls
cgroup.clone_children               memory.memsw.failcnt
cgroup.event_control                memory.memsw.limit_in_bytes
cgroup.procs                        memory.memsw.max_usage_in_bytes
memory.failcnt                      memory.memsw.usage_in_bytes
memory.force_empty                  memory.move_charge_at_immigrate
memory.kmem.failcnt                 memory.numa_stat
memory.kmem.limit_in_bytes          memory.oom_control
memory.kmem.max_usage_in_bytes      memory.pressure_level
memory.kmem.slabinfo                memory.soft_limit_in_bytes
memory.kmem.tcp.failcnt             memory.stat
memory.kmem.tcp.limit_in_bytes      memory.swappiness
memory.kmem.tcp.max_usage_in_bytes  memory.usage_in_bytes
memory.kmem.tcp.usage_in_bytes      memory.use_hierarchy
memory.kmem.usage_in_bytes          notify_on_release
memory.limit_in_bytes               tasks
memory.max_usage_in_bytes

cat /sys/fs/cgroup/memory/memory.limit_in_bytes
9223372036854771712

cat /sys/fs/cgroup/memory/memory.usage_in_bytes
198819840

小結

  • OsProbe使用static程式碼塊反射獲取了getFreePhysicalMemorySize、getTotalPhysicalMemorySize、getFreeSwapSpaceSize、getTotalSwapSpaceSize、getSystemLoadAverage、getSystemCpuLoad這幾個method;它們是從com.sun.management.OperatingSystemMXBean獲取的;osStats方法返回OsStats,它由OsStats.Cpu、OsStats.Mem、OsStats.Swap、OsStats.Cgroup這幾部分組成
  • OsProbe.getCgroup方法首先通過areCgroupStatsAvailable方法確定是否有cgroup的統計資訊,不存在則直接返回null;存在則通過getControlGroups讀取相關資訊構建OsStats.Cgroup
  • areCgroupStatsAvailable方法判斷/proc/self/cgroup、/sys/fs/cgroup/cpu、/sys/fs/cgroup/cpuacct、/sys/fs/cgroup/memory這幾個檔案/目錄是否存在,只要有一個不存在就返回false,都存在則返回true;getControlGroups()方法從/proc/self/cgroup讀取資訊,它包括cpu、cpuacct、memory三部分;其中cpu資訊從/sys/fs/cgroup/cpu讀取;cpuacct資訊從/sys/fs/cgroup/cpuacct讀取;memory資訊從/sys/fs/cgroup/memory讀取

doc