[ElasticSearch] Java API: Scroll Search (Scroll API)

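An ordinary search request returns one "page" of results in a single response, however large that page is. The Scroll API lets us retrieve a large number of results (or even all of them). We run an initial search and then keep pulling batches of results from Elasticsearch until none are left, much like a cursor in a traditional database.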

Scrolling is not intended for real-time user requests, but rather for processing large amounts of data. The results returned from a scroll request reflect the state of the index at the time the initial search request was made, like a snapshot in time. Subsequent changes to documents (index, update, or delete) only affect later search requests.
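The snapshot behaviour can be illustrated with a toy model in plain Java. This is only a sketch and not Elasticsearch code: `SnapshotCursor`, `nextBatch`, and the in-memory `liveIndex` list are hypothetical names made up for this example, standing in for the search context and the index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of a scroll's point-in-time view; not Elasticsearch code. */
class SnapshotCursor {
    private final List<String> snapshot; // frozen copy taken at "search" time
    private final int batchSize;
    private int position = 0;

    SnapshotCursor(List<String> liveIndex, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.snapshot = new ArrayList<>(liveIndex);
        this.batchSize = batchSize;
    }

    /** Returns the next batch, or an empty list once the snapshot is exhausted. */
    List<String> nextBatch() {
        int end = Math.min(position + batchSize, snapshot.size());
        List<String> batch = new ArrayList<>(snapshot.subList(position, end));
        position = end;
        return batch;
    }

    public static void main(String[] args) {
        List<String> liveIndex = new ArrayList<>(Arrays.asList("doc1", "doc2", "doc3"));
        SnapshotCursor cursor = new SnapshotCursor(liveIndex, 2);

        // Changes made after the initial "search" are invisible to the cursor.
        liveIndex.add("doc4");
        liveIndex.remove("doc1");

        List<String> batch;
        while (!(batch = cursor.nextBatch()).isEmpty()) {
            System.out.println(batch);
        }
    }
}
```

Running `main` prints `[doc1, doc2]` and then `[doc3]`: the later insert and delete on `liveIndex` do not show up, which mirrors how a scroll keeps serving the index state from the time of the initial search.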
1. Ordinary request

Suppose we want to return a large amount of data at once. The code below asks for 58000 documents in a single request:

       import org.elasticsearch.action.search.SearchRequestBuilder;
       import org.elasticsearch.action.search.SearchResponse;
       import org.elasticsearch.client.Client;
       import org.elasticsearch.search.SearchHit;

       /**
        * Ordinary search.
        * @param client Elasticsearch client
        */
       public static void search(Client client) {
           String index = "simple-index";
           String type = "simple-type";
           // Search conditions
           SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
           searchRequestBuilder.setIndices(index);
           searchRequestBuilder.setTypes(type);
           searchRequestBuilder.setSize(58000);
           // Execute
           SearchResponse searchResponse = searchRequestBuilder.get();
           // Search results
           SearchHit[] searchHits = searchResponse.getHits().getHits();
           for (SearchHit searchHit : searchHits) {
               String source = searchHit.getSource().toString();
               logger.info("--------- search source {}", source);
           } // for
       }

Result:

    Caused by: QueryPhaseExecutionException[Result window is too large, from + size must be less than or equal to: [10000] but was [58000]. See the scroll api for a more efficient way to request large data sets. This limit can be set by changing the [index.max_result_window] index level parameter.]
    at org.elasticsearch.search.internal.DefaultSearchContext.preProcess(DefaultSearchContext.java:212)
    at org.elasticsearch.search.query.QueryPhase.preProcess(QueryPhase.java:103)
    at org.elasticsearch.search.SearchService.createContext(SearchService.java:676)
    at org.elasticsearch.search.SearchService.createAndPutContext(SearchService.java:620)
    at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:371)
    at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:368)
    at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryTransportHandler.messageReceived(SearchServiceTransportAction.java:365)
    at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
    at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
    at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    ... 3 more

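The error shows that from + size for a single search request may not exceed 10000, the default value of the index-level setting index.max_result_window. Our request exceeds that limit, so it fails, and the exception message points us to the Scroll API for requesting large data sets.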
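The limit in the error message is simple arithmetic on from and size. The sketch below re-creates that check in plain Java purely for illustration; `ResultWindowCheck` and `fitsResultWindow` are hypothetical names, not part of the Elasticsearch API.

```java
/** Toy re-creation of the result-window check; not Elasticsearch code. */
class ResultWindowCheck {
    // Limit quoted in the error message for index.max_result_window.
    static final int DEFAULT_MAX_RESULT_WINDOW = 10000;

    /** Returns true if from + size stays within the given window. */
    static boolean fitsResultWindow(int from, int size, int maxResultWindow) {
        // Widen to long so that from + size cannot overflow int.
        return (long) from + size <= maxResultWindow;
    }

    public static void main(String[] args) {
        // The request from the example: from = 0, size = 58000.
        System.out.println(fitsResultWindow(0, 58000, DEFAULT_MAX_RESULT_WINDOW));
        // A request that ends exactly at the limit.
        System.out.println(fitsResultWindow(9990, 10, DEFAULT_MAX_RESULT_WINDOW));
    }
}
```

The first call reports that the 58000-document request does not fit, while a request ending exactly at document 10000 still does.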
2. Requesting with the Scroll API

To use scrolling, the initial search request should specify the scroll parameter, which tells Elasticsearch how long to keep the search context alive (the scroll time).

    searchRequestBuilder.setScroll(new TimeValue(60000));

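The code below specifies the query conditions and the scroll properties, such as the keep-alive time (set with setScroll(); the TimeValue constructor used here takes milliseconds). We get the scroll ID from the SearchResponse object's getScrollId() method. The scroll ID is used in the next request.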

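       import org.elasticsearch.action.search.SearchRequestBuilder;
       import org.elasticsearch.action.search.SearchResponse;
       import org.elasticsearch.client.Client;
       import org.elasticsearch.common.unit.TimeValue;
       import org.elasticsearch.search.SearchHit;

       /**
        * Initial search with scrolling enabled.
        * @param client Elasticsearch client
        * @return the scroll ID to use for the next request
        */
       public static String searchByScroll(Client client) {
           String index = "simple-index";
           String type = "simple-type";
           // Search conditions
           SearchRequestBuilder searchRequestBuilder = client.prepareSearch();
           searchRequestBuilder.setIndices(index);
           searchRequestBuilder.setTypes(type);
           // Keep the search context alive for 30 seconds
           searchRequestBuilder.setScroll(new TimeValue(30000));
           // Execute
           SearchResponse searchResponse = searchRequestBuilder.get();
           String scrollId = searchResponse.getScrollId();
           logger.info("--------- searchByScroll scrollID {}", scrollId);
           // The initial response already contains the first batch of hits
           SearchHit[] searchHits = searchResponse.getHits().getHits();
           for (SearchHit searchHit : searchHits) {
               String source = searchHit.getSource().toString();
               logger.info("--------- searchByScroll source {}", source);
           } // for
           return scrollId;
       }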

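Take the scroll ID from the response to the request above and pass it to the scroll API to retrieve the next batch of results. This request does not need the index and type, since they were already specified in the original search request.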

Each call to the scroll API returns the next batch of results until there are no more results left to return, that is, until the hits array is empty.

       import org.elasticsearch.action.search.SearchResponse;
       import org.elasticsearch.action.search.SearchScrollRequestBuilder;
       import org.elasticsearch.client.Client;
       import org.elasticsearch.common.unit.TimeValue;
       import org.elasticsearch.search.SearchHit;

       /**
        * Fetches the remaining documents using a scroll ID.
        * @param client Elasticsearch client
        * @param scrollId scroll ID returned by the initial search
        */
       public static void searchByScrollId(Client client, String scrollId) {
           TimeValue timeValue = new TimeValue(30000);
           SearchScrollRequestBuilder searchScrollRequestBuilder;
           SearchResponse response;
           while (true) {
               logger.info("--------- searchByScroll scrollID {}", scrollId);
               searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
               // Reset the scroll keep-alive time
               searchScrollRequestBuilder.setScroll(timeValue);
               // Execute
               response = searchScrollRequestBuilder.get();
               // Each call returns the next batch; stop once the hits array is empty
               SearchHit[] searchHits = response.getHits().getHits();
               if (searchHits.length == 0) {
                   break;
               } // if
               // Hits in this batch
               for (SearchHit searchHit : searchHits) {
                   String source = searchHit.getSource().toString();
                   logger.info("--------- searchByScroll source {}", source);
               } // for
               // Only the most recent scroll ID should be used
               scrollId = response.getScrollId();
           } // while
       }
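The control flow of the two methods above (take the first batch from the initial response, then keep fetching with the latest scroll ID until a batch comes back empty) can be tried without a cluster. The following is a minimal sketch in plain Java under that assumption: `ScrollLoopSketch`, `Page`, `FakeScroll`, and `drain` are hypothetical names invented for this example. The fake backend deliberately rejects any scroll ID except the newest one; that is stricter than anything stated in this article and serves only to check that the loop carries the latest ID forward.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the scroll loop; not Elasticsearch code. */
class ScrollLoopSketch {
    /** One response: the scroll ID to use next plus this batch of hits. */
    static final class Page {
        final String scrollId;
        final List<String> hits;

        Page(String scrollId, List<String> hits) {
            this.scrollId = scrollId;
            this.hits = hits;
        }
    }

    /** Hypothetical in-memory backend that hands out a new ID with every page. */
    static final class FakeScroll {
        private final List<String> docs;
        private final int batchSize;
        private int position = 0;
        private int generation = 0;

        FakeScroll(List<String> docs, int batchSize) {
            this.docs = new ArrayList<>(docs);
            this.batchSize = batchSize;
        }

        /** Plays the role of the initial search request. */
        Page start() {
            return nextPage();
        }

        /** Plays the role of a follow-up scroll request. */
        Page next(String scrollId) {
            if (!scrollId.equals("scroll-" + generation)) {
                throw new IllegalStateException("stale scroll ID: " + scrollId);
            }
            return nextPage();
        }

        private Page nextPage() {
            int end = Math.min(position + batchSize, docs.size());
            List<String> hits = new ArrayList<>(docs.subList(position, end));
            position = end;
            generation++;
            return new Page("scroll-" + generation, hits);
        }
    }

    /** Collects the first batch, then scrolls until an empty batch arrives. */
    static List<String> drain(Page first, Function<String, Page> fetchNext) {
        List<String> all = new ArrayList<>(first.hits);
        String scrollId = first.scrollId;
        while (true) {
            Page page = fetchNext.apply(scrollId);
            scrollId = page.scrollId; // always carry the most recent ID forward
            if (page.hits.isEmpty()) {
                break;
            }
            all.addAll(page.hits);
        }
        return all;
    }

    public static void main(String[] args) {
        FakeScroll fake = new FakeScroll(Arrays.asList("a", "b", "c", "d", "e"), 2);
        System.out.println(drain(fake.start(), fake::next));
    }
}
```

With five documents and a batch size of two, `main` prints `[a, b, c, d, e]`. Note that the first batch comes from the initial response, not from the first scroll call, which matches how searchByScroll and searchByScrollId split the work.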

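Note:

The initial search request and each subsequent scroll request returns a new scroll ID; only the most recent scroll ID should be used.

In my tests, every subsequent scroll request returned the same scroll ID, so I do not fully understand the note above. If you do, please let me know, thanks. Either way, the loop above is safe because it always reuses the ID from the latest response.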


If you keep searching with a scroll ID after its scroll time has expired, an error is thrown:

    Caused by: SearchContextMissingException[No search context found for id [2861]]
    at org.elasticsearch.search.SearchService.findContext(SearchService.java:613)
    at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:403)
    at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:384)
    at org.elasticsearch.search.action.SearchServiceTransportAction$SearchQueryScrollTransportHandler.messageReceived(SearchServiceTransportAction.java:381)
    at org.elasticsearch.transport.TransportRequestHandler.messageReceived(TransportRequestHandler.java:33)
    at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
    at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
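Because each scroll request sets the keep-alive again (the setScroll() call inside the loop), the scroll time only has to cover the processing of one batch, not the whole data set. The sketch below models that timing in plain Java under simplifying assumptions: `KeepAliveSketch`, `open`, and `touch` are hypothetical names, time is passed in explicitly as milliseconds, and expiry is treated as exact, whereas a real cluster cleans up expired contexts on its own schedule.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of scroll keep-alive renewal; not Elasticsearch code. */
class KeepAliveSketch {
    // Deadline (in milliseconds) for each open scroll context.
    private final Map<String, Long> expiryByScrollId = new HashMap<>();

    /** Models the initial search: opens a context with a keep-alive. */
    void open(String scrollId, long keepAliveMillis, long nowMillis) {
        expiryByScrollId.put(scrollId, nowMillis + keepAliveMillis);
    }

    /**
     * Models a scroll request: returns true and extends the deadline if the
     * context is still alive, or false if it is unknown or has expired.
     */
    boolean touch(String scrollId, long keepAliveMillis, long nowMillis) {
        Long expiry = expiryByScrollId.get(scrollId);
        if (expiry == null || nowMillis > expiry) {
            // This is the situation in which the exception above is reported.
            expiryByScrollId.remove(scrollId);
            return false;
        }
        expiryByScrollId.put(scrollId, nowMillis + keepAliveMillis);
        return true;
    }

    public static void main(String[] args) {
        KeepAliveSketch contexts = new KeepAliveSketch();
        contexts.open("scroll-1", 30000, 0);
        System.out.println(contexts.touch("scroll-1", 30000, 20000));  // within 30 s of opening
        System.out.println(contexts.touch("scroll-1", 30000, 45000));  // within 30 s of last request
        System.out.println(contexts.touch("scroll-1", 30000, 110000)); // more than 30 s of silence
    }
}
```

The second request arrives 45 seconds after the initial search but still succeeds, because the request at 20 seconds pushed the deadline to 50 seconds. Only the long gap before the third request lets the context expire.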


3. Clearing the scroll ID

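Although the search context is cleared automatically once the scroll keep-alive time has passed, keeping a scroll open is expensive, so we should clear it with the Clear-Scroll API as soon as we are no longer using it.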

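    import java.util.List;

    import org.elasticsearch.action.search.ClearScrollRequestBuilder;
    import org.elasticsearch.action.search.ClearScrollResponse;
    import org.elasticsearch.client.Client;

    /**
     * Clears a list of scroll IDs.
     * @param client Elasticsearch client
     * @param scrollIdList scroll IDs to clear
     * @return true if the clear request succeeded
     */
    public static boolean clearScroll(Client client, List<String> scrollIdList) {
        ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
        clearScrollRequestBuilder.setScrollIds(scrollIdList);
        ClearScrollResponse response = clearScrollRequestBuilder.get();
        return response.isSucceeded();
    }

    /**
     * Clears a single scroll ID.
     * @param client Elasticsearch client
     * @param scrollId scroll ID to clear
     * @return true if the clear request succeeded
     */
    public static boolean clearScroll(Client client, String scrollId) {
        ClearScrollRequestBuilder clearScrollRequestBuilder = client.prepareClearScroll();
        clearScrollRequestBuilder.addScrollId(scrollId);
        ClearScrollResponse response = clearScrollRequestBuilder.get();
        return response.isSucceeded();
    }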
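One way to make sure the scroll is cleared "as soon as we are no longer using it" is to put the clear call in a finally block, so that it also runs when processing fails partway through. The sketch below shows only that pattern in plain Java: `ClearScrollSketch` and `scrollThenClear` are hypothetical names, and the two callbacks stand in for the real work and for a call such as clearScroll(client, scrollId) from the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Cleanup pattern only; the callbacks stand in for real Elasticsearch calls. */
class ClearScrollSketch {
    /** Runs the work, then always clears the scroll ID, even if the work throws. */
    static void scrollThenClear(String scrollId,
                                Consumer<String> work,
                                Consumer<String> clearScroll) {
        try {
            work.accept(scrollId);
        } finally {
            clearScroll.accept(scrollId);
        }
    }

    public static void main(String[] args) {
        List<String> cleared = new ArrayList<>();
        try {
            scrollThenClear(
                    "scroll-1",
                    id -> { throw new IllegalStateException("processing failed"); },
                    cleared::add);
        } catch (IllegalStateException e) {
            System.out.println("work failed: " + e.getMessage());
        }
        // The scroll ID was still handed to the clear callback.
        System.out.println("cleared: " + cleared);
    }
}
```

If the scroll ID changes while scrolling, the ID passed to the clear step should be the most recent one, in line with the note in section 2.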

 

4. References:

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-scroll.html

http://www.jianshu.com/p/14aa8b09c789
5. Notes

The code in this post is based on Elasticsearch 2.4.1.
---------------------
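Author: SunnyYoona
Source: CSDN
Original: https://blog.csdn.net/sunnyyoona/article/details/52810397
Copyright notice: This is an original article by the blogger; please include a link to the post when reposting.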