Elasticsearch Source Code Analysis: Search Queries (Part 11)

Distributed Queries

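Searching in Elasticsearch falls into two main categories: structured search and full-text search.
Structured search is the process of querying data that has inherent structure. Dates, times, and numbers are all structured: they have a precise format on which we can perform logical operations. Common operations include comparing ranges of numbers or times, or deciding which of two values is larger. Put plainly, it is SQL-like retrieval.
Full-text search is about finding the most relevant documents by searching full-text fields.
Because we are mainly concerned with OLAP problems, only structured search is covered here.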
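The whole Elasticsearch query follows the scatter/gather idea, which is also the usual pattern for distributed queries, namely:
1. The master side (configured with node.master: true) receives the client request, looks up the corresponding index and shards, and dispatches data requests to the corresponding node side (node.data: true). Strictly speaking, the node that receives the request acts as the coordinating node, and any node can take that role; this article calls it the master side throughout.
2. The node side runs the data query and returns the result to the master side.
3. The master side merges the query results.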
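The flow above is a logical one. In practice Elasticsearch distinguishes query types, QUERY_THEN_FETCH and QUERY_AND_FETCH (deprecated), each with different query actions.
Since QUERY_AND_FETCH was deprecated in 5.x (QUERY_THEN_FETCH is used instead), only the QUERY_THEN_FETCH flow is described here.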
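
The scatter/gather pattern itself can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions, not Elasticsearch code: the class ScatterGatherSketch, the method scatterGather, and the modelling of a shard as a Function from a request string to a list of partial results are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration of scatter/gather; not Elasticsearch code.
final class ScatterGatherSketch {

    // Scatter: send the same request to every shard asynchronously.
    // Gather: wait for each partial result and append it to one merged list.
    static <R> List<R> scatterGather(List<Function<String, List<R>>> shards, String request) {
        List<CompletableFuture<List<R>>> futures = new ArrayList<>();
        for (Function<String, List<R>> shard : shards) {
            futures.add(CompletableFuture.supplyAsync(() -> shard.apply(request)));
        }
        List<R> merged = new ArrayList<>();
        for (CompletableFuture<List<R>> future : futures) {
            merged.addAll(future.join()); // blocks until this shard has answered
        }
        return merged;
    }
}
```

Results are gathered in shard order here, which keeps the sketch deterministic; a real coordinator also has to sort and truncate what it gathers.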

Master side

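1. Receive the query request and run the read block check. Based on the indices in the request, build the corresponding shard iterators; shardIterators is merged from localShardsIterator and remoteShardIterators and is used to iterate over all shards. Building the shard iterators involves routing preferences, which control the order in which the copies of each shard are tried and the conditions they must meet.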

preferenceType = Preference.parse(preference);
switch (preferenceType) {
    case PREFER_NODES:
        final Set<String> nodesIds =
                Arrays.stream(
                        preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
                ).collect(Collectors.toSet());
        return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
    case LOCAL:
        return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
    case PRIMARY:
        return indexShard.primaryActiveInitializingShardIt();
    case REPLICA:
        return indexShard.replicaActiveInitializingShardIt();
    case PRIMARY_FIRST:
        return indexShard.primaryFirstActiveInitializingShardsIt();
    case REPLICA_FIRST:
        return indexShard.replicaFirstActiveInitializingShardsIt();
    case ONLY_LOCAL:
        return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
    case ONLY_NODES:
        String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
        return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
    default:
        throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
}
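
The substring and split step in the PREFER_NODES branch is easy to misread, so here is a small standalone sketch of it. It assumes the preference string has the form "_prefer_nodes:nodeA,nodeB"; the constant PREFER_NODES below is a stand-in for what Preference.PREFER_NODES.type() returns, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper that mirrors the parsing step of the PREFER_NODES branch.
final class PreferenceSketch {
    // Assumed type prefix; stands in for Preference.PREFER_NODES.type().
    static final String PREFER_NODES = "_prefer_nodes";

    // Skips the "<type>:" prefix (hence the + 1) and splits the rest on commas.
    static Set<String> parseNodeIds(String preference) {
        return Arrays.stream(preference.substring(PREFER_NODES.length() + 1).split(","))
                .collect(Collectors.toSet());
    }
}
```

Calling PreferenceSketch.parseNodeIds("_prefer_nodes:nodeA,nodeB") yields a set containing nodeA and nodeB, which is the shape of argument that preferNodeActiveInitializingShardsIt receives in the excerpt above.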

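2. Set the search type according to the request, then build the AbstractSearchAsyncAction (which extends InitialSearchPhase), the asynchronous search action, for that type. For QUERY_THEN_FETCH this is SearchQueryThenFetchAsyncAction. The start method kicks off the asynchronous search.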

QUERY phase

3. Query the shards. If there are no shards to query, return immediately. Otherwise iterate over the shard iterators and execute the query request on each shard:

for (final SearchShardIterator shardIt : shardsIts) {
    shardIndex++;
    final ShardRouting shard = shardIt.nextOrNull();
    if (shard != null) {
        performPhaseOnShard(shardIndex, shardIt, shard);
    } else {
        // really, no shards active in this group
        onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
    }
}

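4. Listen for the result of every shard query request: success calls back onShardResult, failure calls back onShardFailure. onShardResult maintains the shard counter; onShardFailure maintains the counter and handles the failure (after a failure it moves on to the next copy of that shard and sends the request again). Once all shards have returned (judged by the counter), onPhaseDone runs, that is executeNextPhase, and the fetch phase begins.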

try {
    executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
        shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
        @Override
        public void innerOnResponse(FirstResult result) {
            onShardResult(result, shardIt);
        }

        @Override
        public void onFailure(Exception t) {
            onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
        }
    });
} catch (ConnectTransportException | IllegalArgumentException ex) {
    onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
}
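
The counter and the retry on the next copy can be sketched as follows. This is a simplified, synchronous model with hypothetical names (ShardPhaseSketch, runOnShard); the real action is asynchronous and callback driven. A shard is modelled as an iterator over the node ids that hold a copy, and the query as a Predicate that returns true when a copy answers successfully.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch of the query-phase bookkeeping; names are illustrative.
final class ShardPhaseSketch {
    final AtomicInteger completed = new AtomicInteger();  // shards finished, successfully or not
    final AtomicInteger successful = new AtomicInteger(); // shards where some copy answered
    final int expected;
    boolean phaseDone = false;

    ShardPhaseSketch(int expected) {
        this.expected = expected;
    }

    // Tries the copies of one shard in order; a failure moves on to the next copy.
    void runOnShard(Iterator<String> copies, Predicate<String> query) {
        while (copies.hasNext()) {
            if (query.test(copies.next())) {
                successful.incrementAndGet();
                break;
            }
        }
        // The shard is counted exactly once, whether a copy answered or all copies failed.
        if (completed.incrementAndGet() == expected) {
            phaseDone = true; // stands in for onPhaseDone / executeNextPhase
        }
    }
}
```

Counting a shard only once, after its copies are exhausted or one succeeds, is what lets a single counter decide when the phase is over.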

FETCH phase

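5. FetchSearchPhase, the fetch phase. If every shard failed in the query phase, an exception is thrown through raisePhaseFailure; otherwise FetchSearchPhase.innerRun runs. If no fetch is needed (an aggregation query), finishPhase is called directly to merge the data; if a fetch is needed (a query that returns documents), executeFetch is called to fetch the data, and the merge happens after it returns.
6. The merge is done mainly by searchPhaseController.merge. It merges the search hits and combines the aggregations and analysis results. The result is then returned to the client.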

context.onResponse(context.buildSearchResponse(response, scrollId));
...
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
   return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
       buildTookInMillis(), buildShardFailures());
}
...
public final void onResponse(SearchResponse response) {
    listener.onResponse(response);
}
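
To make the merge step more concrete, here is a minimal sketch of combining per-shard hit lists into one global top-N by score. It is not the logic of searchPhaseController.merge; the Hit class, the tie-breaking by shard and doc id, and the method mergeTopDocs are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of merging per-shard hits on the coordinating side.
final class MergeSketch {
    // Minimal stand-in for a scored hit reported by one shard.
    static final class Hit {
        final int shard;
        final int doc;
        final float score;

        Hit(int shard, int doc, float score) {
            this.shard = shard;
            this.doc = doc;
            this.score = score;
        }
    }

    // Collects all hits, orders them by descending score (ties: shard, then doc id),
    // and keeps at most `size` of them.
    static List<Hit> mergeTopDocs(List<List<Hit>> perShard, int size) {
        List<Hit> all = new ArrayList<>();
        for (List<Hit> hits : perShard) {
            all.addAll(hits);
        }
        all.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed()
                .thenComparingInt(h -> h.shard)
                .thenComparingInt(h -> h.doc));
        return new ArrayList<>(all.subList(0, Math.min(size, all.size())));
    }
}
```

Only the documents that survive this truncation need to be fetched, which is why the fetch phase runs after the query results are merged and not before.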

Node side

QUERY phase

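1. On receiving the query action sent by the master side, execute executeQueryPhase. SearchContext is the context object of the query phase: it reads the shard through a point-in-time snapshot (IndexReader / ContextIndexSearcher), lives from the query phase through the fetch phase, and is the main object operated on during the query.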

final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
boolean queryPhaseSuccess = false;
try {
    context.setTask(task);
    operationListener.onPreQueryPhase(context);
    long time = System.nanoTime();
    contextProcessing(context);

    loadOrExecuteQueryPhase(request, context);

    if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
        freeContext(context.id());
    } else {
        contextProcessedSuccessfully(context);
    }
    final long afterQueryTime = System.nanoTime();
    queryPhaseSuccess = true;
    operationListener.onQueryPhase(context, afterQueryTime - time);
    if (request.numberOfShards() == 1) {
        return executeFetchPhase(context, operationListener, afterQueryTime);
    }
    return context.queryResult();
} catch (Exception e) {
    // execution exception can happen while loading the cache, strip it
    if (e instanceof ExecutionException) {
        e = (e.getCause() == null || e.getCause() instanceof Exception) ?
            (Exception) e.getCause() : new ElasticsearchException(e.getCause());
    }
    if (!queryPhaseSuccess) {
        operationListener.onFailedQueryPhase(context);
    }
    logger.trace("Query phase failed", e);
    processFailure(context, e);
    throw ExceptionsHelper.convertToRuntime(e);
} finally {
    cleanContext(context);
}

The code that creates the context:

final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase);
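
Because the context has to survive from the query phase to the fetch phase, the node needs a registry keyed by context id. The sketch below shows one way such a registry could look; ContextRegistrySketch and its method names are hypothetical and only echo createAndPutContext, findContext, and freeContext from the excerpts in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry of active search contexts, keyed by a generated id.
final class ContextRegistrySketch<C> {
    private final AtomicLong idGenerator = new AtomicLong();
    private final Map<Long, C> activeContexts = new ConcurrentHashMap<>();

    // Registers a context during the query phase and returns its id.
    long createAndPut(C context) {
        long id = idGenerator.incrementAndGet();
        activeContexts.put(id, context);
        return id;
    }

    // Looks the context up again when the fetch request arrives.
    C find(long id) {
        C context = activeContexts.get(id);
        if (context == null) {
            throw new IllegalStateException("no search context found for id [" + id + "]");
        }
        return context;
    }

    // Removes the context; returns false if it was already gone.
    boolean free(long id) {
        return activeContexts.remove(id) != null;
    }
}
```

The id returned by createAndPut plays the role of the value that travels back to the master side with the query result and comes back in the fetch request.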

2. Run the query phase, loadOrExecuteQueryPhase(request, context). It first checks the cache: if the result is cached, it is loaded through indicesService.loadIntoContext; if not, queryPhase.execute(context) runs, with the following code:

if (searchContext.hasOnlySuggest()) {
    suggestPhase.execute(searchContext);
    // TODO: fix this once we can fetch docs for suggestions
    searchContext.queryResult().topDocs(
            new TopDocs(0, Lucene.EMPTY_SCORE_DOCS, 0),
            new DocValueFormat[0]);
    return;
}
// Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
// request, preProcess is called on the DFS phase phase, this is why we pre-process them
// here to make sure it happens during the QUERY phase
aggregationPhase.preProcess(searchContext);

boolean rescore = execute(searchContext, searchContext.searcher());

if (rescore) { // only if we do a regular search
    rescorePhase.execute(searchContext);
}
suggestPhase.execute(searchContext);
aggregationPhase.execute(searchContext);

if (searchContext.getProfilers() != null) {
    ProfileShardResult shardResults = SearchProfileShardResults
            .buildShardResults(searchContext.getProfilers());
    searchContext.queryResult().profileResults(shardResults);
}

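3. Here execute runs the query against the index by calling Lucene's searcher.search(query, collector). Aggregation queries are also supported through aggregationPhase.execute(searchContext) (covered in the next part).
4. Finally, context.queryResult() is returned.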
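
The load-or-execute idea from step 2 can be sketched with a plain map. This is an assumption-heavy illustration, not the request cache used by indicesService.loadIntoContext: RequestCacheSketch, loadOrExecute, and the executions counter are hypothetical, and the sketch ignores invalidation and cache eligibility entirely.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical load-or-execute cache: run the query phase only on a cache miss.
final class RequestCacheSketch<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    final AtomicInteger executions = new AtomicInteger(); // how often the query phase really ran

    V loadOrExecute(K key, Function<K, V> queryPhase) {
        return cache.computeIfAbsent(key, k -> {
            executions.incrementAndGet();
            return queryPhase.apply(k);
        });
    }
}
```

A second call with the same key returns the stored value without invoking the function again, which is the behaviour the step above describes for cached requests.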

FETCH phase

1. On receiving the fetch request from the master side, execute executeFetchPhase. It first looks up the SearchContext for the request with findContext(request.id(), request).

final SearchContext context = findContext(request.id(), request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
    context.setTask(task);
    contextProcessing(context);
    if (request.lastEmittedDoc() != null) {
        context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
    }
    context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
    operationListener.onPreFetchPhase(context);
    long time = System.nanoTime();
    fetchPhase.execute(context);
    if (fetchPhaseShouldFreeContext(context)) {
        freeContext(request.id());
    } else {
        contextProcessedSuccessfully(context);
    }
    operationListener.onFetchPhase(context, System.nanoTime() - time);
    return context.fetchResult();
} catch (Exception e) {
    operationListener.onFailedFetchPhase(context);
    logger.trace("Fetch phase failed", e);
    processFailure(context, e);
    throw ExceptionsHelper.convertToRuntime(e);
} finally {
    cleanContext(context);
}

2. The core method is fetchPhase.execute(context). It loops over the doc IDs produced by the preceding query round, builds the SearchHit[] array, and finally stores it in fetchResult.

for (int index = 0; index < context.docIdsToLoadSize(); index++) {
    ...
    final SearchHit searchHit;
    try {
        int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
        if (rootDocId != -1) {
            searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId, fieldNames, fieldNamePatterns, subReaderContext);
        } else {
            searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId, subReaderContext);
        }
    } catch (IOException e) {
        throw ExceptionsHelper.convertToElastic(e);
    }

    hits[index] = searchHit;
    hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());
    for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
        fetchSubPhase.hitExecute(context, hitContext);
    }
}

for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
    fetchSubPhase.hitsExecute(context, hits);
}

context.fetchResult().hits(new SearchHits(hits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));
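
Stripped of nested documents and Lucene readers, the loop above boils down to "load each requested doc id, then let every sub-phase enrich the hit". The sketch below shows that shape only; FetchSketch, the Map used as a stored-fields lookup, and modelling a sub-phase as a UnaryOperator are all assumptions for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, heavily simplified shape of the fetch loop.
final class FetchSketch {
    // Loads each requested doc id from the stored-fields map and
    // runs every sub-phase on the resulting hit, in order.
    static List<String> fetch(int[] docIdsToLoad, Map<Integer, String> storedFields,
                              List<UnaryOperator<String>> subPhases) {
        List<String> hits = new ArrayList<>();
        for (int docId : docIdsToLoad) {
            String hit = storedFields.get(docId);
            for (UnaryOperator<String> subPhase : subPhases) {
                hit = subPhase.apply(hit);
            }
            hits.add(hit);
        }
        return hits;
    }
}
```

The hits come back in the order of docIdsToLoad, not in doc id order, because the order was already decided by the query phase.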

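3. Release the SearchContext with freeContext. There are two cases: (1) if the query on the master side hit this shard (so the shard has to run a fetch), the context is freed once the fetch has completed, as described above; (2) if the shard was not hit, the master side sends a request to free the context to that node, which then releases it.
4. The fetch result is returned to the master side. Done.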
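
The two release cases in step 3 amount to a simple partition on the master side: shards that contributed to the merged top hits get a fetch request, and every other queried shard gets a free-context request. A minimal sketch, with hypothetical names and hits modelled as int pairs of shard index and doc id:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: decide which shards only need their context released.
final class ReleaseSketch {
    // Returns the queried shards that contributed no document to the merged top hits.
    static Set<Integer> shardsToRelease(Set<Integer> queriedShards, List<int[]> topHits) {
        Set<Integer> toRelease = new HashSet<>(queriedShards);
        for (int[] hit : topHits) {
            toRelease.remove(hit[0]); // hit[0] = shard index, hit[1] = doc id
        }
        return toRelease;
    }
}
```

Shards left in the returned set never see a fetch request, so without an explicit release their contexts would stay open until they time out.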

Summary

The whole Elasticsearch query process is a scatter/gather process, as follows:

[Sequence diagram. Participants: client, master, node. Messages in order: SearchRequest, ShardSearchRequest, SearchPhaseResult, ShardFetchRequest, FetchPhaseResult, SearchResponse.]

The next part takes a closer look at Elasticsearch aggregation queries (Aggregation).