How to Implement Conditional Bulk Deletion in Java on Elasticsearch 2.0 and Later
Before version 2.0, Elasticsearch offered two ways to delete documents. The first was deletion by id, which is rarely used in practice because the ids are not easy to obtain. The second was query-then-delete via client.prepareDeleteByQuery, which bulk-deletes all documents matching a condition:
```java
DeleteByQueryResponse response = client.prepareDeleteByQuery("library")
        .setQuery(QueryBuilders.termQuery("title", "ElasticSearch"))
        .execute()
        .actionGet();
```
However, Delete by Query was removed in version 2.0 and later, because it triggered an automatic forced refresh; under heavy concurrent indexing this could quickly exhaust memory. For details see: https://www.elastic.co/guide/en/elasticsearch/client/java-api/1.7/delete-by-query.html
So how do we perform conditional bulk deletes in 2.0 and later?
One option is to first run a Search API query to collect the ids of the documents we want to delete, and then delete them by id. For very large data sets, however, this approach still does not work well.
Implementation:
```java
public void deleteByTerm(Client client) {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    // Find the documents to delete (here: at most the first 20 matches).
    SearchResponse response = client.prepareSearch("megacorp").setTypes("employee")
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.termQuery("first_name", "xiaoming"))
            .setFrom(0).setSize(20).setExplain(true)
            .execute().actionGet();
    // Queue one delete request per hit, then send them as a single bulk request.
    for (SearchHit hit : response.getHits()) {
        String id = hit.getId();
        bulkRequest.add(client.prepareDelete("megacorp", "employee", id).request());
    }
    BulkResponse bulkResponse = bulkRequest.get();
    if (bulkResponse.hasFailures()) {
        for (BulkItemResponse item : bulkResponse.getItems()) {
            System.out.println(item.getFailureMessage());
        }
    } else {
        System.out.println("delete ok");
    }
}
```
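Note that the example above only deletes one page of matches (setSize(20)). If a query matches more documents than that, the collected ids should be submitted as several bounded bulk requests rather than one ever-growing request. A minimal sketch of that batching step, assuming the ids have already been collected (the `partition` helper is illustrative, not part of the Elasticsearch API):

```java
import java.util.ArrayList;
import java.util.List;

public class BulkBatcher {

    // Split a list of document ids into sub-lists of at most batchSize,
    // so that each Bulk API request stays a manageable size.
    public static List<List<String>> partition(List<String> ids, int batchSize) {
        List<List<String>> batches = new ArrayList<List<String>>();
        for (int i = 0; i < ids.size(); i += batchSize) {
            // Copy the view returned by subList so each batch is independent.
            batches.add(new ArrayList<String>(ids.subList(i, Math.min(i + batchSize, ids.size()))));
        }
        return batches;
    }
}
```

Each batch would then be turned into its own BulkRequestBuilder and executed, instead of adding every id to a single request.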
Alternatively, the delete-by-query plugin lets us bulk-delete documents by type. This approach can handle large volumes of data, but it first marks every document to be deleted, one by one, and only then deletes them, so it is rather slow. The official documentation says as much: https://www.elastic.co/guide/en/elasticsearch/plugins/2.3/plugins-delete-by-query.html
Queries which match large numbers of documents may run for a long time, as every document has to be deleted individually. Don’t use delete-by-query to clean out all or most documents in an index. Rather create a new index and perhaps reindex the documents you want to keep.
Clearly this method is not suited to deleting very large data sets; the efficiency really is poor, as I have experienced firsthand.
To use it, first add the delete-by-query plugin dependency (the plugin also has to be installed on the cluster itself; in 2.x that is `bin/plugin install delete-by-query`), then call the plugin's API:
```xml
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>delete-by-query</artifactId>
    <version>2.3.2</version>
</dependency>
```
Implementation:
```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ResourceBundle;
import java.util.Stack;

import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.plugin.deletebyquery.DeleteByQueryPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.xgd.log.common.ExceptionUtil;

public class EsDeleteByType {

    private static final Logger logger = LoggerFactory.getLogger(EsDeleteByType.class);

    private Client client;

    private static ResourceBundle getEsConfig() {
        return ResourceBundle.getBundle("elasticsearch");
    }

    private void getClient() {
        String clusterName = getEsConfig().getString("clusterName");
        String hosts = getEsConfig().getString("hosts");
        if (hosts == null || clusterName == null) {
            throw new IllegalArgumentException("hosts or clusterName was null.");
        }
        Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build();
        // The DeleteByQueryPlugin must be registered on the transport client as well.
        client = TransportClient.builder()
                .addPlugin(DeleteByQueryPlugin.class)
                .settings(settings).build();
        String[] hostsArray = hosts.split(",");
        for (String hostAndPort : hostsArray) {
            String[] tmpArray = hostAndPort.split(":");
            try {
                client = ((TransportClient) client).addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(tmpArray[0]),
                                Integer.valueOf(tmpArray[1])));
            } catch (NumberFormatException e) {
                logger.error(ExceptionUtil.getTrace(e));
            } catch (UnknownHostException e) {
                logger.error(ExceptionUtil.getTrace(e));
            }
        }
    }

    /**
     * Checks whether a type in the given index still contains any documents.
     */
    public Boolean existDocOfType(String index, String type) throws Exception {
        SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type)
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setSize(1);
        SearchResponse response = builder.execute().actionGet();
        long docNum = response.getHits().getTotalHits();
        if (docNum == 0) {
            return false;
        }
        return true;
    }

    /**
     * Deletes all documents of the given types from an index.
     */
    public long deleteDocByType(String index, String[] types) {
        getClient();
        long oldTime = System.currentTimeMillis();
        StringBuilder b = new StringBuilder();
        b.append("{\"query\":{\"match_all\":{}}}");
        DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
                .setIndices(index).setTypes(types)
                .setSource(b.toString())
                .execute().actionGet();
        // Poll until every type is really empty.
        Stack<String> allTypes = new Stack<String>();
        for (String type : types) {
            allTypes.add(type);
        }
        while (!allTypes.isEmpty()) {
            String type = allTypes.pop();
            while (true) {
                try {
                    if (existDocOfType(index, type) == false) {
                        break;
                    }
                    Thread.sleep(100); // back off briefly instead of busy-waiting against the cluster
                } catch (Exception e) {
                    logger.error("queryError: " + e.getMessage());
                }
            }
        }
        System.out.println(System.currentTimeMillis() - oldTime);
        return response.getTotalDeleted();
    }
}
```
So what should we do when development with Elasticsearch inevitably involves deleting data in bulk? After wrestling with this for a long time, I found that when storing data in Elasticsearch you should never put everything into a single index: it hurts both query efficiency and later deletion. Since we cannot efficiently delete a large subset of documents from within one index, why not change the approach and split the data across indices, then delete whole indices instead?

For example, in production I receive 500 million documents per day, so I create one index per day in the cluster to hold that day's data. If the cluster only needs to retain 7 days of data, anything older can be dropped simply by deleting the corresponding indices. Queries then only hit the indices covering the requested time range instead of all the data, which improves query efficiency; and the deletion problem is solved too, since unneeded data can be removed quickly and the disk space reclaimed.
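The index-per-day retention scheme can be sketched as follows. The naming pattern (`logs-yyyy.MM.dd`) and the retention/lookback parameters are assumptions for illustration, not from my production setup:

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

public class IndexRetention {

    // Given "today", return the names of daily indices that fall outside the
    // retention window and should be deleted. We scan `lookbackDays` days past
    // the retention boundary to catch indices missed on previous runs.
    public static List<String> expiredIndices(Calendar today, int retentionDays, int lookbackDays) {
        SimpleDateFormat fmt = new SimpleDateFormat("'logs-'yyyy.MM.dd");
        List<String> expired = new ArrayList<String>();
        for (int age = retentionDays; age < retentionDays + lookbackDays; age++) {
            Calendar day = (Calendar) today.clone();
            day.add(Calendar.DAY_OF_MONTH, -age);
            expired.add(fmt.format(day.getTime()));
        }
        return expired;
    }
}
```

Each returned name can then be dropped in a single call with the 2.x admin API, e.g. `client.admin().indices().prepareDelete(name).get()`, which removes the entire index at once and frees its disk space far faster than any document-level delete.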
As for the efficiency of deleting large volumes of data from Elasticsearch, the official documentation still offers no particularly good answer; splitting data across indices and deleting by index is about the most workable approach available today.