1. 程式人生 > >Elasticsearch 2.0以上版本根據條件批量刪除Java如何實現

Elasticsearch 2.0以上版本根據條件批量刪除Java如何實現

Elasticsearch在2.0以前版本,刪除操作有兩種方式,一種是通過id來進行刪除,但是這種方式一般不常用,因為id不容易得到;另一種方式是通過先查詢操作,然後刪除,也就是通過client.prepareDeleteByQuery這種方式來根據條件批量刪除資料:

DeleteByQueryResponse response = client.prepareDeleteByQuery("library") .setQuery(QueryBuilders.termQuery("title", "ElasticSearch")) .execute().actionGet();

但是Delete by Query在2.0版本及其以上的版本已經被移除了,因為這種方式會自動強制重新整理,所以在大量索引併發的情況下,會很快造成記憶體溢位。 詳情可檢視:https://www.elastic.co/guide/en/elasticsearch/client/java-api/1.7/delete-by-query.html

那麼在2.0以後的版本,我們如何來進行批量的刪除呢?

我們可以先通過Search API查詢,然後得到需要刪除的批量資料的id,然後再通過id來刪除,但是這種方式在大批量資料的刪除的時候,依然是行不通的。

具體實現程式碼:

    public void deleteByTerm(Client client){         BulkRequestBuilder bulkRequest = client.prepareBulk();         SearchResponse response = client.prepareSearch("megacorp").setTypes("employee")                 .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)                 .setQuery(QueryBuilders.termQuery("first_name", "xiaoming"))                 .setFrom(0).setSize(20).setExplain(true).execute().actionGet();         for(SearchHit hit : response.getHits()){             String id = hit.getId();             bulkRequest.add(client.prepareDelete("megacorp", "employee", id).request());         }         BulkResponse bulkResponse = bulkRequest.get();         if (bulkResponse.hasFailures()) {             for(BulkItemResponse item : bulkResponse.getItems()){                 System.out.println(item.getFailureMessage());             }         }else {             System.out.println("delete ok");         }              }

同樣通過delete-by-query外掛,我們還可以根據type來批量刪除資料,這種方式能夠刪除大批量的資料,他是現將要刪除的資料一個一個做標記,然後再刪除,於是效率會比較低。下面是官網的說明:https://www.elastic.co/guide/en/elasticsearch/plugins/2.3/plugins-delete-by-query.html

Queries which match large numbers of documents may run for a long time, as every document has to be deleted individually. Don’t use delete-by-query to clean out all or most documents in an index. Rather create a new index and perhaps reindex the documents you want to keep. 

可見這種刪除方式並不適合大批量資料的刪除,因為效率真的是很低,我是親身體驗過了。

這種方式需要先引入delete-by-query外掛包,然後使用外掛的api來刪除:

        <dependency>             <groupId>org.elasticsearch.plugin</groupId>             <artifactId>delete-by-query</artifactId>             <version>2.3.2</version>         </dependency>

具體實現程式碼:

import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ResourceBundle; import java.util.Stack;   import org.elasticsearch.action.deletebyquery.DeleteByQueryAction; import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder; import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.plugin.deletebyquery.DeleteByQueryPlugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory;   import com.xgd.log.common.ExceptionUtil;   public class EsDeleteByType {       private static final Logger logger = LoggerFactory.getLogger(EsDeleteByType.class);     private Client client;          private static ResourceBundle getEsConfig(){         return ResourceBundle.getBundle("elasticsearch");     }          private void getClient(){         String clusterName = getEsConfig().getString("clusterName");         String hosts = getEsConfig().getString("hosts");         if (hosts == null || clusterName == null) {             throw new IllegalArgumentException("hosts or clusterName was null.");         }         Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build();         client = TransportClient.builder()                 .addPlugin(DeleteByQueryPlugin.class)                 .settings(settings).build();         String[] hostsArray = hosts.split(",");         for(String hostAndPort : hostsArray){             String[] tmpArray = hostAndPort.split(":");             try {                 client = ((TransportClient)client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(tmpArray[0]), Integer.valueOf(tmpArray[1])));             } catch (NumberFormatException e) {                 logger.error(ExceptionUtil.getTrace(e));             } catch (UnknownHostException e) {                 logger.error(ExceptionUtil.getTrace(e));             }         }     }          /**      * 判斷一個index中的type是否有資料      * @param index      * @param type      * @return      * @throws Exception      */     public Boolean existDocOfType(String index, String type) throws Exception {         SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type)                 .setSearchType(SearchType.QUERY_THEN_FETCH)                 .setSize(1);         SearchResponse response = builder.execute().actionGet();         long docNum = response.getHits().getTotalHits();         if (docNum == 0) {             return false;         }         return true;     }       /**      * 根據type來刪除資料      * @param index      * @param types      * @return      */     public long deleteDocByType(String index, String[] types) {         getClient();         long oldTime = System.currentTimeMillis();         StringBuilder b = new StringBuilder();         b.append("{\"query\":{\"match_all\":{}}}");         DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)         .setIndices(index).setTypes(types)         .setSource(b.toString())         .execute().actionGet();         Stack<String> allTypes = new Stack<String>();         for(String type : types){             allTypes.add(type);         }         while(!allTypes.isEmpty()){             String type = allTypes.pop();             while(true){                 try {                     if (existDocOfType(index, type) == false) {                         break;                     }                 } catch (Exception e) {                     logger.error("queryError: " + e.getMessage());                 }             }         }         System.out.println(System.currentTimeMillis() - oldTime);         return response.getTotalDeleted();     } }

那麼當我們在開發中,使用到elasticsearch的時候,總會涉及到大批量資料的刪除,我們要怎麼辦呢? 經過很長時間的糾結,我發現使用elasticsearch儲存資料的時候,千萬不要把所有資料都儲存於一個index,這樣一個是不利於查詢的效率,一個是不利於後面的刪除,既然我們不能index中去刪除部分的大批量資料,那麼我們為啥不改變一種思路呢,就是分索引,然後通過索引來刪除資料,例如:我在生產上面,每天有5億的資料,那麼我每天在叢集中生成一個index用於儲存這5億的資料,如果我們的elasticsearch叢集對資料只要求儲存7天的資料,超過7天的資料就可以刪除了,這樣我們可以通過index直接刪除7天以前的資料,這種方式,我們在查詢的時候不會在所有資料中查詢,只需要在所要查詢的時間段內查詢,便提高了查詢的效率,同時刪除效率的問題也解決了,能夠很快刪除不需要的資料,釋放掉磁碟空間。

針對於elasticsearch大批量資料刪除效率的問題,目前官網上面也沒有一個特別好的解決辦法,這種方式算是目前還算能行得通的方式了。