1. 程式人生 > >ElasticSearch初體驗之使用Java進行最基本的增刪改查

ElasticSearch初體驗之使用Java進行最基本的增刪改查

好久沒寫博文了, 最近專案中使用到了ElaticSearch相關的一些內容, 剛好自己也來做個總結。
現在自己也只能算得上入門, 總結下自己在工作中使用Java操作ES的一些小經驗吧。

本文總共分為三個部分:
一:ES相關基本概念及原理
二:ES使用場景介紹
三:使用Java進行ES的增刪改查及程式碼講解

一:ES相關基本概念:
ElasticSearch(簡稱ES)是一個基於Lucene構建的開源、分散式、RESTful的全文字搜尋引擎。

不過,ElasticSearch卻也不僅只是一個全文字搜尋引擎,它還是一個分散式實時文件儲存,其中每個field均是被索引的資料且可被搜尋;也是一個帶實時分析功能的分散式搜尋引擎,並且能夠擴充套件至數以百計的伺服器儲存及處理PB級的資料。

如前所述,ElasticSearch在底層利用Lucene完成其索引功能,因此其許多基本概念源於Lucene。
我們先說說ES的基本概念。

  • 索引 Index:對資料的邏輯儲存(倒排索引),不儲存原
    始值。
  • 型別 Type:對索引的邏輯分類,可以有⼀個或多個分類。
  • ⽂檔 Document:基本資料單元,JSON。
  • 欄位 Filed

關係型資料與ES對比:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields

這裡再說下ES中很重要的概念--倒排索引。這同樣也是solr,lucene中所使用的索引方式。

例如我們正常的索引:

當我們在關係型資料庫中,都是有id索引的, 我們通過id去查value速度是很快的。
但是如果我們想查value中包含字母b的值呢?特別是資料量很大的時候, 這種以id為索引的方式是不是就不適合了?
那麼這裡就適合使用倒排索引了:

這裡將value進行分詞, 然後將分詞結果拿出來當做索引
跟正向的索引比較,也就是做了一個倒置,這就是倒排索引的思想

二,ES使用場景介紹
1、全文搜尋(搜尋引擎)
在一組文件中查詢某一單詞所在文件及位置
2、模糊匹配
通過使用者的輸入去匹配詞庫中符合條件的詞條
3、商品搜尋
通過商品的關鍵字去資料來源中查詢符合條件的商品

在我自己的專案中使用的情況是我有上百萬的文章需要被通過各種條件檢索到, 所以這裡就直接使用ES, 現在線上檢索速度都是10ms之內返回。

下面看看ES資料在瀏覽器的展示形式以及視覺化介面的搜尋:

三:使用Java進行ES的增刪改查及程式碼講解
1, 使用ES進行增加和更新操作。

//首先在專案啟動的時候生成esClient, 這個我們公司自己封裝好了的。
@PostConstruct
    publicvoidinit() {
        esClient = new ESClient<>(esConfig.getAddress(), esConfig.getCluster(), esConfig.getIndex(),
                esConfig.getUsername(), esConfig.getPassword(), ES_TYPE_MIXEDDATA, EsMixedDataDto.class);
    }

上面EsMixedDataDto是自己構建的一個類, ES中儲存的欄位就是這個類中的所有欄位。
接著是增加和更新操作了:

//同步到ES中
articleEsService.upsertDocument(esMixedDataDto);

/** * 建立或更新索引 * * @param esMixedDataDto * @return */
publicbooleanupsertDocument(EsMixedDataDto esMixedDataDto) {
        return esClient.upsertDocument(esMixedDataDto.getMixId(), esMixedDataDto);
    }

這個還是呼叫了系統封裝好的esClient中的insertOrUpdate方法,最後我會把ESClient中所有封裝的方法都貼出來, 其內部就是呼叫了ES原生的insert或者update方法的。

2,使用ES進行刪除操作

/** * 刪除索引 */
publicvoiddeleteIndex(String id) throws Exception{
    esClient.deleteDocument(id);
}

同上,也是使用了esClient中的delete方法,後面我會貼上esClient中所有方法。

3,使用ES進行查詢
3.1 當然ES最重要的還是多維度的查詢, 這裡也是我要講的重點。
首先來個最簡單的搜尋一篇文章的標題:

//通過關鍵詞來查詢文章集合
public PageResponse<EsMixedDataDto> queryForKeyword(String searchText, boolean highlight, PageRequest pageRequesto) {
    SearchRequestBuilder searchRequestBuilder = esClient.prepareSearch()
            .setTypes(ES_TYPE_MIXEDDATA)
            .setSearchType(SearchType.QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.multiMatchQuery(searchText, "title").type(MultiMatchQueryBuilder.Type.BEST_FIELDS))
            .setFrom(pageRequest.getOffset())
            .setSize(pageRequest.getLimit())
            .setExplain(false)
            ;

    if (highlight) {
        searchRequestBuilder.addHighlightedField("title", 100, 1)
                .setHighlighterPreTags("<font color='red'>")
                .setHighlighterPostTags("</font>");
    }

    try {
        //這裡就是給es傳送搜尋指令了
        return getMixedData(searchRequestBuilder);
    } catch (Exception e) {
        log.error("ES搜尋異常!", e.getMessage());
        throw new RuntimeException(e);
    }
}

這裡先說說search_type, 也就是上面setSearchType(SearchType.QUERY_THEN_FETCH)的內容:

  • query_then_fetch:執⾏查詢得到對⽂檔進⾏排序的所需資訊(在所
    有分⽚上執⾏),然後在相關分⽚上查詢⽂檔實際內容。返回結果的
    最⼤數量等於size引數的值。
  • query_and_fetch:查詢在所有分⽚上並⾏執⾏,所有分⽚返回等於
    size值的結果數,最終返回結果的最⼤數量等於size的值乘以分⽚
    數。分⽚較多時會消耗過多資源。
  • count:只返回匹配查詢的⽂檔數量。
  • scan:⼀般在需要返回⼤量結果時使⽤。在傳送第⼀次請求後,ES
    會返回⼀個滾動識別符號,類似於資料庫中的遊標。

我這裡使用的是query_then_fetch。

3.2 緊接著說個多條件複雜的查詢:

/** * @param jiaxiaoId: 駕校id * @param title 文章的title關鍵詞 * @param publishStatus 釋出狀態 * @param stickStatus 置頂狀態 * @param pageRequest 請求的頁碼和條數 * @param highlight 搜尋結果是否高亮顯示 */
public PageResponse<EsMixedDataDto> queryByConditions(Long jiaxiaoId, String title, PageRequest pageRequest, int publishStatus, int stickStatus, boolean highlight) {
    BoolQueryBuilder booleanQueryBuilder = QueryBuilders.boolQuery();
    booleanQueryBuilder.must(QueryBuilders.termQuery("jiaxiaoId", jiaxiaoId));
    if (StringUtils.isNotBlank(title)) {
        booleanQueryBuilder.must(QueryBuilders.multiMatchQuery(title, "title").type(MultiMatchQueryBuilder.Type.BEST_FIELDS));
    }

    //這裡是新增是否釋出的搜尋條件, 預設是隻展示已釋出的文章
    if (publishStatus == CommonConstants.DataStatus.INIT_STATUS) {
        booleanQueryBuilder.must(QueryBuilders.termQuery("publishStatus", CommonConstants.DataStatus.INIT_STATUS));
    } else {
        booleanQueryBuilder.mustNot(QueryBuilders.termQuery("publishStatus", CommonConstants.DataStatus.INIT_STATUS));
    }

    //這裡是新增是否置頂的搜尋條件
    if (stickStatus == CommonConstants.DataStatus.PUBLISH_STATUS) {
        booleanQueryBuilder.must(QueryBuilders.termQuery("stickStatus", CommonConstants.DataStatus.PUBLISH_STATUS));
    } else if(stickStatus == CommonConstants.DataStatus.INIT_STATUS){
        booleanQueryBuilder.mustNot(QueryBuilders.termQuery("stickStatus", CommonConstants.DataStatus.PUBLISH_STATUS));
    }

    SearchRequestBuilder searchRequestBuilder = esClient.prepareSearch()
            .setTypes(ES_TYPE_MIXEDDATA)
            .setSearchType(SearchType.QUERY_THEN_FETCH)
            .setQuery(booleanQueryBuilder)
            .setFrom(pageRequest.getOffset())
            .setSize(pageRequest.getLimit())
            .addSort("stickStatus", SortOrder.DESC)
            .setExplain(false)
            ;

    if (jiaxiaoId == null) {
        BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter()
                .must(FilterBuilders.missingFilter("jiaxiaoId"));
        searchRequestBuilder.setPostFilter(filterBuilder);
    }
    
    if (highlight) {
        searchRequestBuilder.addHighlightedField("title", 100, 1)
                .setHighlighterPreTags("<font color='red'>")
                .setHighlighterPostTags("</font>");
    } else {
        searchRequestBuilder.addSort("publishTime", SortOrder.DESC);
    }

    try {
        return getMixedData(searchRequestBuilder);
    } catch (Exception e) {
        log.error("ES搜尋異常!", e.getMessage());
        throw new RuntimeException(e);
    }
}

這裡不用的就是使用query和filterBuilder,searchRequestBuilder中可以設定query和postFilter。
Debug到這裡, 其實寫的查詢語句最終還是拼接成了一個ES可讀的結構化查詢語句:

3.3 最後貼上最重要的一個類ESClient.java, 這是我們針對於ElasticSearch封裝的一個類。

public class ESClient<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ESClient.class);

    private static final String DEFAULT_ANALYZER = "ik_smart";
    private static final DozerBeanMapper dozerBeanMapper = new DozerBeanMapper();

    private final Client client;
    private String index;
    private Class<T> clazz;
    private String type;
    private BulkProcessor bulkProcessor;
    private List<String> serverHttpAddressList = Lists.newArrayList();

    private Map<String, JSONObject> sqlJsonMap = Maps.newHashMap();

    /**     * 初始化一個連線ElasticSearch的客戶端     *     * @param addresses   ES伺服器的Transport地址和埠的列表,多個伺服器用逗號分隔,例如 localhost:9300,localhost:9300,...     * @param clusterName 叢集名稱     * @param index       索引名稱,這裡應該使用專案名稱     * @param username    使用者名稱稱     * @param password    使用者密碼     * @param type        索引型別     * @param clazz       儲存類     */
    publicESClient(String addresses, String clusterName, String index,
                    String username, String password, String type, Class<T> clazz) {
        if (StringUtils.isBlank(addresses)) {
            throw new RuntimeException("沒有給定的ES伺服器地址。");
        }

        this.index = index;
        this.type = type;
        this.clazz = clazz;

        // 獲得連結地址物件列表
        List<InetSocketTransportAddress> addressList = Lists.transform(
                Splitter.on(",").trimResults().omitEmptyStrings().splitToList(addresses),
                new Function<String, InetSocketTransportAddress>() {
                    @Override
                    public InetSocketTransportAddress apply(String input) {
                        String[] addressPort = input.split(":");
                        String address = addressPort[0];
                        Integer port = Integer.parseInt(addressPort[1]);

                        serverHttpAddressList.add(address + ":" + 9200);
                        return new InetSocketTransportAddress(address, port);
                    }
                }
        );

        // 建立關於ES的配置
        ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", false);
        if (StringUtils.isNotBlank(username)) {
            builder.put("shield.user", username + ":" + password);
        }
        Settings settings = builder.build();

        // 生成原生客戶端
        TransportClient transportClient = new TransportClient(settings);
        for (InetSocketTransportAddress address : addressList) {
            transportClient.addTransportAddress(address);
        }
        client = transportClient;
        bulkProcessor = BulkProcessor.builder(
                client, new BulkProcessor.Listener() {
                    @Override
                    publicvoidbeforeBulk(long executionId, BulkRequest request) {
                    }

                    @Override
                    publicvoidafterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    }

                    @Override
                    publicvoidafterBulk(long executionId, BulkRequest request, Throwable failure) {
                        throw new RuntimeException(failure);
                    }
                }).build();
    }

    /**     * 初始化連線ElasticSearch的客戶端     *     * @param client 原生客戶端     * @param index  索引名稱     * @param type   型別     * @param clazz  儲存類     */
    publicESClient(Client client, String index, String type, Class<T> clazz) {
        this.client = client;
        this.index = index;
        this.type = type;
        this.clazz = clazz;
    }

    /**     * 向ES傳送儲存請求,將一個物件儲存到伺服器。     *     * @param id 該物件的id     * @param t  儲存例項     * @return 是否儲存成功     */
    publicbooleanindexDocument(String id, T t) {
        return indexDocument(id, type, t);
    }

    /**     * 向ES傳送儲存請求,將一個物件儲存到伺服器。     *     * @param t 儲存例項     * @return 返回儲存之後在ES伺服器內生成的隨機ID     */
    public String indexDocument(T t) {
        IndexResponse indexResponse = client.prepareIndex(index, type)
                .setSource(toJSONString(t))
                .execute()
                .actionGet();
        return indexResponse.getId();
    }

    /**     * 向ES傳送儲存請求,將一個物件儲存到伺服器,這個方法允許使用者手動指定該物件的儲存型別名稱     *     * @param id   物件id     * @param type 儲存型別     * @param t    儲存例項     * @return 是否儲存成功     */
    publicbooleanindexDocument(String id, String type, T t) {
        IndexResponse indexResponse = client.prepareIndex(index, type, id)
                .setSource(toJSONString(t))
                .execute()
                .actionGet();

        return true;
    }

    /**     * 向ES傳送批量儲存請求, 請求不會馬上提交,而是會等待到達bulk設定的閾值後進行提交.<br/>
     * 最後客戶端需要呼叫{@link #flushBulk()}方法.     *     * @param id 物件id     * @param t  儲存例項     * @return 成功表示放入到bulk成功, 可能會丟擲runtimeException     */
    public boolean indexDocumentBulk(String id, T t) {
        return indexDocumentBulk(id, type, t);
    }

    /**     * 向ES傳送批量儲存請求,將一個物件儲存到伺服器,這個方法允許使用者手動指定該物件的儲存型別名稱     *     * @param id   物件id     * @param type 儲存型別     * @param t    儲存例項     * @return 成功表示放入到bulk成功, 可能會丟擲runtimeException     * @see #indexDocument(String, Object)     */
    public boolean indexDocumentBulk(String id, String type, T t) {
        IndexRequest indexRequest = new IndexRequest(index, type, id).source(toJSONString(t));
        bulkProcessor.add(indexRequest);
        return true;
    }

    /**     * 向ES傳送批量儲存請求, 請求不會馬上提交,而是會等待到達bulk設定的閾值後進行提交.<br/>     * 最後客戶端需要呼叫{@link #flushBulk()}方法.     *     * @param t 儲存例項     * @return 成功表示放入到bulk成功, 可能會丟擲runtimeException     */
    public boolean indexDocumentBulk(T t) {
        IndexRequest indexRequest = new IndexRequest(index, type).source(toJSONString(t));
        bulkProcessor.add(indexRequest);
        return true;
    }

    public boolean indexDocumentBulk(List<T> list) {
        for (T t : list) {
            indexDocumentBulk(t);
        }
        return true;
    }

    /**     * 向ES傳送批量儲存請求, 允許傳入一個Function, 用來從物件中獲取ID.     *     * @param list       物件列表     * @param idFunction 獲取ID     * @return 成功表示放入到bulk成功, 可能會丟擲runtimeException     */
    public boolean indexDocumentBulk(List<T> list, Function<T, String> idFunction) {
        for (T t : list) {
            indexDocumentBulk(idFunction.apply(t), t);
        }
        return true;
    }

    /**     * 向ES傳送更新文件請求,將一個物件更新到伺服器,會替換原有對應ID的資料。     *     * @param id id     * @param t  儲存物件     * @return 是否更新成功     */
    public boolean updateDocument(String id, T t) {
        return updateDocument(id, type, t);
    }

    /**     * 向ES傳送更新文件請求,將一個物件更新到伺服器,會替換原有對應ID的資料。     *     * @param id   id     * @param type 儲存型別     * @param t    儲存物件     * @return 是否更新成功     */
    public boolean updateDocument(String id, String type, T t) {
        client.prepareUpdate(index, type, id).setDoc(toJSONString(t))
                .execute().actionGet();
        return true;
    }

    /**     * 向ES傳送批量更新請求     *     * @param id 索引ID     * @param t  儲存物件     * @return 成功表示放入到bulk成功, 可能會丟擲runtimeException     */
    public boolean updateDocumentBulk(String id, T t) {
        UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(toJSONString(t));
        bulkProcessor.add(updateRequest);
        return true;
    }

    /**     * 向ES傳送upsert請求, 如果該document不存在將會新建這個document, 如果存在則更新.     *     * @param id id     * @param t  儲存物件     * @return 是否執行成功     */
    public boolean upsertDocument(String id, T t) {
        return upsertDocument(id, type, t);
    }

    /**     * 向ES傳送upsert請求, 如果該document不存在將會新建這個document, 如果存在則更新.     *     * @param id   id     * @param type 儲存型別     * @param t    儲存物件     * @return 是否執行成功     */
    public boolean upsertDocument(String id, String type, T t) {
        client.prepareUpdate(index, type, id).setDocAsUpsert(true).setDoc(toJSONString(t))
                .execute().actionGet();
        return true;
    }

    /**     * 向ES傳送批量upsert的請求.     *     * @param id id     * @param t  儲存物件     * @return 是否執行成功     */
    public boolean upsertDocumentBulk(String id, T t) {
        UpdateRequest updateRequest = new UpdateRequest(index, type, id)
                .doc(toJSONString(t));
        updateRequest.docAsUpsert(true);
        bulkProcessor.add(updateRequest);
        return true;
    }

    /**     * 向ES傳送獲取指定ID文件的請求     *     * @param id id     * @return 搜尋引擎例項     * @throws Exception     */
    public T getDocument(String id) throws Exception {
        try {
            GetResponse getResponse = client.prepareGet(index, type, id)
                    .execute().actionGet();
            if (getResponse.getSource() == null) {
                return null;
            }
            JSONObject jsonObject = new JSONObject(getResponse.getSource());

            T t = clazz.newInstance();
            toObject(t, jsonObject);
            return t;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**     * 向ES傳送刪除指定ID文件的請求     *     * @param id id     * @return 是否刪除成功     * @throws Exception     */
    public boolean deleteDocument(String id) throws Exception {
        return deleteDocument(id, type);
    }

    /**     * 向ES傳送刪除指定ID文件的請求     *     * @param id   id     * @param type 儲存型別     * @return 是否刪除成功     * @throws Exception     */
    public boolean deleteDocument(String id, String type) throws Exception {
        DeleteResponse deleteResponse = client.prepareDelete(index, type, id)
                .execute().actionGet();
        return deleteResponse.isFound();
    }

    /**     * 向ES傳送搜尋文件的請求,返回分頁結果     *     * @param searchText 搜尋內容     * @return 分頁結果     * @throws Exception     */
    public PageResponse<T> searchDocument(String searchText) throws Exception {
        PageRequest pageRequest = WebContext.get().page();
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchQuery("_all", searchText))
                .setFrom(pageRequest.getOffset())
                .setSize(pageRequest.getLimit())
                .setFetchSource(true);
        return searchDocument(searchRequestBuilder);
    }

    /**     * 向ES傳送搜尋文件的請求,返回列表結果     *     * @param searchText 搜尋內容     * @param start      起始位置     * @param size       獲取資料大小     * @return 返回資料列表     * @throws Exception     */
    public List<T> searchDocument(String searchText, int start, int size) throws Exception {
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchQuery("_all", searchText))
                .setFrom(start)
                .setSize(size)
                .setFetchSource(true);

        PageResponse<T> pageResponse = searchDocument(searchRequestBuilder);
        return pageResponse.getItemList();
    }

    /**     * 向ES傳送搜尋文件的請求,返回列表結果     *     * @param searchText 搜尋內容     * @param type       型別     * @param start      起始位置     * @param size       資料大小     * @return 返回資料列表     * @throws Exception     */
    public List<T> searchDocument(String searchText, String type, int start, int size) throws Exception {
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index)
                .setTypes(type)
                .setQuery(QueryBuilders.matchQuery("_all", searchText))
                .setFrom(start)
                .setSize(size)
                .setFetchSource(true);

        PageResponse<T> pageResponse = searchDocument(searchRequestBuilder);
        return pageResponse.getItemList();
    }

    /**     * 向ES傳送搜尋文件的請求,返回分頁結果     *     * @param searchRequestBuilder 搜尋構造器     * @return 分頁結果     * @throws Exception     */
    public PageResponse<T> searchDocument(SearchRequestBuilder searchRequestBuilder) throws Exception {
        SearchResponse searchResponse = search(searchRequestBuilder);
        return searchResponseToPageResponse(searchResponse);
    }

    /**     * 獲得scrollId對應的資料. 請檢視{@link #getScrollId(SearchRequestBuilder, int, int)}.<br/>     * 可以反覆呼叫該方法, 直到返回資料為0.     *<