1. 程式人生 > >elasticsearch原始碼分析之索引操作(九)

elasticsearch原始碼分析之索引操作(九)

上節介紹了es的node啟動如何建立叢集服務的過程,這節在其基礎之上介紹es索引的基本操作功能(create、exist、delete),用來進一步細化es叢集是如果工作的。
客戶端部分的操作就不予介紹了,詳細可以參照elasticsearch原始碼分析之客戶端(三)。這裡只介紹服務端處理工作。

createIndex

1、建立索引,客戶端提交索引的基本資訊(索引名稱、分割槽數、副本數),封裝成CreateIndexRequest類,提交給服務端。
2、服務端接收到請求,跟actionname判斷獲得具體響應action,具體參照elasticsearch原始碼分析之服務端(四)。此處為執行TransportCreateIndexAction的masterOperation,該類繼承TransportMasterNodeAction。

TransportMasterNodeAction類,master節點執行類,是所有在master節點執行的響應類的父類。主要完成master節點判斷請求和處理失敗重試功能。繼承了HandledTransportAction類,實現了doExecute方法。新啟動一個非同步單執行緒(非同步響應,不佔用接收request執行緒),如果本節點是master節點,則啟動執行masterOperation;如果不是,則傳送給mastr節點執行,執行抽象方法masterOperation由子類實現;另外通過ClusterStateObserver.waitForNextChange完成了錯誤重試功能。

ClusterStateObserver類,叢集狀態監控器。一個簡化叢集狀態處理的工具類,用於失敗重試和依賴其他叢集狀態等。其中waitForNextChange(listener,statePredicate)方法,當①叢集狀態變化並且滿足傳入的statePredicate時候,或者②達到超時時間timeOutValue時候(藉助clusterService.addTimeoutListener實現
),回撥listener執行。

3、TransportCreateIndexAction.masterOperation方法將CreateIndexRequest轉換為CreateIndexClusterStateUpdateRequest,呼叫MetaDataCreateIndexService.createIndex方法。該方法負責在clusterstate中建立新的index(onlyCreateIndex方法),並且等待指定數目的active狀態的shard副本數(activeShardsObserver.waitForActiveShards方法實現),最終返回給listener。

 1. 如果在clusterstate中成功建立index,則CreateIndexClusterStateUpdateResponse#isAcknowledged()就返回true,否則為false
 2. 如果clusterstate建立成功,並且在超時時間內指定數目的active狀態shard副本數返回成功,則CreateIndexClusterStateUpdateResponse#isShardsAcked()為true,否則為false

4、onlyCreateIndex方法,其內部執行clusterService.submitStateUpdateTask,提交叢集狀態修改任務,提交任務的執行邏輯是AckedClusterStateUpdateTask類內部的execute方法。內部邏輯為:

 1. 校驗index的名字和index的settings是否合法,校驗 request 的別名是否合法
 2. 查詢合適的模板findTemplates資訊
 3. 解析customs、mappings、templatesAliases、templateNames資訊
 4. 構建indexSettings
 5. indicesService服務增加index服務,mapperService服務合併mappings(indicesService.createIndex,mapperService.merge)
 6. 構建IndexMetaData
 7. 構建並生成新的ClusterState updatedState
 8. 如果index狀態open,執行allocationService.reroute對shard進行分配
 9. 最後判斷如果執行異常則刪除索引服務(indicesService.removeIndex)

5、修改完成clusterstate,通過submitStateUpdateTask進行叢集狀態修改提交(上步驟邏輯),1釋出叢集狀態(如果是master)、2通知叢集狀態應用器applier(如GatewayService持久化state、IndicesClusterStateService等)、3通知叢集狀態監聽器listener(包含gateway等)完成叢集功能。其他叢集接收到叢集狀態變化,對應啟動indicesService服務(通過IndicesClusterStateService.applyClusterState完成)。

6、clusterstate修改成功,開始等待指定數目的active狀態的shard副本數。ActiveShardsObserver.waitForActiveShards。核心的判斷邏輯是:

Predicate shardsAllocatedPredicate = newState -> activeShardCount.enoughShardsActive(newState, indexName);

上面的Predicate判斷和執行通過ClusterStateObserver.waitForNextChange方法完成。最後判斷執行結果設定為ShardsAcked。

至此,es叢集完成了createIndex的功能。

existIndex

索引存在操作比較簡單,通過TransportIndicesExistsAction類完成,該類繼承了TransportMasterNodeReadAction類,由名字可以在master節點上執行讀操作。
TransportIndicesExistsAction.masterOperation中進行了indexName的解析。無需進行任何叢集狀態處理。

Override
    protected void masterOperation(final IndicesExistsRequest request, final ClusterState state, final ActionListener<IndicesExistsResponse> listener) {
        boolean exists;
        try {
            // Similar as the previous behaviour, but now also aliases and wildcards are supported.
            indexNameExpressionResolver.concreteIndexNames(state, request);
            exists = true;
        } catch (IndexNotFoundException e) {
            exists = false;
        }
        listener.onResponse(new IndicesExistsResponse(exists));
    }

解析的過程實質是查詢clusterState.MetaData中是否存在indexName,如果沒有則丟擲異常,解析類IndexNameExpressionResolver.concreteIndices

        final Set<Index> concreteIndices = new HashSet<>(expressions.size());
        for (String expression : expressions) {
            AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(expression);
            if (aliasOrIndex == null) {
                if (failNoIndices) {
                    IndexNotFoundException infe = new IndexNotFoundException(expression);
                    infe.setResources("index_expression", expression);
                    throw infe;
                } else {
                    continue;
                }
            }
            ...
        }    

deleteIndex

如果理解了CreateIndex,則刪除也會比較簡單。服務端通過TransportDeleteIndexAction實現,同樣也繼承了TransportMasterNodeAction類,完成master節點執行和重試功能。
1、TransportDeleteIndexAction.masterOperation方法把request轉化為DeleteIndexClusterStateUpdateRequest,執行deleteIndexService.deleteIndices方法。
2、刪除索引主要是修改clusterstate元資料,刪數其中的index。

for (final Index index : indices) {
     String indexName = index.getName();
     logger.info("{} deleting index", index);
     routingTableBuilder.remove(indexName);
     clusterBlocksBuilder.removeIndexBlocks(indexName);
     metaDataBuilder.remove(indexName);
}

3、allocationService.reroute,對shard進行重新分配。
4、修改生成新clusterstate,通過submitStateUpdateTask進行狀態釋出。此處不在贅述。

總結

elasticsearch的索引管理操作分析流程已經分析完畢。主要有三類工作:

  1. clusterstate中indexMeta的維護
  2. 叢集狀態clusterstate的釋出和應用
  3. 索引服務indiesService、分配服務allocationService的同步處理

ElasticSearch的索引處理還是比較精妙的,各部分結構定義比較清晰,如TransportMasterNodeAction主節點操作、ClusterStateObserver叢集狀態監控執行器、各類Builder(clusterstate、setting)。究其原因,是叢集服務認識深刻,作者厲害!