1. 程式人生 > >使用JestClient連線elasticsearch-5.x對資料進行分組聚合

使用JestClient連線elasticsearch-5.x對資料進行分組聚合

     原本資料存放在mysql中,專案需求是從mysql中查出來計算推送給前端;但是隨著資料量增大,我們的查詢語句也複雜,效能會明顯下降。所以就考慮乾脆存放到elasticsearch中,查詢計算都方便;於是去和公司專門負責es平臺服務的人對接,負責人說elasticsearch5.x在連線叢集方面對tcp支援不如http效能好,我沒研究過兩種方式的效能,所以不好下結論,但是人家推薦我使用http的方式,那我就打消使用TransportClient客戶端的念頭。因為不想使用httpclient自己封裝挺麻煩,於是在網上搜elasticsearch有哪些基於http的客戶端,這一搜發現有好多

    第一種:JestClient ,專案地址https://github.com/searchbox-io/Jest/tree/master/jest

    第二種:RestClient,es5.0以後出現的一種官方的基於rest的Java客戶端,

                  參考部落格http://blog.csdn.net/u010454030/article/details/77014654

    第三種:Flummi,開源專案,它儘可能的模仿TransportClient的api,使開發者可以很輕鬆的遷移已經存在的程式碼,另外他會提示所有的異常資訊,讓你定位錯誤更方便。專案地址https://github.com/otto-de/flummi,來欣賞一下它的使用方式,是不是和TransportClient很像呢,但是Flummi可是基於http協議的


Flummi flummi = new Flummi("http://elasticsearch.base.url:9200");

SearchResponse searchResponse = flummi
.prepareSearch("products")
        .setQuery(QueryBuilders.termQuery("color", "yellow").build())
        .execute();

System.out.println("Found "+ searchResponse.getHits().getTotalHits()+ " products"
); searchResponse.getHits() .stream().map(hit -> hit.getSource().get("name").getAsString()) .forEach(name -> System.out.println("Name: " + name));

    最後我使用的是JestClient,最新版JestClient使用方式如下


JestClientFactory factory = new JestClientFactory();

String connectionUrl = "http://127.0.0.1:9200";

factory.setHttpClientConfig(new HttpClientConfig
        .Builder(connectionUrl) //引數可以是叢集,請先定義一個list集合,將節點url分別新增到list
        .defaultCredentials("elastic","changeme") //如果使用了x-pack,就要新增使用者名稱和密碼
        .multiThreaded(true) //多執行緒模式
        .connTimeout(60000) //連線超時
        .readTimeout(60000) //由於是基於http,所以超時時間必不可少不然經常會遇到socket異常:read time out
        .build()); //更多引數請檢視api
JestClient client=factory.getObject();

    這樣就獲得了一個JestClient例項,接下來是業務部分,對於elasticsearch來說是查詢限定條件,以前用mysql查詢時sql語句為:


select name AS 'name',count(1) AS 'total_num',job AS 'job',round(avg(age),0) AS'avg_age',time AS 'time'
from employee
WHERE gender = #{gender,jdbcType=INTEGER}  AND  to_days(time) = to_days(date_sub(curdate(),interval 1 day))
GROUP BY name, job;

    要使用elasticsearch的api實現上面的sql效果,首先對於where條件很好處理,平時怎麼查就怎麼查,一些基本的queryBuilder限定條件,我們先實現where條件


SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

QueryBuilder queryBuilder = QueryBuilders.boolQuery()
        .must(QueryBuilders.rangeQuery(time) //對time欄位進行範圍限定           
            .gte("1510848000000").lt(System.currentTimeMillis())) 
//也可用from("1510568631869").to("1511166160231");
        .must(QueryBuilders.termQuery("gender", 1));

searchSourceBuilder.query(queryBuilder);
  

    對於group by 和avg平均函式就要使用elasticsearch的聚合了


AggregationBuilder aggregationBuilder = 
AggregationBuilders.terms("nameAgg").field("name.keyword").size(Integer.MAX_VALUE) //1
     .subAggregation(AggregationBuilders.terms("jobAgg").field("job.keyword").size(Integer.MAX_VALUE) //2
          .subAggregation(AggregationBuilders.avg("ageAgg").field("age")) //3
               .subAggregation(AggregationBuilders.count("totalNum").field("name.keyword"))); //4

searchSourceBuilder.aggregation(aggregationBuilder);

    以上聚合後面的註釋解釋:

    (1) 首先按照name分組,terms括號裡面是聚合名字,隨便起,field為聚合的欄位名;之所以加了.keyword是因為不加聚合的時候會報fielddata屬性沒有設定為true;

{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"Fielddata is disabled on text 
fields by default.Set fielddata=true on [name] in order to load fielddata in memory by uninverting the 
inverted index. Note that this can however use significant memory. Alternatively use a keyword field 
instead."}],"type":"search_phase_execution_exception","reason":"all shards failed","phase":"query",
"grouped":true,"failed_shards":[{"shard":0,"index":"school","node":"H7VIRoOwS8mws78T-0Ce-Q","reason":{
"type":"illegal_argument_exception","reason":"Fielddata is disabled on text fields by default. Set 
fielddata=true on [name] in order to load fielddata in memory by uninverting the inverted index.Note that
 this can however use significant memory. Alternatively use a keyword field instead."}}]},"status":400}

因為對映模板將string型別的欄位存進elasticsearch時,一個字串欄位有兩個型別,一個text型別,分詞型別;一個keyword型別,不分詞型別;所以加上.keyword就可以正常聚合了,對於es2.x版本有可能不分詞的型別為.raw;注意甄別。後面size引數預設為10,貌似是最多聚合10個,我肯定想要聚合全部資料,就填最大值

    (2) 緊接著在name分組的基礎上按job分組,屬於nameAgg聚合的子聚合,後面的都屬於前面的子聚合

    (3) 分組完,緊接著統計各組平均年齡,由於年齡屬於long型別,不用加.keyword,從這裡以後都要注意括號的位置,.subAggregation跟在誰的後面一定要搞清楚,搞混淆結果會不一樣

    (4) 實際上這一步不需要,因為elasticsearch在分組聚合完自動會計算當前分組下有多少doc_count

    如果普通聚合不能滿足需要,還有管道聚合,將前面聚合的結果輸出路徑作為當前聚合的輸入,一定要注意路徑為聚合的相對路徑,不是絕對路徑:


SumBucketPipelineAggregationBuilder pipelineAggregationBuilder = 
     PipelineAggregatorBuilders.sumBucket("countTotalNum","jobAgg>totalNum");//第二個引數為聚合路徑

//如果有需要管道聚合,可以在上面的分組上繼續.subAggregation(pipelineAggregationBuilder);


    接下來開始將查詢和聚合條件放入search中


String query = searchSourceBuilder.toString();

Search search = new Search.Builder(query).addIndex("school").addType("student").build(); 

SearchResult result = client.execute(search); 


    執行查詢時,還有一種非同步執行方式;;我之所以沒有使用非同步方式是因為在聚合的時候會報一種錯誤:{ "error" : "JsonGenerationException[Can not write a field name, expecting a value]"} ;這種錯誤雖然不影響聚合,但是如果執行非同步查詢的話,經常丟失結果,也不會走failed方法;而採用非非同步方式就不會丟失結果;因為有時我在completed方法和failed方法中打了斷點debug執行,都沒進入。也可能是測試有誤,不過你們用的時候多多注意就行,這個方法在併發執行的時候效果不錯。


client.executeAsync(search, new JestResultHandler<SearchResult>() {
    @Override
public void completed(SearchResult searchResult) {
        
    }

    @Override
public void failed(Exception e) {

    }
});

    接下來就是取結果,最主要是聚合怎麼取


//首先取最外層的聚合,拿到桶
List<TermsAggregation.Entry> nameAgg =
        result.getAggregations().getTermsAggregation("nameAgg").getBuckets();
//迴圈每一個桶,拿到裡面的聚合,再拿桶
for (TermsAggregation.Entry entry : nameAgg) {
    List<TermsAggregation.Entry> jobAgg = entry.getTermsAggregation("jobAgg").getBuckets();
    //迴圈每一個桶,拿到裡面的聚合,再拿桶
for (TermsAggregation.Entry jobEntry : jobAgg) {
        //取到每個分組裡的平均年齡
long avgAge = jobEntry.getAvgAggregation("ageAgg").getAvg(); 

        //其實這裡已經能獲取doc_count了,所以聚合計算總數那一步可以省略
long count = jobEntry.getCount();
        ........
        //其他操作
........
    }
    ..........
    //其他操作
..........
}

   下一篇介紹怎麼使用springboot整合JestClient

   參考部落格:https://stackoverflow.com/questions/10441499/java-http-client-for-elasticsearch

                    http://blog.csdn.net/xr568897472/article/details/73826255  (關鍵)

                    http://blog.csdn.net/it_lihongmin/article/details/78447001

                    https://stackoverflow.com/questions/38602135/bucket-script-java-api-in-elasticsearch

相關推薦

使用JestClient連線elasticsearch-5.x資料進行分組聚合

     原本資料存放在mysql中,專案需求是從mysql中查出來計算推送給前端;但是隨著資料量增大,我們的查詢語句也複雜,效能會明顯下降。所以就考慮乾脆存放到elasticsearch中,查詢計算都方便;於是去和公司專門負責es平臺服務的人對接,負責人說elastics

php7中使用mongoDB的聚合操作資料進行分組求和統計操作

本文將介紹mongoDB使用aggregate對資料分組,求和。給出shell命令列寫法,php7中的寫法,也將給出相同資料結構mysql命令列寫法。 mongoDB collection a_test 中資料: > db.a_test.f

資料學習[17]--Elasticsearch 5.x 欄位摺疊的使用[轉]

題目:Elasticsearch 5.x 欄位摺疊的使用 作者:medcl URL:https://elasticsearch.cn/article/132 在 Elasticsearch 5.x 有一個欄位摺疊(Field Collapsing,#22337)的功能非常

使用容器和Elasticsearch集群Twitter進行監控

docker rancher 集群 twitter 監控 介紹Elasticsearch是ELK(Elasticsearch/Logstash/Kibana)的基石。在這篇文章中,我們將使用Rancher Catalog來部署stack,並將它用於追蹤Twitter上的tag和brand。

(一)elasticsearch-5.x安裝與配置

head(一)平臺所需的環境OS:CentOS 7.x minimalelasticsearch :elasticsearch-5.4.0版本jdk: 1.8已上版本創建普通用戶:appuser最新的下載路徑地址為:https://www.elastic.co/downloads (二)配置操作系統的環境並

Elasticsearch 5.X 使用 Docker 運行使用 Head 插件

lock there ati 相對 face host true 對比 dock ES 5.X 版本後就不支持 elasticsearch-head 以插件方式來安裝了。 for Elasticsearch 5.x: site plugins are not suppo

spring boot 2.X 集成 Elasticsearch 5.x 實戰 增刪改查

springboot2.x Elasticsearch5.x 集成 實戰 增刪改查 其實這種博客網上一大片,為啥還要寫出來這篇博客?網上的例子都是基於elasticsearch2.x版本的,並不是5.x版本,而且還有好多是錯的,拿過來根本不能直接用來測試,還有就是spring-data沒有

CentOS7 下安裝 ElasticSearch 5.x 及填坑

用戶及用戶組 log4j rip 還需 -- 指定 png process serve ElasticSearch簡介 什麽是ElasticSearch: ElasticSearch是基於Apache Lucene構建的開源搜索引擎 采用Java編寫,提供了簡單易用的RE

elasticsearch 5.x 系列之六 文檔索引,更新,查詢,刪除流程

取數 獲取 info ast 負載均衡 blog img 選擇 將在 一、elasticsearch index 索引流程 步驟: 客戶端向Node1 發送索引文檔請求 Node1 根據文檔ID(_id字段)計算出該文檔應該屬於shard0,然後請求路由到Node3的P0分

Android使用KeyStore資料進行加密

談到 Android 安全性話題,Android Developers 官方網站給出了許多很好的建議和講解,涵蓋了儲存資料、許可權、網路、處理憑據、輸入驗證、處理使用者資料、加密等方方面面 金鑰的保護以及網路傳輸安全 應該是移動應用安全最關鍵的內容。Android 提供大量用來保護資

用各種工具資料進行分類彙總

資料分類彙總的方法有很多種,工具也有很多,這次為大家一一介紹,各種工具如何進行分類彙總,大家自行判斷,覺得哪種最好用,就用哪種,畢竟工具不重要,高效出結果才最重要。 為了方便舉例,所用的資料集就是鸞尾花資料集,5個欄位(Sepal.Length、Sepal.Width、Petal.Length、P

R_Studio(關聯)簡單資料進行關聯分析

      對資料menu_orders.txt檔案資料進行關聯分析        (1)使支援度為0.4、頻繁項集元素個數大於等於2,檢視關聯規則數量的變化,輸出與a相關的規則 #匯入arules包 install.packages("arul

jsp中資料進行批量刪除操作

批量刪除的SQL:delete from user where uid in(主鍵列表);  UserBiz:  //批量刪除  public boolean batchDelete(String[] uids);  UserBizImpl: public

excel如何資料進行多欄位同時排序、多條件組合和多條件篩選呢

對資料的排序不僅可以是對單列或單行進行排序,還可以進行多欄位同時排序,從而達到想要的效果。excel如何對資料進行多欄位同時排序、多條件組合和多條件篩選呢如圖所示 【解決方法,教程視訊資料如下】 本教程視訊資料來源:http://edu.51cto.com/course/15404.html 完整部落格

Excel中如何資料進行簡單排序

excel表格如何排序,在Excel 2013中,對資料表中的資料進行排序時,如果按照單列的內容進行簡單排序,可以直接使用選項板中的“升序”選項或“降序”選項來完成。【解決方法,教程視訊資料如下】 本教程視訊資料來源:http://edu.51cto.com/course/15404.html 完整部落格

R語言學習(四)——資料進行操作

判斷變數的屬性 is.character(x) #判斷是否為字元型 is.numeric(x) #判斷是否為數值型 is.vector(x) #判斷是否為一個向量 is.matrix(x) #判斷是否為一個

elasticsearch 5.x 安裝與優化 (Windows & linux)

es 5.x安裝與優化 es 5.x安裝與優化 1、jdk版本要求 2、安裝步驟 a、建立組與賬戶 b、給相應的目錄許可權 c、修改作業系統的

ElasticSearch 5.X 最佳實踐

轉自大佬:https://www.jishux.com/p/2ccc199e939f5830 Author: 袁野 Date: 2018.01.05 Version: 1.0 注意事項: 本文件所述為通用情況,不可作為特定業務參照; 本文件所述適用於 ELK 棧

SearchView+Filter資料進行簡單過濾

轉:https://www.jianshu.com/p/5078c7fec29e 我是使用ListView實現展示系統應用的demo 執行邏輯是這樣的: 通過SearchView獲取使用者輸入的文字. 把文字傳到Adpater,在Adpater中對關鍵字進行篩選.

ElasticSearch叢集狀態檢視命令大全 && 刨根問底 | Elasticsearch 5.X叢集多節點角色配置深入詳解

https://blog.csdn.net/pilihaotian/article/details/52460747 Elasticsearch中資訊很多,同時ES也有很多資訊檢視命令,可以幫助開發者快速查詢Elasticsearch的相關資訊。 _cat $ cu