1. 程式人生 > >elasticsearch(六)java 使用批量查詢multiGet介紹及使用

elasticsearch(六)java 使用批量查詢multiGet介紹及使用

BulkRequest是用來進行批量索引、更新、刪除操作的請求物件,前面已經介紹過。

本節介紹下用來進行批量查詢的操作: Mult-Get Request

1,首先建立一個主查詢請求物件:

MultiGetRequest request = new MultiGetRequest();

然後依次使用主請求物件的add方法,將子查詢物件加入到主查詢中

request.add(new MultiGetRequest.Item("index", "type", "another_id")); 

2,可以分別針對每一個子查詢進行如下設定:

1)查詢的文件內容不返回:.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)

request.add(new MultiGetRequest.Item("posts", "doc", "2").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));

2) 指定查詢哪些欄位內容 或 過濾掉哪些欄位:

String[] includes = new String[] {"user", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("posts2", "doc", "3").fetchSourceContext(fetchSourceContext));

3)分別指定查詢的路由分片和版本等:

request.add(new MultiGetRequest.Item("posts", "doc", "with_routing")
                    .routing("some_routing"));
request.add(new MultiGetRequest.Item("index", "type", "with_parent")
                    .parent("some_parent"));
request.add(new MultiGetRequest.Item("index", "type", "with_version")
                    .versionType(VersionType.EXTERNAL)
                    .version(10123L));

注:以上設定無法在主請求中設定

3,對主請求的設定

preference, realtime and refresh 需要在主請求裡設定,子請求中無法設定這些值
request.preference("some_preference");
// realtime的值預設為true
request.realtime(false);
request.refresh(true);

4,執行請求並獲取結果:

MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);

5,對結果的處理及說明:

1)可以指定處理某條查詢:

MultiGetItemResponse firstItem = response.getResponses()[0];

2)遍歷查詢的結果:

for(MultiGetItemResponse item: response.getResponses()) {
    String index = item.getIndex();
    String type = item.getType();
    String id = item.getId();
    System.out.println("第" + ++count + "條-》index:" + index + "; type:" + type + "; id:" + id);
    if(item.getFailure() != null) {
        Exception e = item.getFailure().getFailure();
        ElasticsearchException ee = (ElasticsearchException) e;
        if(ee.getMessage().contains("reason=no such index")) {
            System.out.println("查詢的文件庫不存在!");
        }
    }

    GetResponse getResponse = item.getResponse();

    if (getResponse.isExists()) {
        long version = getResponse.getVersion();
        String sourceAsString = getResponse.getSourceAsString();
        System.out.println("查詢的結果為:");
        System.out.println(sourceAsString);
        Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
        byte[] sourceAsBytes = getResponse.getSourceAsBytes();
    } else {
        System.out.println("沒有查詢到相應文件");
    }
}

特殊情況說明:

1)查詢的index索引庫不存在時,則返回結果的failure引數不為null,含報錯資訊,

GetResponse getResponse = item.getResponse() 得到的getResponse值為null

故為了後面資料丟擲空指標異常,要做相容處理:

if(item.getFailure() != null) {
   Exception e = item.getFailure().getFailure();
   ElasticsearchException ee = (ElasticsearchException) e;
   if(ee.getMessage().contains("reason=no such index")) {
      System.out.println("查詢的文件庫不存在!");
   }
}

2)如果文件型別type或id不存在,getResponse.isExists() 的結果是false

6,完整程式碼示例:

package com.example.elasticsearch.document;

import org.apache.http.HttpHost;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: Weichang Zhong
 * @Date: 2018/11/8
 * @Time: 17:20
 * @Description:
 */
public class SynMultiGetRequest {
    public static void main(String[] args) {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        )) {
            // 建立查詢父物件
            MultiGetRequest request = new MultiGetRequest();
            // 新增子查詢
            request.add(new MultiGetRequest.Item("posts", "doc", "1"));
            request.add(new MultiGetRequest.Item("posts", "doc2", "2").fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));
            String[] includes = new String[] {"user", "*r"};
            String[] excludes = Strings.EMPTY_ARRAY;
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.add(new MultiGetRequest.Item("posts2", "doc", "3").fetchSourceContext(fetchSourceContext));
//            // user必須在map集合中
//            request.add(new MultiGetRequest.Item("posts", "doc", "4")
//                    .storedFields("user"));
//            MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
//            MultiGetItemResponse item = response.getResponses()[0];
//            // user必須在map集合中
//            String value = item.getResponse().getField("user").getValue();

            // 針對每個子請求分別設定,無法在主請求中設定
            // 指定去哪個分片上查詢,如何指定分片上沒有,不會再去其它分片查詢,如果不指定,則依次輪詢各個分片查詢
            request.add(new MultiGetRequest.Item("posts", "doc", "with_routing")
                    .routing("some_routing"));
            request.add(new MultiGetRequest.Item("index", "type", "with_parent")
                    .parent("some_parent"));
            request.add(new MultiGetRequest.Item("index", "type", "with_version")
                    .versionType(VersionType.EXTERNAL)
                    .version(10123L));
            // preference, realtime and refresh 需要在主請求裡設定,子請求中無法設定
            request.preference("some_preference");
            // realtime的值預設為true
            request.realtime(false);
            request.refresh(true);
            MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
            int count = 0;
            for(MultiGetItemResponse item: response.getResponses()) {
                String index = item.getIndex();
                String type = item.getType();
                String id = item.getId();
                System.out.println("第" + ++count + "條-》index:" + index + "; type:" + type + "; id:" + id);
                if(item.getFailure() != null) {
                    Exception e = item.getFailure().getFailure();
                    ElasticsearchException ee = (ElasticsearchException) e;
                    if(ee.getMessage().contains("reason=no such index")) {
                        System.out.println("查詢的文件庫不存在!");
                    }
                }

                GetResponse getResponse = item.getResponse();

                if (getResponse.isExists()) {
                    long version = getResponse.getVersion();
                    String sourceAsString = getResponse.getSourceAsString();
                    System.out.println("查詢的結果為:");
                    System.out.println(sourceAsString);
                    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
                    byte[] sourceAsBytes = getResponse.getSourceAsBytes();
                } else {
                    System.out.println("沒有查詢到相應文件");
                }
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}

另外:同時也可以查詢map集合中的物件,前提是對應的map集合中要存在相應的文件:

// user必須在map集合中
request.add(new MultiGetRequest.Item("posts", "doc", "4").storedFields("user"));
MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
MultiGetItemResponse item = response.getResponses()[0];
// user必須在map集合中
String value = item.getResponse().getField("user").getValue();