1. 程式人生 > >javaAPI實現elasticsearch5.5.2的聚合分析

javaAPI實現elasticsearch5.5.2的聚合分析

前言:

在前面的幾篇文章講到了elasticsearch的搜尋,但是elasticsearch還有強大的聚合分析功能,通過聚合,我們會得到一個數據的概覽,這樣對大資料提取統計指標時就變得遊刃有餘。聚合允許我們向資料提出一些複雜的問題。雖然功能完全不同於搜尋,但它們使用相同的資料結構,執行速度很快並且就像搜尋一樣幾乎是實時的。

一、聚合(Aggregations)的介紹

elasticsearch的聚合(Aggregations)類似於資料庫sql中的分組 group by,count、sum等函式,除此之外他還有更多強大的資料統計分析介面。

聚合有兩個核心概念:

1.桶(bucket):對資料進行分組。比如一個物件User他有一個屬性是city,有如下資料:1.張三 上海 ;2.李四 北京; 3.王五 北京,我們就可以基於city劃分buckets,一個是北京bucket,一個是上海bucket,按照某個欄位進行bucket劃分,那個欄位的值相同的那些資料,就會被劃分到一個bucket中,相當於sql中的group by分組。

2.指標(metric):對一個數據分組執行的統計。當我們有了一堆bucket之後,就可以對每個bucket中的資料進行聚合分詞了,比如說計算一個bucket內所有資料的數量、平均值、最大值等的,metric就是對一個bucket執行的某種聚合分析的操作,相當於資料庫中的avg,sum函式

二、Bucket aggregations 聚合分組的使用

官方參考資料:

我會在第一個例子中將完整的程式碼寫出來,其他的例子仿照第一個進行

1. Terms Aggregation

terms按照某個欄位進行分組,下面的程式碼中也涉及了排序問題,詳細程式碼看註釋

/**TermAggs 包含了Order
	 * termAggs統計每個顏色的個數
	* @Title: countByColor 
	* @Description: TODO(統計每個顏色的銷量) 
	* @param @throws UnknownHostException
	* @return void
	* @autor lpl
	* @date 2017年11月6日
	* @throws
	 */
	@Test
	public void countByColor() throws UnknownHostException{
		//進行聚合查詢,terms按照某個欄位中進行分組
		TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders
				.terms("popular_colors")	//aggs的名稱,自定義,取資料的時候與其一致即可
				.field("color")				//需要分組的欄位
				//.order(Terms.Order.term(true))	//true表示按照term分組的asc排序
				.order(Terms.Order.count(true));  //true表asc按照doc_count排序,false表示desc排序		
				
		//查詢資料進行聚合分析
		SearchResponse response = EsClient.client().prepareSearch("tvs").setTypes("sales")
				.addAggregation(termsAggregationBuilder)
				.setSize(0)
				.execute()
				.actionGet();
		//獲取聚合
		Terms terms = response.getAggregations().get("popular_colors");
		//遍歷處理資料
		for (Terms.Bucket entry : terms.getBuckets()) {
			Map<String,Object> map = new HashMap<String,Object>();
		    String key = (String) entry.getKey();      // Term
		    long docCount = entry.getDocCount(); // Doc count
		    //將結果放到Map中
			map.put("color",key);
			map.put("docCount",docCount);
			System.out.println(map);
		}
	}

在Terms Aggregation涉及到一個Order分組問題,在上面的程式碼上我已經做了簡單的註釋,但是還存在一種排序方式,就是我們按照某一個統計出來欄位進行排序,比如:按照height欄位的平均值進行排序,fasle代表了desc排序

AggregationBuilders
    .terms("genders")
    .field("gender")
    .order(Terms.Order.aggregation("avg_height", false))
    .subAggregation(
        AggregationBuilders.avg("avg_height").field("height")
    )

2. Histogram Aggregation 按照區間進行分組

按照區間分組有Histogram Aggregation 和 Date Histogram Aggregation,我們以兩個簡單的需求進行編寫具體的程式碼

2.1Histogram Aggregation:以2000位區間單位統計電視的銷量

/**
	 * 以2000位區間單位統計電視的銷量
	* @Title: HistogramTest 
	* @param @throws UnknownHostException
	* @return void
	* @autor lpl
	* @date 2017年11月6日
	* @throws
	 */
	@Test
	public void HistogramTest() throws UnknownHostException{
		//histogram:類似於terms,也是進行bucket分組操作,interval:2000,
		//劃分範圍,0~2000,2000~4000,4000~6000,6000~8000,8000~10000
		HistogramAggregationBuilder histogramAggregationBuilder = AggregationBuilders
						.histogram("his_price")
						.field("price")
						.interval(2000);
		//查詢資料進行聚合分析
		SearchResponse response = EsClient.client()
				.prepareSearch("tvs").setTypes("sales")
				.addAggregation(histogramAggregationBuilder)
				.setSize(0)
				.execute()
				.actionGet();
		Histogram agg = response.getAggregations().get("his_price");
		// For each entry
		for (Histogram.Bucket entry : agg.getBuckets()) {
			Map<String,Object> map = new HashMap<String,Object>();
		    Number key = (Number) entry.getKey();   // 區間key
		    long docCount = entry.getDocCount();    // Doc count
		    //資料封裝
		    map.put("key",key);
		    map.put("doc_count",docCount);
		    System.out.println(map);
		}
	}
2.2Date Histogram Aggregation:統計每個月份的電視銷量,這個需求中還涉及了一些其他知識點,在程式碼註釋中有體現
/**
	 * 統計每個月份的電視銷量
	* @Title: dateHistogramTest 
	* @Description: TODO(這裡用一句話描述這個方法的作用) 
	* @param @throws UnknownHostException
	* @return void
	* @autor lpl
	* @date 2017年11月6日
	* @throws
	 */
	@Test
	public void dateHistogramTest() throws UnknownHostException{
		//Date histogram的用法與histogram差不多,只不過區間上支援了日期的表示式。
		DateHistogramAggregationBuilder dateHistogramInterval = AggregationBuilders
						.dateHistogram("date_hist")	//聚合的名稱,可以隨意去取,下邊取資料一致即可
						.field("sold_date")			//需要進行分組的欄位
						.dateHistogramInterval(DateHistogramInterval.MONTH)	//設定時間區間
						.minDocCount(0)				//設定最小的 doc_count,
						.format("yyyy-MM-dd")		//將返回的時間結果進行格式化
						.missing("2016-05-01");		//預設欄位:當遇到沒有值的欄位,就會按照該欄位的值來計算
		//查詢資料進行聚合分析
		SearchResponse response = EsClient.client()
				.prepareSearch("tvs").setTypes("sales")
				.addAggregation(dateHistogramInterval)
				.setSize(0)
				.execute()
				.actionGet();
		//獲得我們設定的聚合
		Histogram agg = response.getAggregations().get("date_hist");
		//迴圈資料輸出
		for (Histogram.Bucket entry : agg.getBuckets()) {
			Map<String,Object> map = new HashMap<String,Object>();
			//處理
			DateTime key = (DateTime) entry.getKey();    // Key
		    String keyAsString = entry.getKeyAsString(); // Key as String
		    long docCount = entry.getDocCount();         // Doc count
		    //封裝
		    map.put("date",keyAsString);
		    map.put("doc_count",docCount);
		    System.out.println(map);
		}
	}
3.Range Aggregation按照範圍進行分組

這個功能介紹中,我會減少程式碼量,只把關鍵程式碼貼出來,程式碼中會有註釋

3.1.數字範圍:

AggregationBuilder rangAgg = AggregationBuilders
				.range("rang_price")
				.field("price")
				.addUnboundedTo(1000)	//從負無窮到1000
				.addRange(1001, 1500)	//從1001-1500
				.addRange(1501, 2000)	//從1501-2000
				.addUnboundedFrom(2001);//2000到正無窮
// 獲得我們設定的聚合
		Range agg = response.getAggregations().get("rang_price");
		// 迴圈資料輸出
		for (Range.Bucket entry : agg.getBuckets()) {
			Map<String, Object> map = new HashMap<String, Object>();
			// 處理
			String key = entry.getKeyAsString();             // Range as key
		    Number from = (Number) entry.getFrom();          // Bucket from
		    Number to = (Number) entry.getTo();              // Bucket to
		    long docCount = entry.getDocCount();    // Doc count
			// 封裝
			map.put("key", key);
			map.put("doc_count", docCount);
			System.out.println(map);
		}
3.2.Date Range Aggregation日期範圍
AggregationBuilder aggregation =
		        AggregationBuilders
		                .dateRange("agg")
		                .field("dateOfBirth")	//field名稱
		                .format("yyyy")			//日期格式
		                .addUnboundedTo("1950")    // from -infinity to 1950 (excluded)
		                .addRange("1950", "1960")  // from 1950 to 1960 (excluded)
		                .addUnboundedFrom("1960"); // from 1960 to +infinity
for (Range.Bucket entry : agg.getBuckets()) {
    String key = entry.getKeyAsString();                // Date range as key
    DateTime fromAsDate = (DateTime) entry.getFrom();   // Date bucket from as a Date
    DateTime toAsDate = (DateTime) entry.getTo();       // Date bucket to as a Date
    long docCount = entry.getDocCount();                // Doc count

}

3.3 IP Range Aggregation ip範圍分組

IpRangeAggregationBuilder aggregation1 =
		        AggregationBuilders
		                .ipRange("agg")
		                .field("ip")
		                .addUnboundedTo("192.168.1.0")             // 從負無窮到192.168.1.0
		                .addRange("192.168.1.0", "192.168.2.0")    // 從  192.168.1.0 到 192.168.2.0 (excluded)
		                .addUnboundedFrom("192.168.2.0");          // 從 192.168.2.0 到 +infinity
for (Range.Bucket entry : aggregation1.getBuckets()) {
		    String key = entry.getKeyAsString();            // Ip 範圍 as key
		    String fromAsString = entry.getFromAsString();  // Ip bucket from as a String
		    String toAsString = entry.getToAsString();      // Ip bucket to as a String
		    long docCount = entry.getDocCount();            // Doc count
		}
4. Filters Aggregation 過濾分組:根據過濾條件進行分組
AggregationBuilder aggregation =
			    AggregationBuilders
			        .filters("agg",
			            new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")),
			            new FiltersAggregator.KeyedFilter("women", QueryBuilders.termQuery("gender", "female")));
// For each entry
		for (Filters.Bucket entry : agg.getBuckets()) {
		    String key = entry.getKeyAsString();            // bucket key
		    long docCount = entry.getDocCount();            // Doc count
		    logger.info("key [{}], doc_count [{}]", key, docCount);
		}
key [men], doc_count [4982]
key [women], doc_count [5018]

三、Metrics aggregations 聚合分析的使用

官方參考文件:

3.1常用的:Avg,Max,Sum等等

我通過一個需求將metric的javaAPI進行說明,需求是:查詢電視銷售每個顏色的平均價格、最高價格、總價格並按照平均價格降序排序,對應的sql語句為:

select avg(price) avg_price,max(price) max_price,sum(price) sum_price from tvs.sales group by color order by avg_price

/**
	 * 查詢電視銷售每個顏色的平均價格、最高價格、總價格並按照平均價格降序排序
	* @Title: metrics 
	* @Description: TODO(這裡用一句話描述這個方法的作用) 
	* @param @throws UnknownHostException
	* @return void
	* @autor lpl
	* @date 2017年11月6日
	* @throws
	 */
	@Test
	public void metrics() throws UnknownHostException{
		// 建立查詢索引和type
		SearchRequestBuilder srBuilder =EsClient.client()
				.prepareSearch("tvs")
				.setTypes("sales");
		// colorAggs
		// 如果需要進行排序的話,可以使用order(Order.aggregation("avg_price",true))
		// "avg_price"對應下班的字聚合的名稱,true表示升序排序。false表示倒敘排序
		TermsAggregationBuilder colorsAgg = AggregationBuilders
				.terms("colors")
				.field("color")
				.order(Order.aggregation("avg_price", true));
		// 平均值
		AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_price").field("price");
		// 最大值a
		MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_price").field("price");
		// 總和
		SumAggregationBuilder sumAgg = AggregationBuilders.sum("sum_price").field("price");

		SearchResponse response = srBuilder.setSize(0)
				.addAggregation(colorsAgg
						.subAggregation(avgAgg)	
						.subAggregation(maxAgg)
						.subAggregation(sumAgg))
				.execute()
				.actionGet();
		Terms terms = response.getAggregations().get("colors");
		//處理結果
		for (Terms.Bucket entry : terms.getBuckets()) {
			Map<String, Object> map = new HashMap<String, Object>();
			// 獲得按照顏色進行分組的桶
			String keyAsString = entry.getKeyAsString();
			// 獲得每個分組的數量
			long docCount = entry.getDocCount();
			// 獲得平均價格
			Avg avg = entry.getAggregations().get("avg_price");
			// 獲得最大價格
			Max max = entry.getAggregations().get("max_price");
			// 獲得價格總和
			Sum sum = entry.getAggregations().get("sum_price");
			map.put("key", keyAsString);
			map.put("docCount", docCount);
			map.put("avg", avg.getValue());
			map.put("max", max.getValue());
			map.put("sum", sum.getValue());
			System.out.println(map);

		}
	}
3.2 Percentile Aggregation百分比聚合分析
PercentilesAggregationBuilder aggregation =
	        AggregationBuilders
	                .percentiles("agg")
	                .field("height")
	                .percentiles(1.0, 5.0, 10.0, 20.0, 30.0, 75.0, 95.0, 99.0);
// sr is here your SearchResponse object
	Percentiles agg = response.getAggregations().get("agg");
	// For each entry
	for (Percentile entry : agg) {
	    double percent = entry.getPercent();    // Percent
	    double value = entry.getValue();        // Value
	}
percent [1.0], value [0.814338896154595]
percent [5.0], value [0.8761912455821302]
percent [25.0], value [1.173346540141847]
percent [50.0], value [1.5432023318692198]
percent [75.0], value [1.923915462033674]
percent [95.0], value [2.2273644908535335]
percent [99.0], value [2.284989339108279]

上邊就是我對elasticsearch的聚合分析的一些javaAPI使用的總結,當然我只寫出了一部分,強大的elasticsearch還有很多的介面,大家可以通過上邊我提供的連線進入elasticsearch的官網進行查詢。在前面的文章中我也對elasticsearch的常用搜索、curd、高亮顯示、搜尋建議進行了記錄,如果大家有興趣可以去翻閱。文章中有哪些地方有誤請大家及時指出,感激不盡,相互交流,共同進步!