A Brief Analysis of the elasticsearch 2.3.5 Source Code

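TransportClient initializes the connection to the elasticsearch cluster and calls transportService.start(); to start the transport service and establish communication with elasticsearch. During that setup,
modules.add(new ActionModule(true)); loads the classes that correspond to each request action.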

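ActionModule defines the class that handles each kind of action, so it effectively acts as the entry point:
registerAction(SearchAction.INSTANCE, TransportSearchAction.class); maps search requests to this class,
registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class); maps delete requests to this class.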
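
The registration pattern above can be pictured as a lookup table from an action to the class that handles it. The following is a minimal sketch of that idea only, not elasticsearch's implementation: ActionRegistrySketch, Handler, SearchHandler, DeleteHandler, and the plain string action names are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these types belong to elasticsearch.
final class ActionRegistrySketch {

    /** Stand-in for a transport action that handles one kind of request. */
    interface Handler {
        String handle(String request);
    }

    static final class SearchHandler implements Handler {
        @Override
        public String handle(String request) {
            return "search:" + request;
        }
    }

    static final class DeleteHandler implements Handler {
        @Override
        public String handle(String request) {
            return "delete:" + request;
        }
    }

    // Action name -> class that handles it.
    private final Map<String, Class<? extends Handler>> actions = new HashMap<>();

    void registerAction(String actionName, Class<? extends Handler> handlerClass) {
        actions.put(actionName, handlerClass);
    }

    /** Looks up the handler class for the action, instantiates it, and runs it. */
    String dispatch(String actionName, String request) {
        Class<? extends Handler> handlerClass = actions.get(actionName);
        if (handlerClass == null) {
            throw new IllegalArgumentException("no handler registered for " + actionName);
        }
        try {
            return handlerClass.getDeclaredConstructor().newInstance().handle(request);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + handlerClass, e);
        }
    }

    public static void main(String[] args) {
        ActionRegistrySketch registry = new ActionRegistrySketch();
        registry.registerAction("search", SearchHandler.class);
        registry.registerAction("delete", DeleteHandler.class);
        System.out.println(registry.dispatch("search", "title:es"));
    }
}
```

The point of the table is that the caller only names the action; which class does the work is decided once, at registration time.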

The class to focus on is TransportSearchAction.

    switch (searchRequest.searchType()) {
        case DFS_QUERY_THEN_FETCH:
            searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case QUERY_THEN_FETCH:
            searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case DFS_QUERY_AND_FETCH:
            searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
        case QUERY_AND_FETCH:
            searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchService, clusterService,
                    indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
            break;
    }

Depending on the search type, the request takes a different branch.

    if (shardCount == 1) {
        // if we only have one group, then we always want Q_A_F, no need for DFS, and no need to do THEN since we hit one shard
        searchRequest.searchType(QUERY_AND_FETCH);
    }

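When there is only one shard, QUERY_AND_FETCH is used directly. With multiple shards this mode returns n*size results, but with a single shard that is not a problem, and it is fast, as discussed later.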
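
The single-shard override can be written as a small pure function. This is a hedged sketch of the rule only: SingleShardSketch, its local SearchType enum, and effectiveSearchType are hypothetical names that mirror the constants in the excerpt, not elasticsearch's own classes.

```java
// Hypothetical sketch: SearchType here is a local enum, not elasticsearch's class.
final class SingleShardSketch {

    enum SearchType {
        DFS_QUERY_THEN_FETCH, QUERY_THEN_FETCH, DFS_QUERY_AND_FETCH, QUERY_AND_FETCH
    }

    /** With exactly one shard group the requested type is overridden; otherwise it is kept. */
    static SearchType effectiveSearchType(SearchType requested, int shardCount) {
        return shardCount == 1 ? SearchType.QUERY_AND_FETCH : requested;
    }

    public static void main(String[] args) {
        System.out.println(effectiveSearchType(SearchType.DFS_QUERY_THEN_FETCH, 1));
        System.out.println(effectiveSearchType(SearchType.DFS_QUERY_THEN_FETCH, 5));
    }
}
```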

First, let's walk through the flow for QUERY_AND_FETCH.

SearchQueryAndFetchAsyncAction is the entry point. Let's look at its start method, which is defined in its parent class:

    for (final ShardIterator shardIt : shardsIts) {
        shardIndex++;
        final ShardRouting shard = shardIt.nextOrNull();
        if (shard != null) {
            performFirstPhase(shardIndex, shardIt, shard);
        } else {
            // really, no shards active in this group
            onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
        }
    }

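performFirstPhase is executed in turn for each shard involved. Its job is to build an internal shard-level search request, which targets a single shard only. It ends up calling the following code: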

    sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime()), new ActionListener<FirstResult>() {
        @Override
        public void onResponse(FirstResult result) {
            onFirstPhaseResult(shardIndex, shard, result, shardIt);
        }

        @Override
        public void onFailure(Throwable t) {
            onFirstPhaseResult(shardIndex, shard, node.id(), shardIt, t);
        }
    });

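The main job of onFirstPhaseResult is to call the subclass's moveToSecondPhase. That method only runs after executeFetchPhase has completed, so it is covered further below. Next, the following is executed: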

 protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
                                         ActionListener<QueryFetchSearchResult> listener) {
        searchService.sendExecuteFetch(node, request, listener);
    }

sendExecuteFetch is defined in SearchServiceTransportAction. It forwards the shard search request to the corresponding node.

Here is its source code:

 public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QueryFetchSearchResult> listener) {
        transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
            @Override
            public QueryFetchSearchResult newInstance() {
                return new QueryFetchSearchResult();
            }
        });
    }

Note the other handler:

    transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest.class, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler());

    class SearchQueryFetchTransportHandler extends TransportRequestHandler<ShardSearchTransportRequest> {
        @Override
        public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
            QueryFetchSearchResult result = searchService.executeFetchPhase(request);
            channel.sendResponse(result);
        }
    }

Next, look at sendRequest:

    if (node.equals(localNode)) {
        sendLocalRequest(requestId, action, request);
    } else {
        transport.sendRequest(node, requestId, action, request, options);
    }

sendLocalRequest eventually calls the messageReceived method of SearchQueryFetchTransportHandler, which in turn starts executeFetchPhase.

Now look at the source of executeFetchPhase. It calls loadOrExecuteQueryPhase(request, context, queryPhase);, whose body is:

    if (canCache) {
        indicesQueryCache.loadIntoContext(request, context, queryPhase);
    } else {
        queryPhase.execute(context);
    }

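execute is where the query is finally carried out, including the parsing of the various parameters; that part is not covered here.
Continuing through the executeFetchPhase method, we also find
fetchPhase.execute(context);
and it finally returns
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());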

As the code above shows, queryPhase and fetchPhase run back to back on the shard, which is exactly what query_and_fetch means.
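
To make the back-to-back execution concrete, here is a small sketch of one shard running a query phase (pick the top document numbers by score) immediately followed by a fetch phase (load the content for those numbers). It is an illustration under simplifying assumptions, not the elasticsearch code: QueryAndFetchSketch, Shard, ScoredDoc, and the in-memory arrays are hypothetical, and only the method names echo the excerpt.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: a "shard" is just two parallel arrays held in memory.
final class QueryAndFetchSketch {

    /** Internal document number plus its match score. */
    static final class ScoredDoc {
        final int doc;
        final float score;

        ScoredDoc(int doc, float score) {
            this.doc = doc;
            this.score = score;
        }
    }

    static final class Shard {
        private final String[] sources; // document content, indexed by doc number
        private final float[] scores;   // precomputed match score per doc number

        Shard(String[] sources, float[] scores) {
            this.sources = sources;
            this.scores = scores;
        }

        /** Query phase: returns only doc numbers and scores of the top `size` documents. */
        List<ScoredDoc> queryPhase(int size) {
            List<ScoredDoc> all = new ArrayList<>();
            for (int doc = 0; doc < scores.length; doc++) {
                all.add(new ScoredDoc(doc, scores[doc]));
            }
            all.sort(Comparator.comparingDouble((ScoredDoc d) -> d.score).reversed());
            return new ArrayList<>(all.subList(0, Math.min(size, all.size())));
        }

        /** Fetch phase: loads the content for the given doc numbers. */
        List<String> fetchPhase(List<ScoredDoc> topDocs) {
            List<String> hits = new ArrayList<>();
            for (ScoredDoc d : topDocs) {
                hits.add(sources[d.doc]);
            }
            return hits;
        }

        /** query_and_fetch: both phases run consecutively on the shard. */
        List<String> executeFetchPhase(int size) {
            return fetchPhase(queryPhase(size));
        }
    }

    public static void main(String[] args) {
        Shard shard = new Shard(new String[] {"a", "b", "c"}, new float[] {0.1f, 0.9f, 0.5f});
        System.out.println(shard.executeFetchPhase(2));
    }
}
```

Because the shard fetches content before any other shard has been consulted, it cannot know which of its hits will survive a global ranking, so it has to return all of them.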

The returned result contains both the query result and the fetch result. The two corresponding classes are QuerySearchResult and FetchSearchResult.
The QuerySearchResult class:

    private long id;
    private SearchShardTarget shardTarget;
    private int from;
    private int size;
    private TopDocs topDocs;
    private InternalAggregations aggregations;
    private List<SiblingPipelineAggregator> pipelineAggregators;
    private Suggest suggest;
    private boolean searchTimedOut;
    private Boolean terminatedEarly = null;
    private List<ProfileShardResult> profileShardResults;

Now look at the definition of TopDocs:

   /** The total number of hits for the query. */
  public int totalHits;

  /** The top hits for the query. */
  public ScoreDoc[] scoreDocs;

  /** Stores the maximum score value encountered, needed for normalizing. */
  private float maxScore;

It holds the number of hits, the document numbers, the per-document match scores, and the maximum score. Next, let's see how ScoreDoc is defined.


 /** The score of this document for the query. */
  public float score;

  /** A hit document's number.
   * @see IndexSearcher#doc(int) */
  public int doc;

  /** Only set by {@link TopDocs#merge} */
  public int shardIndex;

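Here doc is the internal document number; the corresponding document content can be obtained through IndexSearcher#doc(int).

So QuerySearchResult contains only the internal document numbers and their match scores.

Now look at FetchSearchResult: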

 private long id;
    private SearchShardTarget shardTarget;
    private InternalSearchHits hits;
    // client side counter
    private transient int counter;

InternalSearchHits is defined as follows:

 private InternalSearchHit[] hits;

    public long totalHits;

    private float maxScore;

The source of InternalSearchHit is as follows:

    private transient int docId;


    private float score = Float.NEGATIVE_INFINITY;

    private Text id;
    private Text type;

    private InternalNestedIdentity nestedIdentity;

    private long version = -1;

    private BytesReference source;

    private Map<String, SearchHitField> fields = ImmutableMap.of();

    private Map<String, HighlightField> highlightFields = null;

    private Object[] sortValues = EMPTY_SORT_VALUES;

    private String[] matchedQueries = Strings.EMPTY_ARRAY;

    private Explanation explanation;

    @Nullable
    private SearchShardTarget shard;

    private Map<String, Object> sourceAsMap;
    private byte[] sourceAsBytes;

    private Map<String, InternalSearchHits> innerHits;

It contains both the raw document source and its parsed form.

How moveToSecondPhase executes
Once the executeFetchPhase call above has finished and both the query result and the fetch result are available, moveToSecondPhase runs. The two key lines are:

    sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
    final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
            firstResults, request);

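The second phase of the search submits a task to the search thread pool. The task first sorts the per-shard results as a whole and then merges the search results, calling searchPhaseController.sortDocs and searchPhaseController.merge respectively.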

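Leaving sorting aside for now, merge mainly combines the hits, the suggest results, and the aggregations, which is why every search response has a hits section and an aggregations section.
So if a search specifies query_and_fetch as its search_type and a size of 10, it returns 50 results when there are 5 shards: each shard returns 10 documents, and the merged result contains all of them.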
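
The 5 shards x 10 hits = 50 arithmetic can be checked with a short simulation. This is a sketch under the assumption that every shard has at least size matching documents. QueryAndFetchMergeSketch, Hit, shardTopHits, and the invented scores are hypothetical, and the merge here only concatenates and orders by score, which is a simplification of what searchPhaseController.merge does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of why query_and_fetch returns shards * size hits.
final class QueryAndFetchMergeSketch {

    static final class Hit {
        final int shardIndex;
        final String id;
        final float score;

        Hit(int shardIndex, String id, float score) {
            this.shardIndex = shardIndex;
            this.id = id;
            this.score = score;
        }
    }

    /** Simulates one shard returning its own, already fetched, top `size` hits. */
    static List<Hit> shardTopHits(int shardIndex, int size) {
        List<Hit> hits = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            // Invented scores: the i-th hit of every shard gets 1 / (i + 1).
            hits.add(new Hit(shardIndex, "shard" + shardIndex + "-doc" + i, 1.0f / (i + 1)));
        }
        return hits;
    }

    /** Keeps every hit from every shard; nothing is trimmed back down to `size`. */
    static List<Hit> merge(List<List<Hit>> perShardHits) {
        List<Hit> merged = new ArrayList<>();
        for (List<Hit> hits : perShardHits) {
            merged.addAll(hits);
        }
        merged.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        return merged;
    }

    public static void main(String[] args) {
        List<List<Hit>> perShard = new ArrayList<>();
        for (int shard = 0; shard < 5; shard++) {
            perShard.add(shardTopHits(shard, 10));
        }
        System.out.println(merge(perShard).size());
    }
}
```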

query_then_fetch executes in a similar way.
Its executeQueryPhase is as follows:

if (canCache) {
            indicesQueryCache.loadIntoContext(request, context, queryPhase);
        } else {
            queryPhase.execute(context);
        }

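There is only a query and no fetch. This is an important difference: the result contains only the document numbers and the scores needed for sorting.
Once queryPhase finishes, the second phase begins, starting from moveToSecondPhase. Its code is defined in SearchQueryThenFetchAsyncAction. Its main job is to sort the document numbers obtained in the first phase and then, once sorting is done, fetch the actual document content by document number. The relevant code is: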


    sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
    searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
    for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
        QuerySearchResultProvider queryResult = firstResults.get(entry.index);
        DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
        executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
    }
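
The two preparatory steps in the excerpt above, a global sort followed by grouping the surviving document numbers per shard, can be sketched as follows. This is a simplified illustration, not the searchPhaseController code: SortDocsSketch and ShardDoc are hypothetical, the method names are borrowed from the excerpt only for readability, and sorting is by score alone.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the query_then_fetch second phase preparation.
final class SortDocsSketch {

    /** A query-phase hit: which shard it came from, its doc number, and its score. */
    static final class ShardDoc {
        final int shardIndex;
        final int doc;
        final float score;

        ShardDoc(int shardIndex, int doc, float score) {
            this.shardIndex = shardIndex;
            this.doc = doc;
            this.score = score;
        }
    }

    /** Merges the per-shard query results and keeps the global top `size` by score. */
    static List<ShardDoc> sortDocs(List<List<ShardDoc>> perShardResults, int size) {
        List<ShardDoc> all = new ArrayList<>();
        for (List<ShardDoc> docs : perShardResults) {
            all.addAll(docs);
        }
        all.sort(Comparator.comparingDouble((ShardDoc d) -> d.score).reversed());
        return new ArrayList<>(all.subList(0, Math.min(size, all.size())));
    }

    /** Groups the surviving doc numbers by shard, so each shard gets one fetch request. */
    static Map<Integer, List<Integer>> fillDocIdsToLoad(List<ShardDoc> sorted) {
        Map<Integer, List<Integer>> docIdsToLoad = new TreeMap<>();
        for (ShardDoc d : sorted) {
            docIdsToLoad.computeIfAbsent(d.shardIndex, k -> new ArrayList<>()).add(d.doc);
        }
        return docIdsToLoad;
    }

    public static void main(String[] args) {
        List<List<ShardDoc>> perShard = new ArrayList<>();
        perShard.add(List.of(new ShardDoc(0, 1, 0.9f), new ShardDoc(0, 2, 0.2f)));
        perShard.add(List.of(new ShardDoc(1, 7, 0.5f), new ShardDoc(1, 8, 0.4f)));
        System.out.println(fillDocIdsToLoad(sortDocs(perShard, 3)));
    }
}
```

Only the documents that make the global cut are fetched, which is why this mode returns size hits rather than one batch per shard.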

sortDocs was mentioned earlier: it sorts the document numbers by each document's match score.
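executeFetch calls searchService to start an asynchronous task that fetches the actual document content by document number and stores the result in fetchResults. A counter tracks the outstanding fetch tasks; once all of them have completed, finishHim is called to complete the query.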
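
The counter-based completion check can be sketched with an AtomicInteger that starts at the number of pending fetches and is decremented as each one returns. This is a minimal illustration under stated assumptions, not the original method: FetchCountdownSketch, onFetchResult, and the string results are hypothetical, and only counter, fetchResults, and finishHim echo names from the text.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: each fetch callback decrements a shared counter,
// and whichever callback brings it to zero finishes the request.
final class FetchCountdownSketch {

    private final String[] fetchResults;   // one slot per shard
    private final AtomicInteger counter;   // fetches still outstanding
    private volatile String finalResponse; // set once by finishHim()

    FetchCountdownSketch(int shardCount) {
        this.fetchResults = new String[shardCount];
        this.counter = new AtomicInteger(shardCount);
    }

    /** Called when one shard's fetch returns; safe to call from different threads. */
    void onFetchResult(int shardIndex, String result) {
        fetchResults[shardIndex] = result;
        if (counter.decrementAndGet() == 0) {
            finishHim();
        }
    }

    /** Combines the per-shard results; runs exactly once, after the last fetch. */
    private void finishHim() {
        finalResponse = String.join(",", fetchResults);
    }

    /** Returns null until every fetch has completed. */
    String finalResponse() {
        return finalResponse;
    }

    public static void main(String[] args) {
        FetchCountdownSketch search = new FetchCountdownSketch(2);
        search.onFetchResult(1, "b");
        search.onFetchResult(0, "a");
        System.out.println(search.finalResponse());
    }
}
```

Using an atomic decrement means no lock is needed: callbacks may arrive in any order, and only the last one observes zero.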

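finishHim merges the query results from each shard and then notifies the listener with the merged result, which completes the query. searchPhaseController.merge was covered earlier.
The remaining details are left for another time. Reference:
https://blog.csdn.net/caipeichao2/article/details/46418413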
