solr原始碼閱讀(記錄一次除錯熟悉solr整體呼叫流程)
使用的是歷史版本 Solr 5.3。因開發需要,把一次原始碼閱讀的經歷記錄下來:這次追蹤的是一次普通 select 請求的處理流程。依查詢引數不同,程式一定會走不同的分支,這裡只讀了其中一條,但大體上也足以看明白 Solr 的整體程式碼結構
第一步,web專案先看什麼,當然是web.xml
第二步:看一下 web.xml 中名為 SolrRequestFilter 的過濾器(對應的實作類別是 SolrDispatchFilter)的 doFilter 方法。前半段主要做了一些 url、許可權的驗證,這裡忽略跳過,直接看核心的兩行
// Excerpt from doFilter: obtain the HttpSolrCall for this request/response pair.
HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);
對request、response還有核心 SolrCore進行了一次包裝聚合
// Excerpt from doFilter: run the call; the returned Action tells the filter what to do next.
Action result = call.call();
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException { if (!(request instanceof HttpServletRequest)) return; AtomicReference<ServletRequest> wrappedRequest = new AtomicReference<>(); if (!authenticateRequest(request, response, wrappedRequest)) { // the response and status code have already been sent return; } if (wrappedRequest.get() != null) { request = wrappedRequest.get(); } if (cores.getAuthenticationPlugin() != null) { log.debug("User principal: {}", ((HttpServletRequest)request).getUserPrincipal()); } // No need to even create the HttpSolrCall object if this path is excluded. if(excludePatterns != null) { String requestPath = ((HttpServletRequest) request).getServletPath();// /db/select String extraPath = ((HttpServletRequest)request).getPathInfo(); if (extraPath != null) { // In embedded mode, servlet path is empty - include all post-context path here for testing requestPath += extraPath; } for (Pattern p : excludePatterns) { Matcher matcher = p.matcher(requestPath); if (matcher.lookingAt()) { chain.doFilter(request, response); return; } } } HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);//重要 try { Action result = call.call();//重要 switch (result) { case PASSTHROUGH: chain.doFilter(request, response); break; case RETRY: doFilter(request, response, chain, true); break; case FORWARD: request.getRequestDispatcher(call.getPath()).forward(request, response); break; } } finally { call.destroy(); } }
第三步:HttpSolrCall.call() 做了什麼?
/** * This method processes the request. */ public Action call() throws IOException { MDCLoggingContext.reset(); MDCLoggingContext.setNode(cores); if (cores == null) { sendError(503, "Server is shutting down or failed to initialize"); return RETURN; } if (solrDispatchFilter.abortErrorMessage != null) { sendError(500, solrDispatchFilter.abortErrorMessage); return RETURN; } try { init(); //這裡面 做了一些初始化,關注了一下handler的初始化 handler = cores.getRequestHandler(path); /* Authorize the request if 1. Authorization is enabled, and 2. The requested resource is not a known static file */ if (cores.getAuthorizationPlugin() != null && shouldAuthorize()) { AuthorizationContext context = getAuthCtx(); log.info(context.toString()); AuthorizationResponse authResponse = cores.getAuthorizationPlugin().authorize(context); if (authResponse.statusCode == AuthorizationResponse.PROMPT.statusCode) { Map<String, String> headers = (Map) getReq().getAttribute(AuthenticationPlugin.class.getName()); if (headers != null) { for (Map.Entry<String, String> e : headers.entrySet()) response.setHeader(e.getKey(), e.getValue()); } log.debug("USER_REQUIRED "+req.getHeader("Authorization")+" "+ req.getUserPrincipal()); } if (!(authResponse.statusCode == HttpStatus.SC_ACCEPTED) && !(authResponse.statusCode == HttpStatus.SC_OK)) { sendError(authResponse.statusCode, "Unauthorized request, Response code: " + authResponse.statusCode); return RETURN; } } HttpServletResponse resp = response; switch (action) { case ADMIN: handleAdminRequest(); return RETURN; case REMOTEQUERY: remoteQuery(coreUrl + path, resp); return RETURN; case PROCESS: final Method reqMethod = Method.getMethod(req.getMethod()); HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod); // unless we have been explicitly told not to, do cache validation // if we fail cache validation, execute the query if (config.getHttpCachingConfig().isNever304() || !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) { 
SolrQueryResponse solrRsp = new SolrQueryResponse(); /* even for HEAD requests, we need to execute the handler to * ensure we don't get an error (and to make sure the correct * QueryResponseWriter is selected and we get the correct * Content-Type) */ SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp)); execute(solrRsp); HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod); Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders(); while (headers.hasNext()) { Map.Entry<String, String> entry = headers.next(); resp.addHeader(entry.getKey(), entry.getValue()); } QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq); if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates); writeResponse(solrRsp, responseWriter, reqMethod); } return RETURN; default: return action; } } catch (Throwable ex) { sendError(ex); // walk the the entire cause chain to search for an Error Throwable t = ex; while (t != null) { if (t instanceof Error) { if (t != ex) { SolrDispatchFilter.log.error("An Error was wrapped in another exception - please report complete stacktrace on SOLR-6161", ex); } throw (Error) t; } t = t.getCause(); } return RETURN; } finally { MDCLoggingContext.clear(); } }
call方法裡面重要的有:
1.執行本次請求
// Excerpt from call(): register the request/response pair, run the handler,
// then let the cache-header utility inspect the response.
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
execute(solrRsp);
HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
2.對搜尋結果進行封裝,並通過response進行回寫
// Excerpt from call(): pick the response writer for this request and write the response out.
QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
writeResponse(solrRsp, responseWriter, reqMethod);
第五步:先觀察一下 上面的 execute,最終呼叫 SolrCore 核心類的 execute方法
// Article note: this ends up invoking the execute method of the core class SolrCore.
solrReq.getCore().execute(handler, solrReq, rsp);
public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) { if (handler==null) { String msg = "Null Request Handler '" + req.getParams().get(CommonParams.QT) + "'"; if (log.isWarnEnabled()) log.warn(logid + msg + ":" + req); throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, msg); } preDecorateResponse(req, rsp);//返回結果預拼裝,可以看一下,不過感覺返回的時候的封裝更值得一讀 if (requestLog.isDebugEnabled() && rsp.getToLog().size() > 0) { // log request at debug in case something goes wrong and we aren't able to log later requestLog.debug(rsp.getToLogAsString(logid)); } // TODO: this doesn't seem to be working correctly and causes problems with the example server and distrib (for example /spell) // if (req.getParams().getBool(ShardParams.IS_SHARD,false) && !(handler instanceof SearchHandler)) // throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"isShard is only acceptable with search handlers"); handler.handleRequest(req,rsp);//這裡用到了第三步 init 初始化的handler ,我測試的seach方法,這裡用到的就是 SearchHandler類的 handler方法 postDecorateResponse(handler, req, rsp);//和 前面的 preDecorateResponse 成對拼接字串 if (rsp.getToLog().size() > 0) { if (requestLog.isInfoEnabled()) { requestLog.info(rsp.getToLogAsString(logid)); } if (log.isWarnEnabled() && slowQueryThresholdMillis >= 0) { final long qtime = (long) (req.getRequestTimer().getTime()); if (qtime >= slowQueryThresholdMillis) { log.warn("slow: " + rsp.getToLogAsString(logid)); } } } }
第六步:繼續深挖。handler.handleRequest(req,rsp) 先執行的是 RequestHandlerBase 的 handleRequest,它裡面主要呼叫了 SearchHandler 的 handleRequestBody 方法(原因看第五步 SolrCore.execute 的程式碼)。這個方法比較長,大部分邏輯都在這裡面;抓重點看,就是依序呼叫各個 Components 的 prepare 方法(搜尋預處理)和 process 方法(實際處理)。這次除錯走到的是 QueryComponent
/**
 * Drives the search components for one request.
 *
 * <p>Every component's {@code prepare} is called first. A non-distributed request then
 * calls every component's {@code process}; a distributed request instead loops over
 * stages, asking each component for the next stage via {@code distributedProcess},
 * submitting queued shard requests and feeding the shard responses back to the
 * components. When debugging is on, the same calls are wrapped in sub-timers.
 *
 * @param req the request being served
 * @param rsp the response to populate
 * @throws Exception propagated from the components or the shard handler
 */
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
  List<SearchComponent> components = getComponents();
  ResponseBuilder rb = new ResponseBuilder(req, rsp, components);
  if (rb.requestInfo != null) {
    rb.requestInfo.setResponseBuilder(rb);
  }

  boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false);
  rb.setDebug(dbg);
  if (dbg == false){//if it's true, we are doing everything anyway.
    SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG), rb);
  }

  // The timer is only non-null in debug mode; it doubles as the "debugging?" switch below.
  final RTimer timer = rb.isDebug() ? req.getRequestTimer() : null;

  final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb); // creates a ShardHandler object only if it's needed

  if (timer == null) {
    // non-debugging prepare phase
    for( SearchComponent c : components ) {
      c.prepare(rb);// key call: per-component preparation
    }
  } else {
    // debugging prepare phase
    RTimer subt = timer.sub( "prepare" );
    for( SearchComponent c : components ) {
      rb.setTimer( subt.sub( c.getName() ) );
      c.prepare(rb);// key call: per-component preparation
      rb.getTimer().stop();
    }
    subt.stop();
  }

  if (!rb.isDistrib) {
    // a normal non-distributed request

    long timeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
    if (timeAllowed > 0L) {
      SolrQueryTimeoutImpl.set(timeAllowed);
    }
    try {
      // The semantics of debugging vs not debugging are different enough that
      // it makes sense to have two control loops
      if(!rb.isDebug()) {
        // Process
        for( SearchComponent c : components ) {
          c.process(rb);// key call: per-component processing
        }
      }
      else {
        // Process
        RTimer subt = timer.sub( "process" );
        for( SearchComponent c : components ) {
          rb.setTimer( subt.sub( c.getName() ) );
          c.process(rb);// key call: per-component processing
          rb.getTimer().stop();
        }
        subt.stop();

        // add the timing info
        if (rb.isDebugTimings()) {
          rb.addDebugInfo("timing", timer.asNamedList() );
        }
      }
    } catch (ExitableDirectoryReader.ExitingReaderException ex) {
      // The reader aborted the query: log it and flag the response as partial with zero hits.
      log.warn( "Query: " + req.getParamString() + "; " + ex.getMessage());
      SolrDocumentList r = (SolrDocumentList) rb.rsp.getValues().get("response");
      if(r == null)
        r = new SolrDocumentList();
      r.setNumFound(0);
      rb.rsp.add("response", r);
      if(rb.isDebug()) {
        NamedList debug = new NamedList();
        debug.add("explain", new NamedList());
        rb.rsp.add("debug", debug);
      }
      rb.rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
    } finally {
      SolrQueryTimeoutImpl.reset();
    }
  } else {
    // a distributed request

    if (rb.outgoing == null) {
      rb.outgoing = new LinkedList<>();
    }
    rb.finished = new ArrayList<>();

    int nextStage = 0;
    do {
      rb.stage = nextStage;
      nextStage = ResponseBuilder.STAGE_DONE;

      // call all components
      for( SearchComponent c : components ) {
        // the next stage is the minimum of what all components report
        nextStage = Math.min(nextStage, c.distributedProcess(rb));
      }

      // check the outgoing queue and send requests
      while (rb.outgoing.size() > 0) {

        // submit all current request tasks at once
        while (rb.outgoing.size() > 0) {
          ShardRequest sreq = rb.outgoing.remove(0);
          sreq.actualShards = sreq.shards;
          if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
            sreq.actualShards = rb.shards;
          }
          sreq.responses = new ArrayList<>(sreq.actualShards.length); // presume we'll get a response from each shard we send to

          // TODO: map from shard to address[]
          for (String shard : sreq.actualShards) {
            ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
            params.remove(ShardParams.SHARDS); // not a top-level request
            params.set(CommonParams.DISTRIB, "false"); // not a top-level request
            params.remove("indent");
            params.remove(CommonParams.HEADER_ECHO_PARAMS);
            params.set(ShardParams.IS_SHARD, true); // a sub (shard) request
            params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
            params.set(ShardParams.SHARD_URL, shard); // so the shard knows what was asked
            if (rb.requestInfo != null) {
              // we could try and detect when this is needed, but it could be tricky
              params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
            }
            String shardQt = params.get(ShardParams.SHARDS_QT);
            if (shardQt != null) {
              params.set(CommonParams.QT, shardQt);
            } else {
              // for distributed queries that don't include shards.qt, use the original path
              // as the default but operators need to update their luceneMatchVersion to enable
              // this behavior since it did not work this way prior to 5.1
              if (req.getCore().getSolrConfig().luceneMatchVersion.onOrAfter(Version.LUCENE_5_1_0)) {
                String reqPath = (String) req.getContext().get(PATH);
                if (!"/select".equals(reqPath)) {
                  params.set(CommonParams.QT, reqPath);
                } // else if path is /select, then the qt gets passed thru if set
              } else {
                // this is the pre-5.1 behavior, which translates to sending the shard request to /select
                params.remove(CommonParams.QT);
              }
            }
            shardHandler1.submit(sreq, shard, params);
          }
        }

        // now wait for replies, but if anyone puts more requests on
        // the outgoing queue, send them out immediately (by exiting
        // this loop)
        boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
        while (rb.outgoing.size() == 0) {
          ShardResponse srsp = tolerant ?
              shardHandler1.takeCompletedIncludingErrors():
              shardHandler1.takeCompletedOrError();
          if (srsp == null) break; // no more requests to wait for

          // Was there an exception?
          if (srsp.getException() != null) {
            // If things are not tolerant, abort everything and rethrow
            if(!tolerant) {
              shardHandler1.cancelAll();
              if (srsp.getException() instanceof SolrException) {
                throw (SolrException)srsp.getException();
              } else {
                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
              }
            } else {
              // Tolerant mode: keep going, but mark the response as partial (once).
              if(rsp.getResponseHeader().get("partialResults") == null) {
                rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
              }
            }
          }

          rb.finished.add(srsp.getShardRequest());

          // let the components see the responses to the request
          for(SearchComponent c : components) {
            c.handleResponses(rb, srsp.getShardRequest());
          }
        }
      }

      for(SearchComponent c : components) {
        c.finishStage(rb);
      }

      // we are done when the next stage is MAX_VALUE
    } while (nextStage != Integer.MAX_VALUE);
  }

  // SOLR-5550: still provide shards.info if requested even for a short circuited distrib request
  if(!rb.isDistrib && req.getParams().getBool(ShardParams.SHARDS_INFO, false) && rb.shortCircuitedURL != null) {
    NamedList<Object> shardInfo = new SimpleOrderedMap<Object>();
    SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();
    if (rsp.getException() != null) {
      // Report the underlying cause (root cause for SolrServerException, otherwise one level down if present).
      Throwable cause = rsp.getException();
      if (cause instanceof SolrServerException) {
        cause = ((SolrServerException)cause).getRootCause();
      } else {
        if (cause.getCause() != null) {
          cause = cause.getCause();
        }
      }
      nl.add("error", cause.toString() );
      StringWriter trace = new StringWriter();
      cause.printStackTrace(new PrintWriter(trace));
      nl.add("trace", trace.toString() );
    }
    else {
      nl.add("numFound", rb.getResults().docList.matches());
      nl.add("maxScore", rb.getResults().docList.maxScore());
    }
    nl.add("shardAddress", rb.shortCircuitedURL);
    nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this request so far

    // Name the entry after the URL with its scheme ("xxx://") stripped, when it has one.
    int pos = rb.shortCircuitedURL.indexOf("://");
    String shardInfoName = pos != -1 ? rb.shortCircuitedURL.substring(pos+3) : rb.shortCircuitedURL;
    shardInfo.add(shardInfoName, nl);
    rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);
  }
}
第七步:來看一下 QueryComponent 的prepare 和 process
prepare 最重要的 就是根據defType(url傳遞或者solrconfig設定,預設是lucene),獲取對應的Parser和Query
@Override public void prepare(ResponseBuilder rb) throws IOException { SolrQueryRequest req = rb.req; SolrParams params = req.getParams(); if (!params.getBool(COMPONENT_NAME, true)) { return; } SolrQueryResponse rsp = rb.rsp; // Set field flags ReturnFields returnFields = new SolrReturnFields( req );//獲取指定的返回欄位,沒單獨指定的話這裡為null rsp.setReturnFields( returnFields ); int flags = 0; if (returnFields.wantsScore()) { flags |= SolrIndexSearcher.GET_SCORES; } rb.setFieldFlags( flags ); String defType = params.get(QueryParsing.DEFTYPE, QParserPlugin.DEFAULT_QTYPE); // get it from the response builder to give a different component a chance // to set it. String queryString = rb.getQueryString(); if (queryString == null) { // this is the normal way it's set. queryString = params.get( CommonParams.Q ); rb.setQueryString(queryString); } try { QParser parser = QParser.getParser(rb.getQueryString(), defType, req); Query q = parser.getQuery(); if (q == null) { // normalize a null query to a query that matches nothing q = new MatchNoDocsQuery(); } rb.setQuery( q ); String rankQueryString = rb.req.getParams().get(CommonParams.RQ); if(rankQueryString != null) { QParser rqparser = QParser.getParser(rankQueryString, defType, req); Query rq = rqparser.getQuery(); if(rq instanceof RankQuery) { RankQuery rankQuery = (RankQuery)rq; rb.setRankQuery(rankQuery); MergeStrategy mergeStrategy = rankQuery.getMergeStrategy(); if(mergeStrategy != null) { rb.addMergeStrategy(mergeStrategy); if(mergeStrategy.handlesMergeFields()) { rb.mergeFieldHandler = mergeStrategy; } } } else { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"rq parameter must be a RankQuery"); } } rb.setSortSpec( parser.getSort(true) ); rb.setQparser(parser); final String cursorStr = rb.req.getParams().get(CursorMarkParams.CURSOR_MARK_PARAM); if (null != cursorStr) { final CursorMark cursorMark = new CursorMark(rb.req.getSchema(), rb.getSortSpec()); cursorMark.parseSerializedTotem(cursorStr); 
rb.setCursorMark(cursorMark); } String[] fqs = req.getParams().getParams(CommonParams.FQ); if (fqs!=null && fqs.length!=0) { List<Query> filters = rb.getFilters(); // if filters already exists, make a copy instead of modifying the original filters = filters == null ? new ArrayList<Query>(fqs.length) : new ArrayList<>(filters); for (String fq : fqs) { if (fq != null && fq.trim().length()!=0) { QParser fqp = QParser.getParser(fq, null, req); filters.add(fqp.getQuery()); } } // only set the filters if they are not empty otherwise // fq=&someotherParam= will trigger all docs filter for every request // if filter cache is disabled if (!filters.isEmpty()) { rb.setFilters( filters ); } } } catch (SyntaxError e) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e); } if (params.getBool(GroupParams.GROUP, false)) { prepareGrouping(rb); } else { //Validate only in case of non-grouping search. if(rb.getSortSpec().getCount() < 0) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'rows' parameter cannot be negative"); } } //Input validation. if (rb.getQueryCommand().getOffset() < 0) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'start' parameter cannot be negative"); } }
裡面獲取Parser單獨看了一下
// Look up the query parser plugin registered under parserName on the core.
QParserPlugin qplug = req.getCore().getQueryPlugin(parserName);
// Article note (in hindsight): if the scoring algorithm ever needs to be changed, this may be the place to hook in.
QParser parser = qplug.createParser(qstr, localParams, req.getParams(), req);
接著看 process,感覺是最長的一個方法。這裡東西太多,可以自己細看,也可以直接看最關鍵的一步:呼叫 SolrIndexSearcher 類的 search 方法,從這裡才真正開始查詢資料
/**
 * Actually run the query.
 *
 * <p>Paths visible in this method, in order:
 * <ul>
 *   <li>return at once when the component is disabled via {@code COMPONENT_NAME};</li>
 *   <li>term-stats shard requests: return local stats, or receive global stats and continue;</li>
 *   <li>an {@code ids} parameter: resolve each id to a Lucene doc id and answer from those;</li>
 *   <li>grouping (distributed first phase, distributed second phase, or local grouping);</li>
 *   <li>otherwise a normal {@code searcher.search(result, cmd)}.</li>
 * </ul>
 *
 * @param rb response builder carrying the request, the parsed query and the response
 * @throws IOException   propagated from the searcher
 * @throws SolrException with {@code BAD_REQUEST} when a cursor mark is combined with a
 *                       positive {@code timeAllowed}, or on a grouping syntax error
 */
@Override
public void process(ResponseBuilder rb) throws IOException
{
  LOG.debug("process: {}", rb.req.getParams());

  SolrQueryRequest req = rb.req;
  SolrParams params = req.getParams();
  if (!params.getBool(COMPONENT_NAME, true)) {
    return;
  }
  SolrIndexSearcher searcher = req.getSearcher();

  StatsCache statsCache = req.getCore().getStatsCache();

  int purpose = params.getInt(ShardParams.SHARDS_PURPOSE, ShardRequest.PURPOSE_GET_TOP_IDS);
  if ((purpose & ShardRequest.PURPOSE_GET_TERM_STATS) != 0) {
    statsCache.returnLocalStats(rb, searcher);
    return;
  }
  // check if we need to update the local copy of global dfs
  if ((purpose & ShardRequest.PURPOSE_SET_TERM_STATS) != 0) {
    // retrieve from request and update local cache
    statsCache.receiveGlobalStats(req);
  }

  SolrQueryResponse rsp = rb.rsp;
  IndexSchema schema = searcher.getSchema();

  // Optional: This could also be implemented by the top-level searcher sending
  // a filter that lists the ids... that would be transparent to
  // the request handler, but would be more expensive (and would preserve score
  // too if desired).
  String ids = params.get(ShardParams.IDS);
  if (ids != null) {
    SchemaField idField = schema.getUniqueKeyField();
    List<String> idArr = StrUtils.splitSmart(ids, ",", true);
    int[] luceneIds = new int[idArr.size()];
    int docs = 0;
    for (int i=0; i<idArr.size(); i++) {
      int id = searcher.getFirstMatch(
          new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
      // Ids with no match (negative result) are skipped, so docs may end up smaller than idArr.size().
      if (id >= 0)
        luceneIds[docs++] = id;
    }

    DocListAndSet res = new DocListAndSet();
    res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
    if (rb.isNeedDocSet()) {
      // TODO: create a cache for this!
      List<Query> queries = new ArrayList<>();
      queries.add(rb.getQuery());
      List<Query> filters = rb.getFilters();
      if (filters != null) queries.addAll(filters);
      res.docSet = searcher.getDocSet(queries);
    }
    rb.setResults(res);

    ResultContext ctx = new ResultContext();
    ctx.docs = rb.getResults().docList;
    ctx.query = null; // anything?
    rsp.add("response", ctx);
    return;
  }

  // -1 as flag if not set.
  long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED, -1L);
  if (null != rb.getCursorMark() && 0 < timeAllowed) {
    // fundamentally incompatible
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can not search using both " +
        CursorMarkParams.CURSOR_MARK_PARAM + " and " + CommonParams.TIME_ALLOWED);
  }

  SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
  cmd.setTimeAllowed(timeAllowed);

  req.getContext().put(SolrIndexSearcher.STATS_SOURCE, statsCache.get(req));

  SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();

  //
  // grouping / field collapsing
  //
  GroupingSpecification groupingSpec = rb.getGroupingSpec();
  if (groupingSpec != null) {
    try {
      boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
      if (params.getBool(GroupParams.GROUP_DISTRIBUTED_FIRST, false)) {
        // Distributed grouping, first phase: results are added under "firstPhase".
        CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
            .setQueryCommand(cmd)
            .setNeedDocSet(false) // Order matters here
            .setIncludeHitCount(true)
            .setSearcher(searcher);

        for (String field : groupingSpec.getFields()) {
          topsGroupsActionBuilder.addCommandField(new SearchGroupsFieldCommand.Builder()
              .setField(schema.getField(field))
              .setGroupSort(groupingSpec.getGroupSort())
              .setTopNGroups(cmd.getOffset() + cmd.getLen())
              .setIncludeGroupCount(groupingSpec.isIncludeGroupCount())
              .build()
          );
        }

        CommandHandler commandHandler = topsGroupsActionBuilder.build();
        commandHandler.execute();
        SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(searcher);
        rsp.add("firstPhase", commandHandler.processResult(result, serializer));
        rsp.add("totalHitCount", commandHandler.getTotalHitCount());
        rb.setResult(result);
        return;
      } else if (params.getBool(GroupParams.GROUP_DISTRIBUTED_SECOND, false)) {
        // Distributed grouping, second phase: results are added under "secondPhase".
        CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
            .setQueryCommand(cmd)
            .setTruncateGroups(groupingSpec.isTruncateGroups() && groupingSpec.getFields().length > 0)
            .setSearcher(searcher);

        for (String field : groupingSpec.getFields()) {
          SchemaField schemaField = schema.getField(field);
          String[] topGroupsParam = params.getParams(GroupParams.GROUP_DISTRIBUTED_TOPGROUPS_PREFIX + field);
          if (topGroupsParam == null) {
            topGroupsParam = new String[0];
          }

          List<SearchGroup<BytesRef>> topGroups = new ArrayList<>(topGroupsParam.length);
          for (String topGroup : topGroupsParam) {
            SearchGroup<BytesRef> searchGroup = new SearchGroup<>();
            // The null-group marker leaves groupValue unset.
            if (!topGroup.equals(TopGroupsShardRequestFactory.GROUP_NULL_VALUE)) {
              searchGroup.groupValue = new BytesRef(schemaField.getType().readableToIndexed(topGroup));
            }
            topGroups.add(searchGroup);
          }

          secondPhaseBuilder.addCommandField(
              new TopGroupsFieldCommand.Builder()
                  .setField(schemaField)
                  .setGroupSort(groupingSpec.getGroupSort())
                  .setSortWithinGroup(groupingSpec.getSortWithinGroup())
                  .setFirstPhaseGroups(topGroups)
                  .setMaxDocPerGroup(groupingSpec.getGroupOffset() + groupingSpec.getGroupLimit())
                  .setNeedScores(needScores)
                  .setNeedMaxScore(needScores)
                  .build()
          );
        }

        for (String query : groupingSpec.getQueries()) {
          secondPhaseBuilder.addCommandField(new QueryCommand.Builder()
              .setDocsToCollect(groupingSpec.getOffset() + groupingSpec.getLimit())
              .setSort(groupingSpec.getGroupSort())
              .setQuery(query, rb.req)
              .setDocSet(searcher)
              .build()
          );
        }

        CommandHandler commandHandler = secondPhaseBuilder.build();
        commandHandler.execute();
        TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
        rsp.add("secondPhase", commandHandler.processResult(result, serializer));
        rb.setResult(result);
        return;
      }

      // Local (non-distributed-phase) grouping.
      int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
      // Caching is enabled only for a percentage within 1..100.
      boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
      Grouping.TotalCount defaultTotalCount = groupingSpec.isIncludeGroupCount() ?
          Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
      int limitDefault = cmd.getLen(); // this is normally from "rows"
      Grouping grouping =
          new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, groupingSpec.isMain());
      grouping.setSort(groupingSpec.getGroupSort())
          .setGroupSort(groupingSpec.getSortWithinGroup())
          .setDefaultFormat(groupingSpec.getResponseFormat())
          .setLimitDefault(limitDefault)
          .setDefaultTotalCount(defaultTotalCount)
          .setDocsPerGroupDefault(groupingSpec.getGroupLimit())
          .setGroupOffsetDefault(groupingSpec.getGroupOffset())
          .setGetGroupedDocSet(groupingSpec.isTruncateGroups());

      if (groupingSpec.getFields() != null) {
        for (String field : groupingSpec.getFields()) {
          grouping.addFieldCommand(field, rb.req);
        }
      }

      if (groupingSpec.getFunctions() != null) {
        for (String groupByStr : groupingSpec.getFunctions()) {
          grouping.addFunctionCommand(groupByStr, rb.req);
        }
      }

      if (groupingSpec.getQueries() != null) {
        for (String groupByStr : groupingSpec.getQueries()) {
          grouping.addQueryCommand(groupByStr, rb.req);
        }
      }

      if (rb.doHighlights || rb.isDebug() || params.getBool(MoreLikeThisParams.MLT, false)) {
        // we need a single list of the returned docs
        cmd.setFlags(SolrIndexSearcher.GET_DOCLIST);
      }

      grouping.execute();
      if (grouping.isSignalCacheWarning()) {
        rsp.add(
            "cacheWarning",
            String.format(Locale.ROOT, "Cache limit of %d percent relative to maxdoc has exceeded. Please increase cache size or disable caching.", maxDocsPercentageToCache)
        );
      }
      rb.setResult(result);

      if (grouping.mainResult != null) {
        ResultContext ctx = new ResultContext();
        ctx.docs = grouping.mainResult;
        ctx.query = null; // TODO? add the query?
        rsp.add("response", ctx);
        rsp.getToLog().add("hits", grouping.mainResult.matches());
      } else if (!grouping.getCommands().isEmpty()) { // Can never be empty since grouping.execute() checks for this.
        rsp.add("grouped", result.groupedResults);
        rsp.getToLog().add("hits", grouping.getCommands().get(0).getMatches());
      }
      return;
    } catch (SyntaxError e) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
    }
  }

  // normal search result
  searcher.search(result, cmd); // key step: the actual index search
  rb.setResult(result);

  ResultContext ctx = new ResultContext();
  ctx.docs = rb.getResults().docList;
  ctx.query = rb.getQuery();
  rsp.add("response", ctx);
  rsp.getToLog().add("hits", rb.getResults().docList.matches());

  // The next cursor mark is only added for requests not flagged as shard requests.
  if ( ! rb.req.getParams().getBool(ShardParams.IS_SHARD,false) ) {
    if (null != rb.getNextCursorMark()) {
      rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT,
          rb.getNextCursorMark().getSerializedTotem());
    }
  }

  if(rb.mergeFieldHandler != null) {
    rb.mergeFieldHandler.handleMergeFields(rb, searcher);
  } else {
    doFieldSortValues(rb, searcher);
  }

  doPrefetch(rb);
}
第八步:SolrIndexSearcher 的 search 方法。中間來回跳轉了幾層,最後進到 getDocListC:先嘗試從快取取結果,快取裡沒有資料時,才繼續呼叫 getDocListNC 實際去查詢
/**
 * getDocList version that uses+populates query and filter caches.
 * In the event of a timeout, the cache is not populated.
 *
 * <p>The result is stored on {@code qr} through the {@link DocListAndSet} created at the
 * top of the method. On a usable query-result-cache hit the method returns early;
 * otherwise the result is generated (via the filter cache, {@code getDocListAndSetNC}
 * or {@code getDocListNC}) and, when allowed, the superset is put into the cache.
 *
 * @param qr  receives the doc list (and doc set, when requested)
 * @param cmd the query command: query, filters, sort, offset/len, flags, cursor mark
 * @throws IOException propagated from the underlying search
 */
private void getDocListC(QueryResult qr, QueryCommand cmd) throws IOException {
  DocListAndSet out = new DocListAndSet();
  qr.setDocListAndSet(out);
  QueryResultKey key=null;
  int maxDocRequested = cmd.getOffset() + cmd.getLen();
  // check for overflow, and check for # docs in index
  if (maxDocRequested < 0 || maxDocRequested > maxDoc()) maxDocRequested = maxDoc();
  int supersetMaxDoc= maxDocRequested;
  DocList superset = null;

  int flags = cmd.getFlags();
  Query q = cmd.getQuery();
  if (q instanceof ExtendedQuery) {
    ExtendedQuery eq = (ExtendedQuery)q;
    // A query that opts out of caching disables both cache lookups and cache population.
    if (!eq.getCache()) {
      flags |= (NO_CHECK_QCACHE | NO_SET_QCACHE | NO_CHECK_FILTERCACHE);
    }
  }

  // we can try and look up the complete query in the cache.
  // we can't do that if filter!=null though (we don't want to
  // do hashCode() and equals() for a big DocSet).
  // (This condition decides whether the query result cache is involved at all.)
  if (queryResultCache != null && cmd.getFilter()==null
      && (flags & (NO_CHECK_QCACHE|NO_SET_QCACHE)) != ((NO_CHECK_QCACHE|NO_SET_QCACHE)))
  {
    // all of the current flags can be reused during warming,
    // so set all of them on the cache key.
    key = new QueryResultKey(q, cmd.getFilterList(), cmd.getSort(), flags); // build the cache key
    if ((flags & NO_CHECK_QCACHE)==0) {
      superset = queryResultCache.get(key); // try the cache first

      if (superset != null) {
        // check that the cache entry has scores recorded if we need them
        if ((flags & GET_SCORES)==0 || superset.hasScores()) {
          // NOTE: subset() returns null if the DocList has fewer docs than
          // requested
          out.docList = superset.subset(cmd.getOffset(),cmd.getLen());
        }
      }
      if (out.docList != null) {
        // found the docList in the cache... now check if we need the docset too.
        // OPT: possible future optimization - if the doclist contains all the matches,
        // use it to make the docset instead of rerunning the query.
        if (out.docSet==null && ((flags & GET_DOCSET)!=0) ) {
          if (cmd.getFilterList()==null) {
            out.docSet = getDocSet(cmd.getQuery());
          } else {
            List<Query> newList = new ArrayList<>(cmd.getFilterList().size()+1);
            newList.add(cmd.getQuery());
            newList.addAll(cmd.getFilterList());
            out.docSet = getDocSet(newList);
          }
        }
        return;
      }
    }

    // If we are going to generate the result, bump up to the
    // next resultWindowSize for better caching.

    if ((flags & NO_SET_QCACHE) == 0) {
      // handle 0 special case as well as avoid idiv in the common case.
      if (maxDocRequested < queryResultWindowSize) {
        supersetMaxDoc=queryResultWindowSize;
      } else {
        // Round up to the next multiple of queryResultWindowSize; fall back on int overflow.
        supersetMaxDoc = ((maxDocRequested -1)/queryResultWindowSize + 1)*queryResultWindowSize;
        if (supersetMaxDoc < 0) supersetMaxDoc=maxDocRequested;
      }
    } else {
      key = null; // we won't be caching the result
    }
  }
  cmd.setSupersetMaxDoc(supersetMaxDoc);

  // OK, so now we need to generate an answer.
  // One way to do that would be to check if we have an unordered list
  // of results for the base query. If so, we can apply the filters and then
  // sort by the resulting set. This can only be used if:
  // - the sort doesn't contain score
  // - we don't want score returned.

  // check if we should try and use the filter cache
  boolean useFilterCache=false;
  if ((flags & (GET_SCORES|NO_CHECK_FILTERCACHE))==0 && useFilterForSortedQuery && cmd.getSort() != null && filterCache != null) {
    useFilterCache=true;
    SortField[] sfields = cmd.getSort().getSort();
    for (SortField sf : sfields) {
      if (sf.getType() == SortField.Type.SCORE) {
        useFilterCache=false;
        break;
      }
    }
  }

  if (useFilterCache) {
    // now actually use the filter cache.
    // for large filters that match few documents, this may be
    // slower than simply re-executing the query.
    if (out.docSet == null) {
      out.docSet = getDocSet(cmd.getQuery(),cmd.getFilter());
      DocSet bigFilt = getDocSet(cmd.getFilterList());
      if (bigFilt != null) out.docSet = out.docSet.intersection(bigFilt);
    }
    // todo: there could be a sortDocSet that could take a list of
    // the filters instead of anding them first...
    // perhaps there should be a multi-docset-iterator
    sortDocSet(qr, cmd);
  } else {
    // do it the normal way...
    if ((flags & GET_DOCSET)!=0) {
      // this currently conflates returning the docset for the base query vs
      // the base query and all filters.
      DocSet qDocSet = getDocListAndSetNC(qr,cmd);
      // cache the docSet matching the query w/o filtering
      if (qDocSet!=null && filterCache!=null && !qr.isPartialResults()) filterCache.put(cmd.getQuery(),qDocSet);
    } else {
      getDocListNC(qr,cmd);
    }
    assert null != out.docList : "docList is null";
  }

  if (null == cmd.getCursorMark()) {
    // Kludge...
    // we can't use DocSlice.subset, even though it should be an identity op
    // because it gets confused by situations where there are lots of matches, but
    // less docs in the slice then were requested, (due to the cursor)
    // so we have to short circuit the call.
    // None of which is really a problem since we can't use caching with
    // cursors anyway, but it still looks weird to have to special case this
    // behavior based on this condition - hence the long explanation.
    superset = out.docList;
    out.docList = superset.subset(cmd.getOffset(),cmd.getLen());
  } else {
    // sanity check our cursor assumptions
    assert null == superset : "cursor: superset isn't null";
    assert 0 == cmd.getOffset() : "cursor: command offset mismatch";
    assert 0 == out.docList.offset() : "cursor: docList offset mismatch";
    assert cmd.getLen() >= supersetMaxDoc : "cursor: superset len mismatch: " +
        cmd.getLen() + " vs " + supersetMaxDoc;
  }

  // lastly, put the superset in the cache if the size is less than or equal
  // to queryResultMaxDocsCached
  if (key != null && superset.size() <= queryResultMaxDocsCached && !qr.isPartialResults()) {
    queryResultCache.put(key, superset);
  }
}
private void getDocListNC(QueryResult qr,QueryCommand cmd) throws IOException { int len = cmd.getSupersetMaxDoc(); int last = len; if (last < 0 || last > maxDoc()) last=maxDoc(); final int lastDocRequested = last; int nDocsReturned; int totalHits; float maxScore; int[] ids; float[] scores; boolean needScores = (cmd.getFlags() & GET_SCORES) != 0; Query query = QueryUtils.makeQueryable(cmd.getQuery()); ProcessedFilter pf = getProcessedFilter(cmd.getFilter(), cmd.getFilterList()); if (pf.filter != null) { query = new FilteredQuery(query, pf.filter); } // handle zero case... if (lastDocRequested<=0) { final float[] topscore = new float[] { Float.NEGATIVE_INFINITY }; final int[] numHits = new int[1]; Collector collector; if (!needScores) {//是否需要返回評分值 collector = new SimpleCollector () { @Override public void collect(int doc) { numHits[0]++; } @Override public boolean needsScores() { return false; } }; } else { collector = new SimpleCollector() { Scorer scorer; @Override public void setScorer(Scorer scorer) { this.scorer = scorer; } @Override public void collect(int doc) throws IOException { numHits[0]++; float score = scorer.score(); if (score > topscore[0]) topscore[0]=score; } @Override public boolean needsScores() { return true; } }; } buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter); nDocsReturned=0; ids = new int[nDocsReturned]; scores = new float[nDocsReturned]; totalHits = numHits[0]; maxScore = totalHits>0 ? topscore[0] : 0.0f; // no docs on this page, so cursor doesn't change qr.setNextCursorMark(cmd.getCursorMark()); } else { final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd); Collector collector = topCollector; buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter); totalHits = topCollector.getTotalHits(); TopDocs topDocs = topCollector.topDocs(0, len); populateNextCursorMarkFromTopDocs(qr, cmd, topDocs); maxScore = totalHits>0 ? 
topDocs.getMaxScore() : 0.0f; nDocsReturned = topDocs.scoreDocs.length; ids = new int[nDocsReturned]; scores = (cmd.getFlags()&GET_SCORES)!=0 ? new float[nDocsReturned] : null; for (int i=0; i<nDocsReturned; i++) { ScoreDoc scoreDoc = topDocs.scoreDocs[i]; ids[i] = scoreDoc.doc; if (scores != null) scores[i] = scoreDoc.score; } } int sliceLen = Math.min(lastDocRequested,nDocsReturned); if (sliceLen < 0) sliceLen=0; qr.setDocList(new DocSlice(0,sliceLen,ids,scores,totalHits,maxScore)); }
上面的 buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter) 最後會呼叫 IndexSearcher 的 search 方法。
第九步:看IndexSearcher
/** * Lower-level search API. * * <p> * {@link LeafCollector#collect(int)} is called for every document. <br> * * <p> * NOTE: this method executes the searches on all given leaves exclusively. * To search across all the searchers leaves use {@link #leafContexts}. * * @param leaves * the searchers leaves to execute the searches on * @param weight * to match documents * @param collector * to receive hits * @throws BooleanQuery.TooManyClauses If a query would exceed * {@link BooleanQuery#getMaxClauseCount()} clauses. */ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException { // TODO: should we make this // threaded...? the Collector could be sync'd? // always use single thread: for (LeafReaderContext ctx : leaves) { // search each subreader final LeafCollector leafCollector; try { leafCollector = collector.getLeafCollector(ctx); } catch (CollectionTerminatedException e) { // there is no doc of interest in this reader context // continue with the following leaf continue; } BulkScorer scorer = weight.bulkScorer(ctx);//獲取匹配的doc if (scorer != null) { try { scorer.score(leafCollector, ctx.reader().getLiveDocs());//排序計算分值 } catch (CollectionTerminatedException e) { // collection was terminated prematurely // continue with the following leaf } } } }
第十步:計算分值的過程跳轉了很多層,比較複雜,沒有細讀,最後跳到了
CustomScoreQuery 類的 score() 方法:
@Override public float score() throws IOException { // lazily advance to current doc. int doc = docID(); if (doc > valSrcDocID) { for (Scorer valSrcScorer : valSrcScorers) { valSrcScorer.advance(doc); } valSrcDocID = doc; } // TODO: this thing technically takes any Query, so what about when subs don't match? for (int i = 0; i < valSrcScorers.length; i++) { vScores[i] = valSrcScorers[i].score(); } return qWeight * provider.customScore(subQueryScorer.docID(), subQueryScorer.score(), vScores); }
接下來就是一層層往回返了,最後通過第四步的回寫返回介面資料。
總結主要流程:
SolrDispatchFilter->HttpSolrCall->SolrCore->RequestHandlerBase->SearchHandler->QueryComponent->QParser->SolrIndexSearcher->IndexSearcher->BulkScorer->Weight->TopScoreDocCollector->CustomScoreQuery
附:呼叫堆疊圖