1. 程式人生 > >ES學習筆記之health api的實現

ES學習筆記之health api的實現

sting don ber true ava == fail nfa ide

使用health api可以查看es集群的健康度。 health api的用法如下:

curl ‘http://localhost:9200/_cluster/health‘ 

health api的返回值中有一個核心的字段statusstatus 有3種取值: green, yellow, red。分別代表集群的3種狀態: 主分片和副本都已經分配,主分片已經分配副本分片沒有,制定分片沒有分配。

也就是說, health api關註的核心在於數據的高可用。

看health api的實現:
請求會路由到master節點,然後讀取clusterState中的routing_table.

基於routing_table

, 判斷索引的各個分片狀態

    public ClusterShardHealth(int shardId, final IndexShardRoutingTable shardRoutingTable) {
        this.shardId = shardId;
        for (ShardRouting shardRouting : shardRoutingTable) {
            if (shardRouting.active()) {
                activeShards++;
                if (shardRouting.relocating()) {
                    // the shard is relocating, the one it is relocating to will be in initializing state, so we don‘t count it
                    relocatingShards++;
                }
                if (shardRouting.primary()) {
                    primaryActive = true;
                }
            } else if (shardRouting.initializing()) {
                initializingShards++;
            } else if (shardRouting.unassigned()) {
                unassignedShards++;
            }
        }
        if (primaryActive) {
            if (activeShards == shardRoutingTable.size()) {
                status = ClusterHealthStatus.GREEN;
            } else {
                status = ClusterHealthStatus.YELLOW;
            }
        } else {
            status = ClusterHealthStatus.RED;
        }
    }

基於分片,決定索引的狀態

    public ClusterIndexHealth(IndexMetaData indexMetaData, IndexRoutingTable indexRoutingTable) {
        this.index = indexMetaData.getIndex();
        this.numberOfShards = indexMetaData.getNumberOfShards();
        this.numberOfReplicas = indexMetaData.getNumberOfReplicas();
        this.validationFailures = indexRoutingTable.validate(indexMetaData);

        for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
            int shardId = shardRoutingTable.shardId().id();
            shards.put(shardId, new ClusterShardHealth(shardId, shardRoutingTable));
        }

        // update the index status
        status = ClusterHealthStatus.GREEN;

        for (ClusterShardHealth shardHealth : shards.values()) {
            if (shardHealth.isPrimaryActive()) {
                activePrimaryShards++;
            }
            activeShards += shardHealth.getActiveShards();
            relocatingShards += shardHealth.getRelocatingShards();
            initializingShards += shardHealth.getInitializingShards();
            unassignedShards += shardHealth.getUnassignedShards();

            if (shardHealth.getStatus() == ClusterHealthStatus.RED) {
                status = ClusterHealthStatus.RED;
            } else if (shardHealth.getStatus() == ClusterHealthStatus.YELLOW && status != ClusterHealthStatus.RED) {
                // do not override an existing red
                status = ClusterHealthStatus.YELLOW;
            }
        }
        if (!validationFailures.isEmpty()) {
            status = ClusterHealthStatus.RED;
        } else if (shards.isEmpty()) { // might be since none has been created yet (two phase index creation)
            status = ClusterHealthStatus.RED;
        }
    }

基於索引,決定集群的狀態。

    public ClusterStateHealth(ClusterState clusterState, String[] concreteIndices) {
        RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
        validationFailures = validation.failures();
        numberOfNodes = clusterState.nodes().size();
        numberOfDataNodes = clusterState.nodes().dataNodes().size();

        for (String index : concreteIndices) {
            IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
            IndexMetaData indexMetaData = clusterState.metaData().index(index);
            if (indexRoutingTable == null) {
                continue;
            }

            ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetaData, indexRoutingTable);

            indices.put(indexHealth.getIndex(), indexHealth);
        }

        status = ClusterHealthStatus.GREEN;

        for (ClusterIndexHealth indexHealth : indices.values()) {
            activePrimaryShards += indexHealth.getActivePrimaryShards();
            activeShards += indexHealth.getActiveShards();
            relocatingShards += indexHealth.getRelocatingShards();
            initializingShards += indexHealth.getInitializingShards();
            unassignedShards += indexHealth.getUnassignedShards();
            if (indexHealth.getStatus() == ClusterHealthStatus.RED) {
                status = ClusterHealthStatus.RED;
            } else if (indexHealth.getStatus() == ClusterHealthStatus.YELLOW && status != ClusterHealthStatus.RED) {
                status = ClusterHealthStatus.YELLOW;
            }
        }

        if (!validationFailures.isEmpty()) {
            status = ClusterHealthStatus.RED;
        } else if (clusterState.blocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE)) {
            status = ClusterHealthStatus.RED;
        }

        // shortcut on green
        if (status.equals(ClusterHealthStatus.GREEN)) {
            this.activeShardsPercent = 100;
        } else {
            List<ShardRouting> shardRoutings = clusterState.getRoutingTable().allShards();
            int activeShardCount = 0;
            int totalShardCount = 0;
            for (ShardRouting shardRouting : shardRoutings) {
                if (shardRouting.active()) activeShardCount++;
                totalShardCount++;
            }
            this.activeShardsPercent = (((double) activeShardCount) / totalShardCount) * 100;
        }
    }

理解health api, 需要理解clusterState。 好在這些都是只讀的信息,不難理解。

ES學習筆記之health api的實現