ElasticSearch 6.5.0: Java Client (TransportClient)

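Notes

TransportClient: the client you will find in most tutorials online; this article uses it at the current latest version.

Java REST Client: the client officially recommended by Elastic.

From the official documentation: TransportClient will be deprecated in Elasticsearch 7.0 and removed entirely in Elasticsearch 8.0. Use the Java REST Client instead.

At this pace (we are already at 6.5), 8.0 cannot be far off. When using a client, watch out for version compatibility: ideally the client version matches the server version exactly, and at the very least the major versions must match (for example 5.x with 5.x, or 6.x with 6.x).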
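
The version rule above can be written down as a small check. This is only a plain-Java sketch that compares dotted version strings such as `6.5.0`; it does not call any Elasticsearch API, and the class `VersionCompat` and its method names are hypothetical.

```java
// Hypothetical helper (not part of Elasticsearch): compares client and server version strings.
final class VersionCompat {
    private VersionCompat() {}

    /** Returns the major version of a dotted version string, e.g. 6 for "6.5.0". */
    static int major(String version) {
        int dot = version.indexOf('.');
        String head = dot < 0 ? version : version.substring(0, dot);
        return Integer.parseInt(head.trim());
    }

    /** Best case: client and server versions are identical. */
    static boolean exactMatch(String client, String server) {
        return client.trim().equals(server.trim());
    }

    /** Minimum requirement from the text: the major versions agree. */
    static boolean sameMajor(String client, String server) {
        return major(client) == major(server);
    }

    public static void main(String[] args) {
        System.out.println(exactMatch("6.5.0", "6.5.0"));  // identical versions
        System.out.println(sameMajor("6.5.0", "6.2.4"));   // same major version
        System.out.println(sameMajor("5.6.13", "6.5.0"));  // different major versions
    }
}
```

In a real project the two strings would come from the client dependency version and from the cluster itself; how they are obtained is left out here.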

TransportClient

 Connecting

    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.TransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class TransportClientFactory {

        private TransportClientFactory(){}

        // Lazy singleton via the initialization-on-demand holder idiom
        private static class Inner{
            private static final TransportClientFactory instance = new TransportClientFactory();
        }

        public static TransportClientFactory getInstance(){
            return Inner.instance;
        }

        // Note: every call builds a new TransportClient; the caller should close it when done
        public TransportClient getClient() throws UnknownHostException {
            Settings settings = Settings.builder()
                    .put("cluster.name", "my-elasticsearch") // the default cluster name is "elasticsearch"; set this if yours differs
                    .build();
            return new PreBuiltTransportClient(settings)
                    //.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9301))
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
        }
    }

 Index

1. Adding a document

    public static TransportClient addDoc1() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        // Build the document (jsonBuilder() is XContentFactory.jsonBuilder(), statically imported)
        XContentBuilder builder = jsonBuilder()
                .startObject()
                .field("brand", "ANTA")
                .field("color", "red")
                .field("model", "S")
                .field("postDate", new Date())
                .endObject();
        // Convert it to a JSON string
        String json = Strings.toString(builder);
        /**
         * Argument 1: index
         * Argument 2: type
         * Argument 3: id
         */
        IndexRequestBuilder indexRequestBuilder = client.prepareIndex("clothes", "young", "1");
        IndexResponse response = indexRequestBuilder.setSource(json, XContentType.JSON).get();
        System.out.println("Index:" + response.getIndex() + "," +
                "Type:" + response.getType() + "," +
                "ID:" + response.getId() + "," +
                "Version:" + response.getVersion() + "," +
                "Status:" + response.status().name()
        );
        return client;
    }

Output:

Index:clothes,Type:young,ID:1,Version:1,Status:CREATED

2. A document can also be added without specifying an id, in which case a unique id is generated automatically

    public static TransportClient addDoc2() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        // Build the document
        XContentBuilder builder = jsonBuilder()
                .startObject()
                .field("brand", "YISHION")
                .field("color", "Blue")
                .field("model", "S")
                .field("postDate", new Date())
                .endObject();
        // Convert it to a JSON string
        String json = Strings.toString(builder);
        /**
         * Argument 1: index
         * Argument 2: type
         */
        IndexRequestBuilder indexRequestBuilder = client.prepareIndex("clothes", "young");
        IndexResponse response = indexRequestBuilder.setSource(json, XContentType.JSON).get();
        System.out.println("Index:" + response.getIndex() + "," +
                "Type:" + response.getType() + "," +
                "ID:" + response.getId() + "," +
                "Version:" + response.getVersion() + "," +
                "Status:" + response.status().name()
        );
        return client;
    }

Output:

Index:clothes,Type:young,ID:J5uAoWcBb9TcvgEh2GJ3,Version:1,Status:CREATED

3. Getting a document by id

    /**
     * Get a document by id
     * @throws IOException
     */
    public static TransportClient getDoc() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        // prepareGet takes the index, type, and id
        GetResponse response = client.prepareGet("clothes", "young", "1").get();
        String id = response.getId();
        String index = response.getIndex();
        Map<String, DocumentField> fields = response.getFields();
        // The returned _source, i.e. the original document body
        Map<String, Object> source = response.getSource();
        System.out.println("ID:" + id + ",Index:" + index);
        Set<String> fieldKeys = fields.keySet();
        for (String s : fieldKeys){
            DocumentField documentField = fields.get(s);
            String name = documentField.getName();
            List<Object> values = documentField.getValues();
            System.out.println(name + ":" + values.toString());
        }
        System.out.println("==========");
        Set<String> sourceKeys = source.keySet();
        for(String s : sourceKeys){
            Object o = source.get(s);
            System.out.println(s + ":" + o);
        }
        return client;
    }

Output:

ID:1,Index:clothes
==========
color:red
postDate:2018-12-12T08:18:21.509Z
model:S
brand:ANTA

4. Deleting by id; here we delete the document whose id was auto-generated

    /**
     * Delete a document by id
     * @throws IOException
     */
    public static TransportClient delDoc() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        DeleteResponse response = client.prepareDelete("clothes", "young", "J5uAoWcBb9TcvgEh2GJ3").get();
        String id = response.getId();
        String index = response.getIndex();
        String status = response.status().name();
        System.out.println("ID:" + id + ",Index:" + index + ",Status:" + status);
        return client;
    }

Output:

ID:J5uAoWcBb9TcvgEh2GJ3,Index:clothes,Status:OK

5. Deleting by query

    /**
     * Delete the documents that match a query
     * @throws IOException
     */
    public static TransportClient delDocByQuery() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("brand", "ANTA")) // field, value
                .source("clothes")  // index
                .get();
        long deleted = response.getDeleted();
        System.out.println(deleted);
        return client;
    }

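Output:

1

This is the first appearance of QueryBuilders. It is used very often, and I will cover the class in a later post.

Delete-by-query can also be restricted to a type:

    DeleteByQueryRequestBuilder builder = DeleteByQueryAction.INSTANCE.newRequestBuilder(client);
    builder.filter(QueryBuilders.matchQuery("brand", "ANTA")) // field, value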
            .source("clothes")  // index
            .source().setTypes("young"); // type
    BulkByScrollResponse response = builder.get();

 

6. Bulk insert (bulk can also update or delete documents, not only create them)

    /**
     * Bulk insert
     * @throws IOException
     */
    public static TransportClient bulkDoc() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        BulkRequestBuilder bulk = client.prepareBulk();
        bulk.add(client.prepareIndex("car", "model", "1")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("name", "法拉利488")
                        .field("price", "315.50-418.80萬")
                        .field("postDate", new Date())
                        .endObject()
                )
        );
        bulk.add(client.prepareIndex("car", "model", "2")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("name", "法拉利LaFerrari")
                        .field("price", "2250.00萬")
                        .field("postDate", new Date())
                        .endObject()
                )
        );
        bulk.add(client.prepareIndex("car", "model", "3")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("name", "法拉利GTC4Lusso")
                        .field("price", "322.80-485.80萬")
                        .field("postDate", new Date())
                        .endObject()
                )
        );
        BulkResponse responses = bulk.get();
        String status = responses.status().name();
        System.out.println(status);
        return client;
    }

Output:

OK

7. Getting multiple documents

    /**
     * Multi-get
     * @throws IOException
     */
    public static TransportClient multiGetDoc() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
                // multiple indices and multiple ids can be specified
                .add("clothes", "young", "1", "2")
                .add("car", "model", "1","2","3")
                .get();

        for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                String json = response.getSourceAsString();
                System.out.println(json);
            }
        }
        return client;
    }

Output: the clothes index no longer holds any documents at this point, so only the following three are printed

{"name":"法拉利488","price":"315.50-418.80萬","postDate":"2018-12-12T08:38:09.107Z"}
{"name":"法拉利LaFerrari","price":"2250.00萬","postDate":"2018-12-12T08:38:09.129Z"}
{"name":"法拉利GTC4Lusso","price":"322.80-485.80萬","postDate":"2018-12-12T08:38:09.129Z"}

 

8. Updating

    /**
     * Update, option 1: via UpdateRequest
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static TransportClient updateDoc1() throws IOException, ExecutionException, InterruptedException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("car"); // index
        updateRequest.type("model");// type
        updateRequest.id("3");       // id
        // fields to update
        updateRequest.doc(jsonBuilder()
                .startObject()
                .field("name", "Aventador")
                .field("price", "630.00-755.94萬")
                .field("postDate", new Date())
                .field("extra", "Extra Data")   // fields that do not exist yet are added automatically
                .endObject());
        UpdateResponse updateResponse = client.update(updateRequest).get();
        System.out.println(updateResponse.status().name());
        return client;
    }

Output:

OK

 Check the result in Kibana: GET /car/model/3

The client offers two request styles: ***Request objects (for example UpdateRequest) and prepare*** builders (for example prepareUpdate). I prefer prepare***.

    /**
     * Update, option 2: via prepareUpdate
     * @throws IOException
     */
    public static TransportClient updateDoc2() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        client.prepareUpdate("car", "model", "1")
                .setDoc(jsonBuilder()
                        .startObject()
                        .field("name", "法拉利812 Superfast")
                        .field("price", "498.80萬")
                        .field("postDate", new Date())
                        .endObject()
                )
                .get();
        return client;
    }

 

9. Upsert

    /**
     * If the document exists, apply doc as a partial update; otherwise index the upsert document
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static TransportClient upsert() throws IOException, ExecutionException, InterruptedException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        IndexRequest indexRequest = new IndexRequest("clothes", "young", "3")
                .source(jsonBuilder()
                        .startObject()
                        .field("brand", "Pierre Cardin")
                        .field("color", "Black")
                        .field("model", "L")
                        .field("postDate", new Date())
                        .endObject());
        UpdateRequest updateRequest = new UpdateRequest("clothes", "young", "3")
                .doc(jsonBuilder()
                        .startObject()
                        .field("model", "XL")
                        .endObject())
                .upsert(indexRequest);
        UpdateResponse response = client.update(updateRequest).get();
        System.out.println(response.status().name());
        return client;
    }

In other words: if the document exists, only the model field is updated; otherwise the content of the IndexRequest is indexed as a new document.
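
The upsert behaviour described above can be mimicked with an in-memory map. This is a plain-Java sketch, not Elasticsearch code: the class `UpsertSketch` is hypothetical, and the returned strings `CREATED` and `OK` simply mirror the status names printed by the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for an index, used only to illustrate upsert semantics.
final class UpsertSketch {
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    /**
     * If a document with this id exists, merge the partial doc into it and return "OK".
     * Otherwise store a copy of the upsert document unchanged and return "CREATED".
     */
    String upsert(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            docs.put(id, new HashMap<>(upsertDoc));
            return "CREATED";
        }
        existing.putAll(partialDoc);
        return "OK";
    }

    /** Returns the stored document, or null if there is none. */
    Map<String, Object> get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        UpsertSketch index = new UpsertSketch();
        Map<String, Object> full = new HashMap<>();
        full.put("brand", "Pierre Cardin");
        full.put("model", "L");
        Map<String, Object> partial = new HashMap<>();
        partial.put("model", "XL");

        System.out.println(index.upsert("3", partial, full)); // first run: the document is created
        System.out.println(index.upsert("3", partial, full)); // second run: only model is changed
        System.out.println(index.get("3"));
    }
}
```

Note that on the first run the partial document is not applied at all, so `model` stays `L` until the second run.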

First run (the document does not exist yet):

CREATED

 GET /clothes/young/3

Second run:

OK

Check the document in Kibana.

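10. BulkProcessor, another bulk tool

 Basic configuration

.setBulkActions(10000)  // send a bulk every 10000 requests
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once 5 MB of data has accumulated
.setFlushInterval(TimeValue.timeValueSeconds(5))    // flush every 5 s, regardless of how much data is pending
.setConcurrentRequests(0)   // number of concurrent requests. 0 means only a single request is allowed to execute; 1 means one concurrent request may execute while new bulk requests accumulate.
.setBackoffPolicy(  // custom retry policy: wait 100 ms initially, grow the wait exponentially, and retry at most 3 times. A retry is attempted when one or more bulk items fail with EsRejectedExecutionException, which indicates that too few compute resources were available to process the request. To disable backoff, pass BackoffPolicy.noBackoff().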
        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
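
To get a feel for what "wait 100 ms, grow exponentially, retry at most 3 times" means, here is a plain-Java sketch of a retry schedule. It uses a generic doubling rule as an assumption; it is not claimed to be the formula `BackoffPolicy.exponentialBackoff` uses internally, and the class `BackoffSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of an exponential retry schedule (generic doubling, not Elasticsearch's formula).
final class BackoffSketch {
    private BackoffSketch() {}

    /** Returns the wait before each retry in milliseconds: initial, 2x initial, 4x initial, ... */
    static List<Long> delays(long initialMillis, int maxRetries) {
        List<Long> result = new ArrayList<>();
        long delay = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            result.add(delay);
            delay *= 2; // grow exponentially
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(100, 3)); // prints [100, 200, 400]
    }
}
```

After the last delay has been used up, the request is given up on rather than retried again.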

 

Test

    /**
     * Generate test data
     * @throws IOException
     */
    public static TransportClient scrollSearchPreData() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        // called before a bulk request is executed
                        System.out.println("beforeBulk-----" + request.getDescription());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        // called after a bulk request has completed
                        System.out.println("afterBulk------" + request.getDescription() + ",hasFailures:" + response.hasFailures());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        // called when a bulk request failed
                        System.out.println("error-----" + request.getDescription() + "," + failure.getMessage());
                    }
                })
                .setBulkActions(100)  // send a bulk every 100 requests
                .setConcurrentRequests(0)   // number of concurrent requests. 0 means only a single request is allowed to execute; 1 means one concurrent request may execute while new bulk requests accumulate.
                .build();
        Random random = new Random();
        for (int i = 1; i <= 1000; i++){
            bulkProcessor.add(new IndexRequest("book", "elasticsearch", i+"").source(jsonBuilder()
                    .startObject()
                    .field("name", "book_" + i)
                    .field("price", random.nextDouble()*1000)
                    .field("postDate", new Date())
                    .endObject()));
        }
        bulkProcessor.flush();
        bulkProcessor.close();
        return client;
    }

Output: 1000 documents, sent in 10 bulk requests

beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
beforeBulk-----requests[100], indices[book]
afterBulk------requests[100], indices[book],hasFailures:false
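
Why 1000 documents produce exactly 10 bulk requests can be seen from a stripped-down batcher. This is a plain-Java sketch of the count-based flush rule only (no size or time thresholds, no network); the class `BatchSketch` is hypothetical and is not how BulkProcessor is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-based batcher illustrating the setBulkActions(n) threshold.
final class BatchSketch {
    private final int bulkActions;
    private final List<String> pending = new ArrayList<>();
    private final List<Integer> flushedSizes = new ArrayList<>();

    BatchSketch(int bulkActions) {
        this.bulkActions = bulkActions;
    }

    /** Queues a request and flushes automatically once the threshold is reached. */
    void add(String request) {
        pending.add(request);
        if (pending.size() >= bulkActions) {
            flush();
        }
    }

    /** Sends whatever is pending as one batch; does nothing if the queue is empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedSizes.add(pending.size());
        pending.clear();
    }

    /** Sizes of the batches sent so far, in order. */
    List<Integer> flushedSizes() {
        return flushedSizes;
    }

    public static void main(String[] args) {
        BatchSketch batcher = new BatchSketch(100);
        for (int i = 1; i <= 1000; i++) {
            batcher.add("book_" + i);
        }
        batcher.flush(); // nothing is left to send, so this adds no extra batch
        System.out.println(batcher.flushedSizes().size()); // prints 10
    }
}
```

With a document count that is not a multiple of the threshold, the final explicit flush sends the remainder as one smaller batch.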

 

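11. Scroll (roll through the whole result set)

    /**
     * A search request normally returns a single "page" of results. The scroll API can be used to retrieve a large number of results (or even all of them) from one search request,
     * much like a cursor in a traditional database. Scrolling is not meant for real-time user requests but for processing large amounts of data
     * @throws UnknownHostException
     */
    public static TransportClient scrollSearch() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse response = client.prepareSearch("book")
                .addSort("price", SortOrder.ASC)
                .setScroll(new TimeValue(30000))
                .setSize(1000).get();   // each scroll request returns up to 1000 hits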
        do {
            System.out.println("========Begin=======");
            for (SearchHit hit : response.getHits().getHits()) {
                System.out.println(hit.getSourceAsString());
            }
            System.out.println("========End=======");
            response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(30000)).execute().actionGet();
        } while(response.getHits().getHits().length != 0);
        return client;
    }

 

Output:

========Begin=======
{"name":"book_233","price":0.7903630819869889,"postDate":"2018-12-12T09:27:32.629Z"}
{"name":"book_46","price":1.9862330698061648,"postDate":"2018-12-12T09:27:30.722Z"}
{"name":"book_18","price":2.8024592316934216,"postDate":"2018-12-12T09:27:30.721Z"}
{"name":"book_512","price":3.5739663933835875,"postDate":"2018-12-12T09:27:33.275Z"}
{"name":"book_275","price":5.449351054677254,"postDate":"2018-12-12T09:27:32.632Z"}
{"name":"book_112","price":8.035476335226166,"postDate":"2018-12-12T09:27:32.424Z"}
... (remaining output omitted)
========End=======
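
The cursor-like loop used for scrolling (fetch a page, process it, ask for the next one, stop on an empty page) can be shown without a cluster. The following is a plain-Java sketch over an in-memory list; the class `ScrollSketch` and its methods are hypothetical and only imitate the control flow, not the scroll API itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory cursor that imitates the "fetch pages until one is empty" scroll loop.
final class ScrollSketch {
    private final List<Double> sorted;
    private final int pageSize;
    private int cursor = 0;

    ScrollSketch(List<Double> prices, int pageSize) {
        this.sorted = new ArrayList<>(prices);
        Collections.sort(this.sorted); // ascending, like addSort("price", SortOrder.ASC)
        this.pageSize = pageSize;
    }

    /** Returns the next page of at most pageSize values; an empty list means the cursor is exhausted. */
    List<Double> nextPage() {
        int end = Math.min(cursor + pageSize, sorted.size());
        List<Double> page = new ArrayList<>(sorted.subList(cursor, end));
        cursor = end;
        return page;
    }

    /** Reads every remaining page, in order, until an empty page is returned. */
    static List<Double> drain(ScrollSketch scroll) {
        List<Double> all = new ArrayList<>();
        List<Double> page = scroll.nextPage();
        while (!page.isEmpty()) {
            all.addAll(page);
            page = scroll.nextPage();
        }
        return all;
    }

    public static void main(String[] args) {
        List<Double> prices = new ArrayList<>();
        Collections.addAll(prices, 5.0, 1.0, 3.0, 2.0, 4.0);
        System.out.println(drain(new ScrollSketch(prices, 2))); // prints [1.0, 2.0, 3.0, 4.0, 5.0]
    }
}
```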

 

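12. Update by query

    /**
     * When the versions match, updateByQuery updates the document and increments its version number.
     * Any update or query failure causes updateByQuery to abort. These failures are available from BulkByScrollResponse#getBulkFailures.
     * Updates that already succeeded are kept and are not rolled back. When the first failure causes the abort, the response contains all failures generated by the failed bulk request.
     * A version conflict occurs when a document changes between the time the snapshot is taken and the time the index request is processed.
     * To keep version conflicts from aborting updateByQuery, set abortOnVersionConflict(false).
     * ScriptType.INLINE: inline scripts are specified in the query itself and compiled on the fly. They are cached based on the script's lang and code.
     * ScriptType.STORED: stored scripts are saved as part of the {@link org.elasticsearch.cluster.ClusterState} at the user's request. They are cached when first used in a query.
     * https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-update-by-query.html
     * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
     * @throws UnknownHostException
     */
    public static TransportClient updateByQuery() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
        updateByQuery.source("book")
                .size(100)  // maximum number of documents to process
                .filter(QueryBuilders.termsQuery("name","book_233", "book_46", "book_18", "book_512"))  // note: for term queries the values must be lowercase!
                // The script below: leave id=781 untouched, delete id=316, and set the price of everything else to 79
                // (the book documents indexed above have no "id" field in _source, so every match takes the else branch)
                .script(new Script(
                        ScriptType.INLINE,Script.DEFAULT_SCRIPT_LANG,
                        "if (ctx._source['id'] == 781) {"
                                + "  ctx.op='noop'" // ctx.op='noop': skip this document
                                + "} else if (ctx._source['id'] == '316') {"
                                + "  ctx.op='delete'"   // ctx.op='delete': delete this document
                                + "} else {"
                                + "ctx._source['price'] = 79}",
                         Collections.emptyMap()))
                .abortOnVersionConflict(false); // version-conflict policy: do not abort on version conflicts
//                .source().setTypes("young") // restrict to a type
//                .setSize(10)   // number of search hits to return
//                .addSort("postDate", SortOrder.DESC);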
        BulkByScrollResponse response = updateByQuery.get();
        System.out.println("Deleted:" + response.getDeleted() + ",Created:" +
                response.getCreated() + ",Updated:" + response.getUpdated() + ",Noops:" + response.getNoops());

        List<BulkItemResponse.Failure> failures = response.getBulkFailures();
        System.out.println(failures.size());
        // If the stored value is already Cat and the update also sets Cat, no update is performed
        return client;
    }

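Output: (the term query has a catch: the value must be lowercase and must not contain "-". I originally generated names in the form book-100, and the query found nothing.)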

Deleted:0,Created:0,Updated:4,Noops:0
0
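
The lowercase and hyphen pitfall noted above comes from analysis. Assuming `name` is a dynamically mapped text field using the default standard analyzer, the indexed value is lowercased and split into tokens, while a term query compares its value against those tokens without analyzing it. The plain-Java sketch below only approximates that behaviour with a regular expression; the class `TermMatchSketch` is hypothetical and is not the real analyzer.

```java
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical, rough approximation of "analyzed at index time, not analyzed at term-query time".
final class TermMatchSketch {
    private TermMatchSketch() {}

    /** Lowercases the text and splits it on anything that is not a letter, a digit or '_'. */
    static Set<String> analyze(String text) {
        Set<String> tokens = new LinkedHashSet<>();
        for (String token : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}_]+")) {
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    /** A term query matches only if the unanalyzed query value equals one of the indexed tokens. */
    static boolean termMatches(String indexedText, String queryValue) {
        return analyze(indexedText).contains(queryValue);
    }

    public static void main(String[] args) {
        System.out.println(analyze("book_233"));                 // one token: the underscore does not split
        System.out.println(analyze("book-100"));                 // two tokens: the hyphen splits the value
        System.out.println(termMatches("book-100", "book-100")); // no single token equals "book-100"
        System.out.println(termMatches("Book_233", "Book_233")); // the indexed token is lowercase
    }
}
```

Under the same assumption, querying the `name.keyword` sub-field that dynamic mapping creates, or using a match query, avoids the problem because the comparison then happens on the same form of the value.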

Check the data:

GET /book/elasticsearch/_mget
{
    "ids" : ["233", "46", "18", "512"]
}

 

Result:

{
  "docs" : [
    {
      "_index" : "book",
      "_type" : "elasticsearch",
      "_id" : "233",
      "_version" : 2,
      "found" : true,
      "_source" : {
        "price" : 79,
        "name" : "book_233",
        "postDate" : "2018-12-12T09:27:32.629Z"
      }
    },
    {
      "_index" : "book",
      "_type" : "elasticsearch",
      "_id" : "46",
      "_version" : 2,
      "found" : true,
      "_source" : {
        "price" : 79,
        "name" : "book_46",
        "postDate" : "2018-12-12T09:27:30.722Z"
      }
    },
    {
      "_index" : "book",
      "_type" : "elasticsearch",
      "_id" : "18",
      "_version" : 2,
      "found" : true,
      "_source" : {
        "price" : 79,
        "name" : "book_18",
        "postDate" : "2018-12-12T09:27:30.721Z"
      }
    },
    {
      "_index" : "book",
      "_type" : "elasticsearch",
      "_id" : "512",
      "_version" : 2,
      "found" : true,
      "_source" : {
        "price" : 79,
        "name" : "book_512",
        "postDate" : "2018-12-12T09:27:33.275Z"
      }
    }
  ]
}

13. Simple search

    /**
     * Simple search [wildcard query, price range filter, result size, sorting]
     * @throws UnknownHostException
     */
    public static TransportClient search() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse response = client.prepareSearch("book")    // index; more than one may be given
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.wildcardQuery("name", "*book_1*"))          // Query
                .setPostFilter(QueryBuilders.rangeQuery("price").from(800).to(900))     // Filter
                .setFrom(0).setSize(100).setExplain(true).addSort("postDate", SortOrder.DESC)
                .get();
        response.getHits().forEach(e ->{
            System.out.println(e.getSourceAsString());
        });
        return client;
    }

 

Output:

{"name":"book_1000","price":811.3812414198577,"postDate":"2018-12-12T09:27:34.095Z"}
{"name":"book_194","price":828.6484294585816,"postDate":"2018-12-12T09:27:32.433Z"}
{"name":"book_171","price":839.1475764183831,"postDate":"2018-12-12T09:27:32.432Z"}
{"name":"book_170","price":869.7835076374234,"postDate":"2018-12-12T09:27:32.431Z"}
{"name":"book_161","price":838.5131747806441,"postDate":"2018-12-12T09:27:32.429Z"}
{"name":"book_153","price":805.041724108352,"postDate":"2018-12-12T09:27:32.429Z"}
{"name":"book_154","price":893.982844708382,"postDate":"2018-12-12T09:27:32.429Z"}
{"name":"book_105","price":883.039302643907,"postDate":"2018-12-12T09:27:32.424Z"}
{"name":"book_19","price":877.0523728410054,"postDate":"2018-12-12T09:27:30.721Z"}

 

14. Multiple searches with MultiSearch

    /**
     * Multi search
     */
    public static TransportClient multiSearch() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        // first search
        SearchRequestBuilder srb1 = client
                .prepareSearch("book")
                .setQuery(QueryBuilders.queryStringQuery("book_9*").field("name"))
                .setFrom(0)    // start offset
                .setSize(10);   // maximum number of hits to return
        // second search
        SearchRequestBuilder srb2 = client
                .prepareSearch("car")
                .setQuery(QueryBuilders.queryStringQuery("*r*"))
                .setSize(10);
        // combine them
        MultiSearchResponse sr = client.prepareMultiSearch()
                .add(srb1)
                .add(srb2)
                .get();

        // You will get all individual responses from MultiSearchResponse#getResponses()
        long nbHits = 0;
        for (MultiSearchResponse.Item item : sr.getResponses()) {
            SearchResponse response = item.getResponse();
            response.getHits().forEach(e ->{
                System.out.println(e.getSourceAsString());
            });
            long hits = response.getHits().getTotalHits();
            System.out.println("Hits:" + hits);
            nbHits += hits;
        }
        System.out.println("Total:" + nbHits);
        return client;
    }

 

Output:

{"name":"book_92","price":176.35847694096162,"postDate":"2018-12-12T09:27:30.724Z"}
{"name":"book_98","price":611.4318589503413,"postDate":"2018-12-12T09:27:30.724Z"}
{"name":"book_99","price":214.4653626273969,"postDate":"2018-12-12T09:27:30.724Z"}
{"name":"book_900","price":973.3382073380857,"postDate":"2018-12-12T09:27:33.892Z"}
{"name":"book_915","price":35.30856326485343,"postDate":"2018-12-12T09:27:34.091Z"}
{"name":"book_922","price":299.58144612743064,"postDate":"2018-12-12T09:27:34.091Z"}
{"name":"book_930","price":591.6598815227311,"postDate":"2018-12-12T09:27:34.092Z"}
{"name":"book_933","price":287.18727780940037,"postDate":"2018-12-12T09:27:34.092Z"}
{"name":"book_935","price":693.6036227965725,"postDate":"2018-12-12T09:27:34.092Z"}
{"name":"book_942","price":701.4129722487066,"postDate":"2018-12-12T09:27:34.092Z"}
Hits:111
{"name":"法拉利LaFerrari","price":"2250.00萬","postDate":"2018-12-12T08:38:09.129Z"}
{"name":"Aventador","price":"630.00-755.94萬","postDate":"2018-12-12T08:49:01.736Z","extra":"Extra Data"}
Hits:2
Total:113

 Aggregations

15. Aggregation queries

    /**
     * Aggregation query
     * A search finds specific documents, whereas an aggregation computes statistics over the documents that were found
     * https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html
     * https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html
     * https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-search-aggs.html
     * Sub-aggregations can be defined inside an aggregation
     * @return
     * @throws UnknownHostException
     */
    public static TransportClient aggregationsSearch() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse sr = client.prepareSearch("book")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(
                        AggregationBuilders.stats("agg1").field("price")
                )
                .addAggregation(
                        AggregationBuilders.dateHistogram("agg2")
                                .field("postDate")
                                .dateHistogramInterval(DateHistogramInterval.YEAR)
                )
                .get(); // Short version of execute().actionGet().

        // Get your facet results
        Aggregation agg1 = sr.getAggregations().get("agg1");
        System.out.println(agg1.getClass());    // class org.elasticsearch.search.aggregations.metrics.stats.InternalStats
        Aggregation agg2 = sr.getAggregations().get("agg2");
        System.out.println(agg2.getClass());    // class org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram
        return client;
    }

 

15.1 Metrics aggregations

    /**
     * Metrics aggregations
     * Mainly used to compute statistics
     * org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles
     * org.elasticsearch.search.aggregations.metrics.percentiles.PercentileRanks
     * org.elasticsearch.search.aggregations.metrics.cardinality.Cardinality
     * Geo aggregations: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_metrics_aggregations.html#java-aggs-metrics-geobounds
     * https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_metrics_aggregations.html#java-aggs-metrics-tophits
     * https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_metrics_aggregations.html#java-aggs-metrics-scripted-metric
     * @return
     * @throws UnknownHostException
     */
    public static TransportClient metricsAggregationsSearch() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse sr = client.prepareSearch("book")
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(
                        AggregationBuilders.min("agg1").field("price")
                )
                .addAggregation(
                        AggregationBuilders.max("agg2").field("price")
                )
                .addAggregation(
                        AggregationBuilders.sum("agg3").field("price")
                )
                .addAggregation(
                        AggregationBuilders.avg("agg4").field("price")
                )
                .addAggregation(
                        AggregationBuilders.count("agg5").field("price")
                )
                .addAggregation(
                        AggregationBuilders.stats("agg6").field("price")
                )
                .get();
        Min agg1 = sr.getAggregations().get("agg1");
        Max agg2 = sr.getAggregations().get("agg2");
        Sum agg3 = sr.getAggregations().get("agg3");
        Avg agg4 = sr.getAggregations().get("agg4");
        ValueCount agg5 = sr.getAggregations().get("agg5");
        Stats agg6 = sr.getAggregations().get("agg6");
        System.out.println("Min:" + agg1.getValue() + ",Max:" + agg2.getValue() + ",Sum:" + agg3.getValue() + ",Avg:" + agg4.getValue() + ",Count:" + agg5.getValue() +
                ",Stats:(" + agg6.getMin() + "," + agg6.getMax() + "," + agg6.getSum() + "," + agg6.getAvg() + "," + agg6.getCount() + ")");
        return client;
    }

 

Output:

Min:5.449350833892822,Max:999.3211669921875,Sum:502966.58267736435,Avg:502.96658267736433,Count:1000,Stats:(5.449350833892822,999.3211669921875,502966.58267736435,502.96658267736433,1000)
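
For comparison, the same five numbers that the stats aggregation returns (min, max, sum, avg, count) can be computed locally with the JDK. This plain-Java sketch works on a small in-memory array rather than on the book index; the class `StatsSketch` and the sample prices are made up for illustration.

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

// Hypothetical local equivalent of min / max / sum / avg / count over a price field.
final class StatsSketch {
    private StatsSketch() {}

    /** Computes min, max, sum, average and count in a single pass over the values. */
    static DoubleSummaryStatistics stats(double... prices) {
        return DoubleStream.of(prices).summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics s = stats(10.0, 20.0, 60.0); // made-up sample prices
        System.out.println("Min:" + s.getMin() + ",Max:" + s.getMax() + ",Sum:" + s.getSum()
                + ",Avg:" + s.getAverage() + ",Count:" + s.getCount());
    }
}
```

The difference in practice is where the work happens: the aggregation is computed inside the cluster, so only the summary travels over the network, not the 1000 documents.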

 

15.2 Geo bounds aggregation (computes the top-left / bottom-right bounds of the coordinates)

    /**
     * Prepare geo location data
     * @return
     * @throws IOException
     */
    public static TransportClient geoSearchPreData() throws IOException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        // create the index
        CreateIndexResponse indexResponse = client.admin().indices().prepareCreate("area")
                .setSettings(Settings.builder()
                        .put("index.number_of_shards", 1)   // shards
                        .put("index.number_of_replicas", 1) // replicas
                )
                .addMapping("hospital", "message", "type=text", "location", "type=geo_point")
                .get();
        System.out.println("Index:" + indexResponse.index() + ",ACK:" + indexResponse.isAcknowledged());
        BulkProcessor bulkProcessor = BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        // called before a bulk request is executed
                        System.out.println("beforeBulk-----" + request.getDescription());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        // called after a bulk request has completed
                        System.out.println("afterBulk------" + request.getDescription() + ",hasFailures:" + response.hasFailures());
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
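                        // called when a bulk request failed
                        System.out.println("error-----" + request.getDescription() + "," + failure.getMessage());
                    }
                })
                .setBulkActions(100)  // send a bulk every 100 requests
                .setConcurrentRequests(0)   // number of concurrent requests. 0 means only a single request is allowed to execute; 1 means one concurrent request may execute while new bulk requests accumulate.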
                .build();
        Random random = new Random();
        DecimalFormat coordinate = new DecimalFormat("#.############");
        for (int i = 1; i <= 200; i++){
            // A geo_point string is "lat,lon"; latitude must stay within [-90, 90]
            String lat = coordinate.format(random.nextDouble() * 90);
            String lon = coordinate.format(random.nextDouble() * 100);
            bulkProcessor.add(new IndexRequest("area", "hospital", i+"").source(jsonBuilder()
                    .startObject()
                    .field("name", "hospital-" + i)
                    .field("location", lat + "," + lon)
                    .endObject()));
        }
        bulkProcessor.flush();
        bulkProcessor.close();
        return client;
    }
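
To make the effect of setBulkActions(100) easier to picture, here is a minimal plain-Java sketch of the batching idea. BatchBuffer and batchesFor are hypothetical names invented for this illustration; they are not Elasticsearch classes, and the real BulkProcessor also handles size limits, flush intervals, and retries that are left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the batching behavior of BulkProcessor; not an Elasticsearch class.
final class BatchBuffer {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sent = new ArrayList<>();

    BatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Queue one request and send the batch as soon as it is full.
    void add(String request) {
        pending.add(request);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Send whatever is still pending; does nothing when the buffer is empty.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sent.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<String>> sentBatches() {
        return sent;
    }

    // Number of batches produced when `requests` items are added and then flushed.
    static int batchesFor(int requests, int batchSize) {
        BatchBuffer buffer = new BatchBuffer(batchSize);
        for (int i = 1; i <= requests; i++) {
            buffer.add("doc-" + i);
        }
        buffer.flush();
        return buffer.sentBatches().size();
    }

    public static void main(String[] args) {
        System.out.println(batchesFor(200, 100)); // prints 2
        System.out.println(batchesFor(250, 100)); // prints 3
    }
}
```

With 200 documents and a batch size of 100, both batches are sent from inside add, so the final flush has nothing left to send. The explicit flush only matters when the total is not a multiple of the batch size.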

    /**
     * Geo query (geo_bounds aggregation)
     * @return
     * @throws UnknownHostException
     */
    public static TransportClient geoAggregation() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse sr = client.prepareSearch("area")
                .setQuery(QueryBuilders.matchQuery("name", "hospital-1"))
                .addAggregation(
                        AggregationBuilders.geoBounds("agg").field("location").wrapLongitude(true)
                )
                .get();
        GeoBounds agg = sr.getAggregations().get("agg");
        GeoPoint left = agg.topLeft();
        GeoPoint right = agg.bottomRight();
        System.out.println(left + " | " + right);
        return client;
    }

Output:

89.9911705031991, 0.03342803567647934 | 0.049703302793204784, 99.9249867349863
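
To read that output: each point prints as "lat, lon", so the top-left corner carries the largest latitude and the smallest longitude, and the bottom-right corner carries the smallest latitude and the largest longitude. The following is a minimal plain-Java sketch of that bounding-box computation. GeoBoundsSketch is a hypothetical helper written for this post, and it deliberately ignores the date-line handling that wrapLongitude(true) enables in the real aggregation.

```java
// Hypothetical illustration of what a geo_bounds-style bounding box contains.
final class GeoBoundsSketch {

    // Each input point is {lat, lon}.
    // Returns {topLeftLat, topLeftLon, bottomRightLat, bottomRightLon}.
    static double[] bounds(double[][] latLonPoints) {
        if (latLonPoints.length == 0) {
            throw new IllegalArgumentException("at least one point is required");
        }
        double top = Double.NEGATIVE_INFINITY;    // largest latitude
        double bottom = Double.POSITIVE_INFINITY; // smallest latitude
        double left = Double.POSITIVE_INFINITY;   // smallest longitude
        double right = Double.NEGATIVE_INFINITY;  // largest longitude
        for (double[] point : latLonPoints) {
            double lat = point[0];
            double lon = point[1];
            top = Math.max(top, lat);
            bottom = Math.min(bottom, lat);
            left = Math.min(left, lon);
            right = Math.max(right, lon);
        }
        return new double[] {top, left, bottom, right};
    }

    public static void main(String[] args) {
        double[][] points = {{10.0, 20.0}, {-5.0, 40.0}, {30.0, -10.0}};
        double[] box = bounds(points);
        // prints "30.0, -10.0 | -5.0, 40.0"
        System.out.println(box[0] + ", " + box[1] + " | " + box[2] + ", " + box[3]);
    }
}
```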

 

15.3 Bucket aggregations

    /**
     * Bucket aggregations; only a few of them are shown here
     * https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html
     * @return
     * @throws UnknownHostException
     */
    public static TransportClient bucketAggregationsSearch() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse sr = client.prepareSearch()
                .setQuery(QueryBuilders.matchAllQuery())
//                .addAggregation(AggregationBuilders
//                        .global("agg0")
//                        .subAggregation(AggregationBuilders.terms("sub_agg").field("name"))
//                )
                .addAggregation(AggregationBuilders
                        .filter("agg1", QueryBuilders.termQuery("name", "book_199")))
                .addAggregation(AggregationBuilders
                        .filters("agg2",
                                new FiltersAggregator.KeyedFilter("key1", QueryBuilders.termQuery("name", "book_1")),
                                new FiltersAggregator.KeyedFilter("key2", QueryBuilders.termQuery("name", "book_52"))
                                ))
                .get();

//        Global agg0 = sr.getAggregations().get("agg0");
//        System.out.println("GlobalCount:" + agg0.getDocCount());

        Filter agg1 = sr.getAggregations().get("agg1");
        System.out.println("FilterCount:" + agg1.getDocCount());

        Filters agg2 = sr.getAggregations().get("agg2");
        for (Filters.Bucket entry : agg2.getBuckets()) {
            String key = entry.getKeyAsString();            // bucket key
            long docCount = entry.getDocCount();            // Doc count
            System.out.println("key [" + key + "], doc_count ["+ docCount +"]");
        }
        return client;
    }

 

Output (Global masks the other aggregations, so it is commented out above):

FilterCount:1
key [key1], doc_count [1]
key [key2], doc_count [1]
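
For readers new to the filters aggregation, here is a small plain-Java sketch of the idea behind it: every keyed filter is evaluated against the matched documents on its own, and each key reports how many documents passed. FiltersAggSketch and its sample documents are made up for this illustration and do not use the Elasticsearch API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory imitation of a keyed filters aggregation.
final class FiltersAggSketch {

    // Counts, for every keyed filter, how many documents satisfy it.
    static Map<String, Long> count(List<Map<String, String>> docs,
                                   Map<String, Predicate<Map<String, String>>> keyedFilters) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Predicate<Map<String, String>>> filter : keyedFilters.entrySet()) {
            long matched = docs.stream().filter(filter.getValue()).count();
            counts.put(filter.getKey(), matched);
        }
        return counts;
    }

    // Builds four sample documents and two filters that mirror key1 and key2 above.
    static Map<String, Long> demo() {
        List<Map<String, String>> docs = Arrays.asList(
                Collections.singletonMap("name", "book_1"),
                Collections.singletonMap("name", "book_52"),
                Collections.singletonMap("name", "book_199"),
                Collections.singletonMap("name", "book_7"));
        Map<String, Predicate<Map<String, String>>> filters = new LinkedHashMap<>();
        filters.put("key1", doc -> "book_1".equals(doc.get("name")));
        filters.put("key2", doc -> "book_52".equals(doc.get("name")));
        return count(docs, filters);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {key1=1, key2=1}
    }
}
```

Because the filters are independent, one document can be counted under several keys when more than one filter matches it.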

Query DSL

16. Query DSL

16.1 Match All: the simplest query; it matches all documents

client.prepareSearch().setQuery(QueryBuilders.matchAllQuery());

 

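16.2 Full-text queries (the high-level full-text queries are usually used to run full-text searches on full-text fields such as the body of an email; the query string is analyzed before the query is executed)

16.2.1 Match Query (the standard query for full-text search, including fuzzy matching and phrase or proximity queries)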

    public static TransportClient queryDSL() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse searchResponse = client.prepareSearch()
                .setIndices("book")
                .setQuery(QueryBuilders
                        .matchQuery("name", "book_1")
                        .fuzziness(Fuzziness.AUTO)  // enable fuzzy matching
                        .zeroTermsQuery(MatchQuery.ZeroTermsQuery.ALL)  // if analysis removes every term from the query, ALL behaves like match_all; the default NONE matches no documents
                ).get();
        searchResponse.getHits().forEach(e -> {
            System.out.println(e.getSourceAsString());
        });
        System.out.println("Hits:" + searchResponse.getHits().totalHits);
        return client;
    }

Output (why 250 hits? Because of the fuzzy matching; if you comment out the fuzziness setting, only one document is found):

{"name":"book_1","price":541.5683324629698,"postDate":"2018-12-12T09:27:30.695Z"}
{"name":"book_2","price":859.0268161692424,"postDate":"2018-12-12T09:27:30.720Z"}
{"name":"book_4","price":666.0331749730802,"postDate":"2018-12-12T09:27:30.720Z"}
{"name":"book_6","price":797.3826369337273,"postDate":"2018-12-12T09:27:30.720Z"}
{"name":"book_15","price":764.0761667524818,"postDate":"2018-12-12T09:27:30.721Z"}
{"name":"book_51","price":969.2863955131567,"postDate":"2018-12-12T09:27:30.722Z"}
{"name":"book_3","price":467.29468328850055,"postDate":"2018-12-12T09:27:30.720Z"}
{"name":"book_11","price":365.2274741512962,"postDate":"2018-12-12T09:27:30.720Z"}
{"name":"book_17","price":498.8900836459158,"postDate":"2018-12-12T09:27:30.721Z"}
{"name":"book_31","price":377.2822748558652,"postDate":"2018-12-12T09:27:30.721Z"}
Hits:250
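
Why do names like book_2, book_15, and book_31 match a search for book_1? With Fuzziness.AUTO the allowed edit distance depends on the term length: terms of 0 to 2 characters must match exactly, 3 to 5 characters allow one edit, and longer terms allow two. The sketch below reproduces that rule in plain Java. FuzzinessSketch is a hypothetical helper, it assumes each name is indexed as a single term, and it uses plain Levenshtein distance, whereas Elasticsearch by default also counts a swap of two adjacent characters as one edit and applies limits such as max_expansions that are ignored here.

```java
// Hypothetical illustration of the Fuzziness.AUTO length rule; not the Elasticsearch implementation.
final class FuzzinessSketch {

    // Allowed edits under AUTO: 0 for length 0..2, 1 for length 3..5, 2 for longer terms.
    static int autoMaxEdits(String term) {
        int length = term.length();
        if (length <= 2) {
            return 0;
        }
        if (length <= 5) {
            return 1;
        }
        return 2;
    }

    // Classic Levenshtein distance (insert, delete, substitute) using two rolling rows.
    static int levenshtein(String a, String b) {
        int[] previous = new int[b.length() + 1];
        int[] current = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            previous[j] = j;
        }
        for (int i = 1; i <= a.length(); i++) {
            current[0] = i;
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                current[j] = Math.min(
                        Math.min(current[j - 1] + 1, previous[j] + 1),
                        previous[j - 1] + cost);
            }
            int[] swap = previous;
            previous = current;
            current = swap;
        }
        return previous[b.length()];
    }

    // True when the indexed term is within the AUTO edit budget of the query term.
    static boolean fuzzyMatches(String queryTerm, String indexedTerm) {
        return levenshtein(queryTerm, indexedTerm) <= autoMaxEdits(queryTerm);
    }

    public static void main(String[] args) {
        System.out.println(autoMaxEdits("book_1"));               // prints 2
        System.out.println(fuzzyMatches("book_1", "book_31"));    // prints true
        System.out.println(fuzzyMatches("book_1", "book_199"));   // prints true
        System.out.println(fuzzyMatches("book_1", "Aventador"));  // prints false
    }
}
```

Since book_1 is six characters long, it gets a budget of two edits, which covers most other book_N names with up to three digits.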

 

16.2.2 Multi Match Query (the multi-field version of the match query)

    public static TransportClient queryDSL() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse searchResponse = client.prepareSearch()
                // Search for the keyword Aventador in several fields (*ame and brand); field names may contain wildcards
                .setQuery(QueryBuilders.multiMatchQuery("Aventador", "*ame","brand"))
                .get();
        searchResponse.getHits().forEach(e -> {
            System.out.println(e.getSourceAsString());
        });
        System.out.println("Hits:" + searchResponse.getHits().totalHits);
        return client;
    }

Output:

{"name":"Aventador","price":"630.00-755.94萬","postDate":"2018-12-12T08:49:01.736Z","extra":"Extra Data"}
Hits:1
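
The wildcard in the field list is resolved against the field names that exist in the mapping. As a rough plain-Java picture of that expansion, here is a sketch in which * stands for any run of characters. FieldPatternSketch and the sample field list are assumptions for illustration only; Elasticsearch performs the expansion internally with its own matcher.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration of expanding wildcard field names against known fields.
final class FieldPatternSketch {

    // True when `field` matches `pattern`, where '*' stands for any run of characters.
    static boolean matches(String pattern, String field) {
        StringBuilder regex = new StringBuilder();
        String[] literals = pattern.split("\\*", -1);
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(literals[i]));
        }
        return Pattern.matches(regex.toString(), field);
    }

    // Keeps every mapped field that matches at least one of the patterns.
    static List<String> expand(List<String> mappedFields, String... patterns) {
        List<String> expanded = new ArrayList<>();
        for (String field : mappedFields) {
            for (String pattern : patterns) {
                if (matches(pattern, field)) {
                    expanded.add(field);
                    break;
                }
            }
        }
        return expanded;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("name", "brand", "price", "postDate");
        System.out.println(expand(fields, "*ame", "brand")); // prints [name, brand]
    }
}
```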

 

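16.2.3 Common Terms Query (a more specialized query that gives preference to uncommon words)

...to be added

16.2.4 Query String Query (parses the input and splits the text around operators; each text part is analyzed independently)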

    public static TransportClient queryDSL() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse searchResponse = client.prepareSearch()
                // Both the query terms and the field names may use wildcards (? matches one character, * matches zero or more characters), together with operators such as AND and OR
                // Characters that you do not want treated as operators must be escaped: + - = && || > < ! ( ) { } [ ] ^ " ~ * ? : \ /
                //.setQuery(QueryBuilders.queryStringQuery("(book_111) OR (book_999)")) // OR
                //.setQuery(QueryBuilders.queryStringQuery("(book_111) AND (book_999)")) // AND
                //.setQuery(QueryBuilders.queryStringQuery("(book_111) && (book_999)")) // && is equivalent to AND
                //.setQuery(QueryBuilders.queryStringQuery("(book_111) & (book_999)")) // a single & is not a query_string operator (unlike &&)
                //.setQuery(QueryBuilders.queryStringQuery("book_1?1").field("name")) // ? wildcard, with the field specified explicitly
                //.setQuery(QueryBuilders.queryStringQuery("name:book_1?1 OR color:B*"))  // fields specified inside the query string
                //.setQuery(QueryBuilders.queryStringQuery("name:book_1?1 | color:B*"))
                //.setQuery(QueryBuilders.queryStringQuery("name:book_1?1 || color:B*"))  // || is equivalent to OR
                //.setQuery(QueryBuilders.queryStringQuery("price:[990 TO *]"))  // range query
                // By default every term is optional; the two preferred operators are + (the term must be present) and - (the term must not be present)
                .setQuery(QueryBuilders.queryStringQuery("price:[990 TO *] -book*"))    // exclude documents matching book*
                .setSize(20)    // number of hits to return
                .get();
        searchResponse.getHits().forEach(e -> {
            System.out.println(e.getSourceAsString());
        });
        System.out.println("Hits:" + searchResponse.getHits().totalHits);
        return client;
    }

Output:

{"name":"法拉利LaFerrari","price":"2250.00萬","postDate":"2018-12-12T08:38:09.129Z"}
{"name":"法拉利488","price":"315.50-418.80萬","postDate":"2018-12-12T08:38:09.107Z"}
{"name":"Aventador","price":"630.00-755.94萬","postDate":"2018-12-12T08:49:01.736Z","extra":"Extra Data"}
Hits:3
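
Since user input often contains the reserved characters listed in the code comments, it can help to escape it before handing it to queryStringQuery. Below is a minimal plain-Java sketch. QueryStringEscaper is a hypothetical helper written for this post; it puts a backslash in front of each reserved character and removes < and > entirely, because the Elasticsearch documentation states that those two cannot be escaped. Lucene ships a similar QueryParser.escape method, which may be a better fit in a real project.

```java
// Hypothetical helper for escaping user input before using it in a query_string query.
final class QueryStringEscaper {

    // Reserved characters that can be neutralized with a leading backslash.
    private static final String ESCAPABLE = "+-=&|!(){}[]^\"~*?:\\/";

    static String escape(String raw) {
        StringBuilder escaped = new StringBuilder(raw.length() * 2);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            if (c == '<' || c == '>') {
                continue; // cannot be escaped, so they are dropped
            }
            if (ESCAPABLE.indexOf(c) >= 0) {
                escaped.append('\\');
            }
            escaped.append(c);
        }
        return escaped.toString();
    }

    public static void main(String[] args) {
        System.out.println(escape("book_1?1"));     // prints book_1\?1
        System.out.println(escape("(a+b) && c"));   // prints \(a\+b\) \&\& c
        System.out.println(escape("price<10"));     // prints price10
    }
}
```

Escaping like this turns wildcards and operators into literal text, so it only makes sense for input that should not use the query syntax at all.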

 

16.2.5 Simple Query String Query (this query never throws an exception and discards the invalid parts of the query)

    public static TransportClient queryDSL() throws UnknownHostException {
        TransportClient client = TransportClientFactory.getInstance().getClient();
        SearchResponse searchResponse = client.prepareSearch()
                .setIndices("book")
                // + means AND
                // | means OR
                // - means negation
                // * 在關