1. 程式人生 > >elasticsearch原始碼分析之分片分配(十)

elasticsearch原始碼分析之分片分配(十)

分片

什麼是分片

分片是把索引資料切分成多個小的索引塊,這些小的索引塊能夠分發到同一個叢集中的不同節點。在檢索時,檢索結果是該索引每個分片上檢索結果的合併。類似於資料庫的分庫分表。

為什麼分片

1、這樣可以提高讀寫效能,實現負載均衡。
2、副本容易擴充套件,備份恢復快。

怎麼分片

分片(或者叫分割槽)是分散式系統的一個經典問題。常用的分片方式:

分片方式 說明 優點 缺點
簡單hash 除餘取模 簡單 擴充套件困難、容易資料傾斜
資料範圍分佈 按照資料的所處範圍進行分類,每個分割槽可以動態分裂 易擴充套件 元資料服務維護複雜,容易成為瓶頸
資料量分佈 按照資料塊大小分佈 與資料內容無關,無資料傾斜,易擴容 元資料維護維護複雜
一致性hash 資料和節點hash後沿環形匹配、虛擬節點 易擴充套件 -

es採用的是簡單hash,預設hash的是_id欄位,另外也可以指定分片欄位。hash完同一結果的資料分配到一個分片shard中。

分片分配

什麼是分片分配

已經切分為多份的索引塊,索引塊分發到同一個叢集中的不同節點。這個把shard分發到node的過程就是分片的分配。分配的原則是主要還是基於提高讀寫效能,實現負載均衡,備份恢復快。

怎麼分片分配

分片時機

This can happen during initial recovery, replica allocation, rebalancing, or when nodes are added or removed.

  1. index的增刪
  2. node的增刪
  3. reroute操作
  4. replica的設定更改
  5. 初始化恢復過程

AllocationService.reroute呼叫位置,也就是呼叫分片分配的時機:
AllocationService.reroute呼叫位置

分片規則

es的分片規則主要分為以下幾類:
一、負載均衡規則,從負載均衡角度出發的一些規則,常見的有:

  1. SameShardAllocationDecider,該決策者不允許相同分片(primary\replication)出現在相同的節點上,重寫了canAllocate方法。該類也考慮到了同一物理機多個es例項的情況(es可能多個虛擬機器上,多個虛擬機器在一臺物理機上),通過cluster.routing.allocation.same_shard.host=true(預設false)來處理該情況。判斷的依據是hostname和hostaddress。
  2. ShardsLimitAllocationDecider類,限制同一個節點上shard的數目。可以限制同一節點上的shard總數、同一節點上同一index的shard數目,分別通過index.routing.allocation.total_shards_per_node、cluster.routing.allocation.total_shards_per_node實現。index級別可以覆蓋cluster級別。在elasticsearh.yml檔案中配置或者用update API實時更改。預設的值是-1,代表沒有任何限制。需要注意,如降低該值會導致叢集強制進行分片的重新分配,在叢集平衡這個過程中引發額外的負載。
  3. AwarenessAllocationDecider類,感知分配功能。更夠感知伺服器、服務機架等,儘量分散儲存shard。有兩類引數可以使用。第一類引數舉例:我們通過引數設定分組cluster.routing.allocation.awareness.attributes: rack_id,一node啟動設定了node.attr.rack_id:1,另外一node(兩個node不在一個機架上)啟動設定了node.attr.rack_id:2,所以shard會盡量分散到不同的rack_id上。第二類引數舉例:cluster.routing.allocation.awareness.attributes: zone,cluster.routing.allocation.awareness.force.zone.values: zone1,zone2 如果zone1的機器上不能容納所有的shard,並且zone2沒有啟動,剩餘沒有分配的shard則不會進行分配(zone1過載),直到等到zone2啟動才進行分配。

二、併發數量規則

  1. ConcurrentRebalanceAllocationDecider類,rebalance併發數控制類。配置cluster.routing.allocation.cluster_concurrent_rebalance來控制,該配置執行時可變,預設值為2,如果設定為-1,則表示無限制併發。
  2. ThrottlingAllocationDecider類,在recovery過程中,恢復分片併發數。可動態設定控制引數配置:cluster.routing.allocation.node_initial_primaries_recoveries:這個屬性的預設值為4,它用來描述單個節點上允許recovery操作的初始主分片數量;cluster.routing.allocation.node_concurrent_recoveries:它的預設值是2,它用來限制單個節點上進行recovery操作的併發數。

三、條件限制規則

  1. FilterAllocationDecider類,通過include、exclude引數(可動態設定)控制shard的節點分配。引數:index.routing.allocation.require.、index.routing.allocation.include.、index.routing.allocation.exclude.、cluster.routing.allocation.require.、cluster.routing.allocation.include.、cluster.routing.allocation.exclude.。其中require表示必須,include表示允許,exclude表示禁止。注意Cluster的設定會過載掉index的配置,意味著如果根據index的配置該shard可以分配到此node,但是cluster的配置是不允許,那麼此shard將不允許。filter被應用的順序依次為required、include、exclude。
  2. ReplicaAfterPrimaryActiveAllocationDecider類,該類保證只會在主分片分配完畢後才開始分配分片副本。
  3. RebalanceOnlyWhenActiveAllocationDecider類,保證該索引的所有分片都在活躍狀態才能進行rebalance過程。
  4. ClusterRebalanceAllocationDecider類,根據shard的active狀態來判斷是否可以執行rebalance。使用引數cluster.routing.allocation.allow_rebalance(不能動態更改)來進行判斷,引數值意義:①indices_all_active:它是預設值,表示只有叢集中所有的節點分配完畢,才能認定叢集再平衡完成。②indices_primaries_active:這個值表示只要所有主分片分配完畢了,就可以認定叢集再平衡完成。③always:它表示即使當主分片和分片副本都沒有分配,叢集再平衡操作也是允許的。
  5. DiskThresholdDecider類,通過磁碟空間閾值來控制是否分配。預設該功能是關閉的,通過cluster.routing.allocation.disk.threshold_enabled屬性設定為true可以開啟。cluster.routing.allocation.disk.watermark.low屬性允許使用者指定一個百分比閾值或者絕對數值來控制何時能夠進行分片分配。比如預設值是0.7,表示當可用磁碟空間低於70%時,新的分片才可以分配到該節點上。cluster.routing.allocation.disk.watermark.high屬性允許使用者指定一個百分比閾值或者絕對數值來控制何時需要將分片分配到其它的節點。比如預設值是0.85,表示當可用磁碟空間高於85%時,ElasticSearch會重新把該節點的分片分配到其它節點。引數可以yml檔案或者api動態設定。

上述三類分配規則的java類全部繼承了AllocationDeciders抽象類,該類是負責shard的分配做一個決策結果(Decision類,決策結果類。有四中型別,ALWAYS、YES、NO、THROTTLE)。定義了canRebalance方法(給定的shard routing是否可以rebalance),canAllocate方法(給定的shard routing是否可以分配到指定的node),canRemain方法,給定的shard routing是否可以繼續保留在指定的node;該類所有方法預設都返回ALWAYS。

分配執行

核心邏輯是根據上述規則&分片權重(index、cluster)進行位置判斷,然後進行資料移動、移動結束初始化啟動、最後調整clusterstate完成分配。

1、執行入口是AllocationService.reroute方法,根據clusterstate構造出RoutingAllocation,該類持有當前叢集shard分配的狀態資訊、決策資訊、節點資訊等,在後面的分配過程的主要操作類。

    protected ClusterState reroute(final ClusterState clusterState, String reason, boolean debug) {
        RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
        // shuffle the unassigned nodes, just so we won't have things like poison failed shards
        routingNodes.unassigned().shuffle();
        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState,
            clusterInfoService.getClusterInfo(), currentNanoTime(), false);
        allocation.debugDecision(debug);
        reroute(allocation);
        if (allocation.routingNodesChanged() == false) {
            return clusterState;
        }
        return buildResultAndLogHealthChange(clusterState, allocation, reason);
    }

2、執行真正reroute邏輯,如果有節點沒有分配shard,則執行gatewayAllocator.allocateUnassigned。關於gatewayAllocator的分配主要分為primaryShardAllocator和replicaShardAllocator:

primaryShardAllocator.allocateUnassigned(allocation);
replicaShardAllocator.processExistingRecoveries(allocation);
replicaShardAllocator.allocateUnassigned(allocation);

3、執行資料分片分配BalancedShardsAllocator.allocate(allocation)。該類基於WeightFunction重新分配叢集節點node持有shard的分配關係。allocate方法主要分三步:

final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
balancer.allocateUnassigned();
balancer.moveShards();
balancer.balance();
  1. 第一步是allocateUnassigned,根據WeightFunction演算法和所有AllocationDecider把所有給定的shard分配一個最小化匹配的node
  2. 第二步是moveShards,根據第一步的結果對需要移動的節點進行移動,移動過程中為RELOCATING,移動過去初始化INITIALIZING
  3. 第三步是負載均衡,rebalance其實是從負載高的node向負載低的做轉移。

分配邏輯有很多沒有讀懂的地方,所以一些細節沒有深入描述。等我明白再來補充,或者歡迎各位指導。