1. 程式人生 > >ES查詢流程原始碼解析

ES查詢流程原始碼解析

 

一些基礎知識

早先ES的HTTP協議支援還是依賴Jetty的,現在不管是Rest還是RPC都是直接基於Netty了。

另外值得一提的是,ES 是使用Google的Guice 進行模組管理,所以瞭解Guice的基本使用方式有助於你瞭解ES的程式碼組織。

ES 的啟動類是 org.elasticsearch.bootstrap.Bootstrap。在這裡進行一些配置和環境初始化後會啟動org.elasticsearch.node.Node。Node 的概念還是蠻重要的,節點的意思,也就是一個ES例項。RPC 和 Http的對應的監聽啟動都由在該類完成。

Node 屬性裡有一個很重要的物件,叫client,型別是 NodeClient,我們知道ES是一個叢集,所以每個Node都需要和其他的Nodes 進行互動,這些互動則依賴於NodeClient來完成。所以這個物件會在大部分物件中傳遞,完成相關的互動。

先簡要說下:

  • NettyTransport 對應RPC 協議支援
  • NettyHttpServerTransport 則對應HTTP協議支援

Rest 模組解析

首先,NettyHttpServerTransport 會負責進行監聽Http請求。通過配置http.netty.http.blocking_server 你可以選擇是Nio還是傳統的阻塞式服務。預設是NIO。該類在配置pipeline的時候,最後添加了HttpRequestHandler,所以具體的接受到請求後的處理邏輯就由該類來完成了。

pipeline.addLast("handler", requestHandler);

HttpRequestHandler 實現了標準的 messageReceived(ChannelHandlerContext ctx, MessageEvent e) 方法,在該方法中,HttpRequestHandler 會回撥NettyHttpServerTransport.dispatchRequest方法,而該方法會呼叫HttpServerAdapter.dispatchRequest,接著又會呼叫HttpServer.internalDispatchRequest方法(額,好吧,我承認巢狀挺深有點深):



public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {

String rawPath = request.rawPath();

if (rawPath.startsWith("/_plugin/")) {

RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);

filterChain.continueProcessing(request, channel);

return;

} else if (rawPath.equals("/favicon.ico")) {

handleFavicon(request, channel);

return;

}

restController.dispatchRequest(request, channel);

}

這個方法裡我們看到了plugin等被有限處理。最後請求又被轉發給 RestController。

RestController 大概類似一個微型的Controller層框架,實現了:

  1. 儲存了 Method + Path -> Controller 的關係
  2. 提供了註冊關係的方法
  3. 執行Controller的功能。

那麼各個Controller(Action) 是怎麼註冊到RestController中的呢?在ES中,Rest*Action 命名的類的都是提供http服務的,他們會在RestActionModule 中被初始化,對應的構造方法會注入RestController例項,接著在構造方法中,這些Action會呼叫controller.registerHandler 將自己註冊到RestController。典型的樣子是這樣的:



@Inject

public RestSearchAction(Settings settings, RestController controller, Client client) {

super(settings, controller, client);

controller.registerHandler(GET, "/_search", this);

controller.registerHandler(POST, "/_search", this);

controller.registerHandler(GET, "/{index}/_search", this);

每個Rest*Action 都會實現一個handleRequest方法。該方法接入實際的邏輯處理。


@Override

public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {

SearchRequest searchRequest;

searchRequest = RestSearchAction.parseSearchRequest(request, parseFieldMatcher);

client.search(searchRequest, new RestStatusToXContentListener<SearchResponse>(channel));

}

首先是會把 請求封裝成一個SearchRequest物件,然後交給 NodeClient 執行。

如果用過ES的NodeClient Java API,你會發現,其實上面這些東西就是為了暴露NodeClient API 的功能,使得你可以通過HTTP的方式呼叫。

Transport*Action,兩層對映關係解析

我們先跑個題,在ES中,Transport*Action 是比較核心的類集合。這裡至少有兩組對映關係。

  • Action -> Transport*Action
  • TransportAction -> TransportHandler

第一層對映關係由類似下面的程式碼在ActionModule中完成:

 registerAction(PutMappingAction.INSTANCE,  TransportPutMappingAction.class);

第二層對映則在類似 SearchServiceTransportAction 中維護。目前看來,第二層對映只有在查詢相關的功能才有,如下:

transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest.class, ThreadPool.Names.SAME, new FreeContextTransportHandler<>());

SearchServiceTransportAction 可以看做是SearchService進一步封裝。其他的Transport*Action 則只調用對應的Service 來完成實際的操作。

對應的功能是,可以通過Action 找到對應的TransportAction,這些TransportAction 如果是query類,則會呼叫SearchServiceTransportAction,並且通過第二層對映找到對應的Handler,否則可能就直接通過對應的Service完成操作。

下面關於RPC呼叫解析這塊,我們會以查詢為例。

RPC 模組解析

前面我們提到,Rest介面最後會呼叫NodeClient來完成後續的請求。對應的程式碼為:


public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {

TransportAction<Request, Response> transportAction = actions.get(action);

if (transportAction == null) {

throw new IllegalStateException("failed to find action [" + action + "] to execute");

}

transportAction.execute(request, listener);

}

這裡的action 就是我們提到的第一層對映,找到Transport*Action.如果是查詢,則會找到TransportSearchAction。呼叫對應的doExecute 方法,接著根據searchRequest.searchType找到要執行的實際程式碼。下面是預設的:

else if (searchRequest.searchType() == SearchType.QUERY_THEN_FETCH) {    queryThenFetchAction.execute(searchRequest, listener);}

我們看到Transport*Action 是可以巢狀的,這裡呼叫了TransportSearchQueryThenFetchAction.doExecute


@Overrideprotected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {

new AsyncAction(searchRequest, listener).start();

}

在AsyncAction中完成三個步驟:

  1. query
  2. fetch
  3. merge

為了分析方便,我們只分析第一個步驟。


@Overrideprotected void sendExecuteFirstPhase(

DiscoveryNode node,

ShardSearchTransportRequest request,

ActionListener<QuerySearchResultProvider> listener) {

searchService.sendExecuteQuery(node, request, listener);

}

這是AsyncAction 中執行query的程式碼。我們知道ES是一個叢集,所以query 必然要發到多個節點去,如何知道某個索引對應的Shard 所在的節點呢?這個是在AsyncAction的父類中完成,該父類分析完後會回撥子類中的對應的方法來完成,譬如上面的sendExecuteFirstPhase 方法。

說這個是因為需要讓你知道,上面貼出來的程式碼只是針對一個節點的查詢結果,但其實最終多個節點都會通過相同的方式進行呼叫。所以才會有第三個環節 merge操作,合併多個節點返回的結果。

searchService.sendExecuteQuery(node, request, listener);

其實會呼叫transportService的sendRequest方法。大概值得分析的地方有兩個:



if (node.equals(localNode)) {

sendLocalRequest(requestId, action, request);

} else {

transport.sendRequest(node, requestId, action, request, options);

}

我們先分析,如果是本地的節點,則sendLocalRequest是怎麼執行的。如果你跑到senLocalRequest裡去看,很簡單,其實就是:

reg.getHandler().messageReceived(request, channel);

reg 其實就是前面我們提到的第二個對映,不過這個對映其實還包含了使用什麼執行緒池等資訊,我們在前面沒有說明。這裡 reg.getHandler == SearchServiceTransportAction.SearchQueryTransportHandler,所以messageReceived 方法對應的邏輯是:



QuerySearchResultProvider result = searchService.executeQueryPhase(request);

channel.sendResponse(result);

這裡,我們終於看到searchService。 在searchService裡,就是整兒八景的Lucene相關查詢了。這個我們後面的系列文章會做詳細分析。

如果不是本地節點,則會由NettyTransport.sendRequest 發出遠端請求。
假設當前請求的節點是A,被請求的節點是B,則B的入口為MessageChannelHandler.messageReceived。在NettyTransport中你可以看到最後新增的pipeline裡就有MessageChannelHandler。我們跑進去messageReceived 看看,你會發現基本就是一些協議解析,核心方法是handleRequest,接著就和本地差不多了,我提取了關鍵的幾行程式碼:


final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);

threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));

這裡被RequestHandler包了一層,其實內部執行的就是本地的那個。RequestHandler 的run方法是這樣的:


protected void doRun() throws Exception { reg.getHandler().messageReceived(request, transportChannel);

}

這個就和前面的sendLocalRequest裡的一模一樣了。

總結

到目前為止,我們知道整個ES的Rest/RPC 的起點是從哪裡開始的。RPC對應的endpoint 是MessageChannelHandler,在NettyTransport 被註冊。Rest 介面的七點則在NettyHttpServerTransport,經過層層代理,最終在RestController中被執行具體的Action。 Action 的所有執行都會被委託給NodeClient。 NodeClient的功能執行單元是各種Transport*Action。對於查詢類請求,還多了一層對映關係。

 

分散式查詢

elasticsearch的搜尋主要分為結構化搜尋和全文檢索。
結構化搜尋(Structured search) 是指有關探詢那些具有內在結構資料的過程。比如日期、時間和數字都是結構化的:它們有精確的格式,我們可以對這些格式進行邏輯操作。比較常見的操作包括比較數字或時間的範圍,或判定兩個值的大小。說白了就是類SQL檢索。
全文搜尋(full-text search)是怎樣在全文欄位中搜索到最相關的文件。
因為我們主要針對解決OLAP問題,所以此處只介紹結構化搜尋。
elasticsearch整個查詢是scatter/gather思想,也是多數分散式查詢的套路,即:
1. master服務端(配置為node.master: true)接收客戶端請求,查詢對應的index、shard,分發資料請求到對應node服務端(node.data: true)
2. node端負責資料查詢,返回結果到master端
3. master端把查詢結果進行資料合併
上面流程是一個邏輯流程,es的具體查詢過程中會分為不同的查詢型別:QUERY_THEN_FETCH、QUERY_AND_FETCH(Deprecated),有不同的查詢動作。
由於QUERY_AND_FETCH在5.X已經廢除(使用QUERY_THEN_FETCH替代),所以這裡只介紹QUERY_THEN_FETCH查詢流程。

master服務端

1、接收查詢請求,進行readblock檢查。根據request的index構造相應的ShardsIterator,shardIterators由localShardsIterator和remoteShardIterators合併而成,使用者遍歷所有的shard。生成shardits會有一些查詢策略,控制每個shard的查詢優先次序和條件控制。

preferenceType = Preference.parse(preference);
switch (preferenceType) {
   case PREFER_NODES:
       final Set<String> nodesIds =
               Arrays.stream(
                       preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
               ).collect(Collectors.toSet());
       return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
   case LOCAL:
       return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
   case PRIMARY:
       return indexShard.primaryActiveInitializingShardIt();
   case REPLICA:
       return indexShard.replicaActiveInitializingShardIt();
   case PRIMARY_FIRST:
       return indexShard.primaryFirstActiveInitializingShardsIt();
   case REPLICA_FIRST:
       return indexShard.replicaFirstActiveInitializingShardsIt();
   case ONLY_LOCAL:
       return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
   case ONLY_NODES:
       String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
       return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
   default:
       throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
}

2、根據條件設定查詢型別,根據查詢型別構造出AbstractSearchAsyncAction(繼承了InitialSearchPhase),非同步查詢action。查詢型別QUERY_THEN_FETCH構造出SearchQueryThenFetchAsyncAction。start方法啟動非同步查詢。

QUERY階段

3、query shard階段。如果需要查詢的shard數為空,則直接返回。遍歷shardits,每個shard執行query請求操作

for (final SearchShardIterator shardIt : shardsIts) {
    shardIndex++;
    final ShardRouting shard = shardIt.nextOrNull();
    if (shard != null) {
        performPhaseOnShard(shardIndex, shardIt, shard);
    } else {
        // really, no shards active in this group
        onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
    }
}

4、監聽所有shard query請求,成功返回回撥onShardResult方法,失敗返回回撥onShardFailure方法。onShardResult維護了shard計數器的工作,onShardFailure維護了計數器和shard失敗處理工作(失敗後請求該shard的下一個副本,重新發起請求)。上面所有shard均已返回(計數器判斷),則執行onPhaseDone,即executeNextPhase,進入fetch階段。

try {
    executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
        shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
        @Override
        public void innerOnResponse(FirstResult result) {
                onShardResult(result, shardIt);
        }

        @Override
        public void onFailure(Exception t) {
            onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
        }
    });
} catch (ConnectTransportException | IllegalArgumentException ex) {
    onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, ex);
}

FETCH階段

5、FetchSearchPhase,fetch階段。如果query階段shard全部失敗,則通過raisePhaseFailure丟擲異常,否則執行FetchSearchPhase.innerRun。如果不需要進行fetch抓取(聚合查詢),則直接呼叫finishPhase進行資料合併處理;如果需要進行fetch抓取(明細查詢),則呼叫executeFetch進行資料抓取,返回後進行資料合併。
6、資料合併工作主要有searchPhaseController.merge完成。主要完成search hits,合併aggregations聚合和分析結果。結果返回給client。

context.onResponse(context.buildSearchResponse(response, scrollId));
...
public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
   return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
       buildTookInMillis(), buildShardFailures());
}
...
public final void onResponse(SearchResponse response) {
    listener.onResponse(response);
}

node服務端

QUERY階段

1、接收到master端傳送來的queryaction,執行executeQueryPhase。其中SearchContext為查詢階段的上下文物件,讀取某個參考時間點快照的shard(IndexReader / contextindexsearcher),支援從query階段到fetch階段,查詢過程中主要操作該物件。

final SearchContext context = createAndPutContext(request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
boolean queryPhaseSuccess = false;
try {
    context.setTask(task);
    operationListener.onPreQueryPhase(context);
    long time = System.nanoTime();
    contextProcessing(context);

    loadOrExecuteQueryPhase(request, context);

    if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
        freeContext(context.id());
    } else {
        contextProcessedSuccessfully(context);
    }
    final long afterQueryTime = System.nanoTime();
    queryPhaseSuccess = true;
    operationListener.onQueryPhase(context, afterQueryTime - time);
    if (request.numberOfShards() == 1) {
        return executeFetchPhase(context, operationListener, afterQueryTime);
    }
    return context.queryResult();
} catch (Exception e) {
    // execution exception can happen while loading the cache, strip it
    if (e instanceof ExecutionException) {
        e = (e.getCause() == null || e.getCause() instanceof Exception) ?
            (Exception) e.getCause() : new ElasticsearchException(e.getCause());
    }
    if (!queryPhaseSuccess) {
        operationListener.onFailedQueryPhase(context);
    }
    logger.trace("Query phase failed", e);
    processFailure(context, e);
    throw ExceptionsHelper.convertToRuntime(e);
} finally {
    cleanContext(context);
}

建立context程式碼

final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout, fetchPhase);

2、執行查詢階段,loadOrExecuteQueryPhase(request, context)。首先在cache裡面判斷是否有快取,如果有則執行快取查詢indicesService.loadIntoContext;如果cache裡面沒有,執行queryPhase.execute(context),程式碼如下:

if (searchContext.hasOnlySuggest()) {
    suggestPhase.execute(searchContext);
    // TODO: fix this once we can fetch docs for suggestions
    searchContext.queryResult().topDocs(
            new TopDocs(0, Lucene.EMPTY_SCORE_DOCS, 0),
            new DocValueFormat[0]);
    return;
}
// Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
// request, preProcess is called on the DFS phase phase, this is why we pre-process them
// here to make sure it happens during the QUERY phase
aggregationPhase.preProcess(searchContext);

boolean rescore = execute(searchContext, searchContext.searcher());

if (rescore) { // only if we do a regular search
    rescorePhase.execute(searchContext);
}
suggestPhase.execute(searchContext);
aggregationPhase.execute(searchContext);

if (searchContext.getProfilers() != null) {
    ProfileShardResult shardResults = SearchProfileShardResults
            .buildShardResults(searchContext.getProfilers());
    searchContext.queryResult().profileResults(shardResults);
}

3、其中execute是對索引進行查詢,呼叫lucene的searcher.search(query, collector)。還支援聚合查詢,aggregationPhase.execute(searchContext)(下節介紹)。
4、最終返回context.queryResult()。

FETCH階段

1、接收到來自master端的fetchquery,執行executeFetchPhase。首先通過request尋找SearchContext,findContext(request.id(), request)。

final SearchContext context = findContext(request.id(), request);
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
context.incRef();
try {
    context.setTask(task);
    contextProcessing(context);
    if (request.lastEmittedDoc() != null) {
        context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
    }
    context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
    operationListener.onPreFetchPhase(context);
    long time = System.nanoTime();
    fetchPhase.execute(context);
    if (fetchPhaseShouldFreeContext(context)) {
        freeContext(request.id());
    } else {
        contextProcessedSuccessfully(context);
    }
    operationListener.onFetchPhase(context, System.nanoTime() - time);
    return context.fetchResult();
} catch (Exception e) {
    operationListener.onFailedFetchPhase(context);
    logger.trace("Fetch phase failed", e);
    processFailure(context, e);
    throw ExceptionsHelper.convertToRuntime(e);
} finally {
    cleanContext(context);
}

2、核心的查詢方法是fetchPhase.execute(context)。主要是輪流通過上輪query結果中的docsIds,建立SearchHit[]集合,最後放在fetchResult中。

for (int index = 0; index < context.docIdsToLoadSize(); index++) {
    ...
    final SearchHit searchHit;
    try {
        int rootDocId = findRootDocumentIfNested(context, subReaderContext, subDocId);
        if (rootDocId != -1) {
            searchHit = createNestedSearchHit(context, docId, subDocId, rootDocId, fieldNames, fieldNamePatterns, subReaderContext);
        } else {
            searchHit = createSearchHit(context, fieldsVisitor, docId, subDocId, subReaderContext);
        }
    } catch (IOException e) {
        throw ExceptionsHelper.convertToElastic(e);
    }

    hits[index] = searchHit;
    hitContext.reset(searchHit, subReaderContext, subDocId, context.searcher());
    for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
        fetchSubPhase.hitExecute(context, hitContext);
    }
}

for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
    fetchSubPhase.hitsExecute(context, hits);
}

context.fetchResult().hits(new SearchHits(hits, context.queryResult().getTotalHits(), context.queryResult().getMaxScore()));

3、釋放SearchContext,freeContext。該釋放有兩類情況:1是在masterquer端如果命中該shard(需要該shard執行fetch),則執行fetch完成之後(如上介紹);2是沒有命中該shard,則在master端會發送釋放context的請求到指定節點,進行釋放。
4、fetch查詢結果返回給master端。完成。

elasticsearch原始碼分析之search模組(server端) 

繼續接著上一篇的來說啊,當client端將search的請求傳送到某一個node之後,剩下的事情就是server端來處理了,具體包括哪些步驟呢?

過程

一、首先我們來看看接收地方其實就是在org.elasticsearch.action.search.TransportSearchAction中,收到請求之後會判斷請求的index的shard是否只有一個,如果是一個的話,那麼會強制將請求的type設定為QUERY_AND_FETCH,因為所以的事情在此shard上就能夠做完了。所以如果設定了routing,而讓請求落在了一個shard上時,搜尋的效率會高很多的原因。

二、根據不同的type來確定不同的處理方式,這裡補充一下,上一篇可能忘記說了,search的type一般來說分為“DFS_QUERY_THEN_FETCH、QUERY_THEN_FETCH、DFS_QUERY_AND_FETCH、QUERY_AND_FETCH”這四種,還有“SCAN、COUNT”在ES2.X裡面其實已經被捨棄掉了。我們一般都是用的預設的QUERY_THEN_FETCH,上面說的一個shard的除外。所以本篇就只討論這種情況了。

三、得到搜尋的index所涉及的shard,並依次執行: 1、獲取該shard所在的node並執行sendExecuteFirstPhase,實際上是向node傳送了一個“QUERY”的請求:

transportService.sendRequest(node, QUERY_ACTION_NAME, request, new ActionListenerResponseHandler<QuerySearchResultProvider>(listener) {
    @Override
    public QuerySearchResult newInstance() {
        return new QuerySearchResult();
    }
});

2、node接收到"QUERY"的請求之後,執行executeQueryPhase首先是建立一個search的context,

SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter(), parseFieldMatcher, defaultSearchTimeout);

建立的具體過程就不詳細說了,之後做的事情還是有parseSource、對size做判斷(2.X裡面最大不超過10000,可以通過配置檔案配置)、……

最重要的其實是loadOrExecuteQueryPhase(request, context, queryPhase);,具體的內容是首先從cache裡面執行query,如果cache裡面沒有找到,才會執行queryPhase:queryPhase.execute(context);;裡面的處理邏輯就比較複雜了,但是最重要的是searcher.search(query, collector);,其實是呼叫了Lucene裡面IndexSeartcher的search方法。

3、如此一來,第一階段的query已經做完了,,接下來便是fetch的執行,入口在onFirstPhaseResult這裡,在底層同樣是向node傳送一個“FETCH”請求咯:

4、node接收到“fetch”請求之後,執行executeFetchPhase:

fetch的核心程式碼如下:

。。。

大意就是輪流通過之前query結果中的docid,然後創建出InternalSearchHit的集合,並將之放在fetchResult中context.fetchResult().hits(new InternalSearchHits(hits, context.queryResult().topDocs().totalHits, context.queryResult().topDocs().getMaxScore()));,並將之返回到傳送fetch的node。

四、到目前為止,該獲取的資料都已經拿到了,現在要做的則是要把個node的返回結果做merge,merge的操作由SearchPhaseController來控制:

final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
        fetchResults, request);

具體的過程就不細說了,大體就是該排序的就做排序,有aggs的就做aggs……

五、通過listener將上面的結果返回:listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));給發出接收search請求的node,也就是上一篇說道的client。