1. 程式人生 > >Elasticsearch全文檢索企業開發記錄總結(二):ES客戶端搭建

Elasticsearch全文檢索企業開發記錄總結(二):ES客戶端搭建

專案依賴

   <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>5.4.0</version>
    </dependency>

<dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.24</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.6.1</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.6.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.7</version>
    </dependency>

配置ES叢集節點

public class EsConfig {
    /**
     * Es叢集名稱
     */
    public static final String ES_CLUSTER_NAME = SystemConfig.get("es.cluster.name");

    /**
     * 已經解析的埠和地址
     */
    public static List<URL> es_cluster_urls = loadUrls();

    public static List<URL> loadUrls() {
        String urls = SystemConfig.get("es.cluster.url"
); if (!StringUtils.hasLength(urls)) { throw new IllegalStateException("Es cluster urls must not be empty."); } List<URL> ups = new LinkedList<>(); String[] urlArray = urls.split(","); for (String up : urlArray) { String url = up.split(":"
)[0]; String port = up.split(":")[1]; ups.add(new URL(url, Integer.parseInt(port))); } return ups; } public static class URL { private String url; private int port; public URL() { } public URL(String url, int port) { this.port = port; this.url = url; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } } }

ES客戶端單例建立

public class ESClient {
    private static Logger log = LoggerFactory.getLogger(ESClient.class);
    private static ESClient instance = new ESClient();
    private static TransportClient client;

    private ESClient() {
        newTransportClient();
    }

    public static ESClient instance() {
        return instance;
    }

    public TransportClient getTransportClient() {
        // client建立完成後,如果es連線失敗將會自動重連
        if (client == null/* || client.connectedNodes().isEmpty()*/) {
            newTransportClient();
        }
        return client;
    }

    private void newTransportClient() {
        Settings settings = Settings.EMPTY;
        if (StringUtils.hasLength(EsConfig.ES_CLUSTER_NAME))
            settings = Settings.builder().put("cluster.name", EsConfig.ES_CLUSTER_NAME).build();
        client = new PreBuiltTransportClient(settings);
        if (EsConfig.es_cluster_urls.size() == 0) {
            throw new NoNodeAvailableException("There is no available node be defined, please check you cluster url configuration.");
        }
        EsConfig.es_cluster_urls.forEach(url -> {
            try {
                client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(url.getUrl()), url.getPort()));
            } catch (UnknownHostException e) {
                log.error("Es cluster configuration throw UnkownHostException : ", e);
            }
        });
    }

    public void closeClient() {
        instance().getTransportClient().close();
    }
}

ES客戶端的增刪改查介面

public interface ESService {
    /**
     * 建立索引。
     * <p>
     * 路由如果為空,則不設定路由。
     *
     * @param index       索引名稱
     * @param type        型別名稱
     * @param id          id
     * @param jsonContent 索引內容
     * @param parent      關聯父級的欄位
     * @param routing     路由
     * @return 索引結果物件
     */
    IndexResponse index(String index, String type, String id, String jsonContent, String parent, String routing);

    /**
     * 建立索引。
     *
     * @param index       索引名稱
     * @param type        型別名稱
     * @param id          id
     * @param jsonContent 索引內容
     * @return 索引結果物件
     */
    IndexResponse index(String index, String type, String id, String jsonContent);

    /**
     * 批量建立索引。
     *
     * @param index    索引名稱,不能為空
     * @param type     索引型別,不能為空
     * @param ids      索引id列表,必須與索引內容列表一一對應
     * @param contents 索引內容列表,必須與索引id列表一一對應
     * @param parent   父級ID
     * @param routing  路由
     * @return 批量操作結果
     */
    BulkResponse bulkIndex(String index, String type, List<String> ids, List<String> contents, String parent, String routing);

    /**
     * 根據id查詢。
     *
     * @param index 索引名稱
     * @param type  型別名稱
     * @param id    id
     * @return 結果物件
     */
    GetResponse get(String index, String type, String id);

    /**
     * 根據id查詢,如果<code>routing</code>為null,則不設定路由。
     *
     * @param index   索引名稱
     * @param type    型別名稱
     * @param id      id
     * @param parent  父級
     * @param routing 路由
     * @return 結果物件
     */
    GetResponse get(String index, String type, String id, String parent, String routing);

    /**
     * 批量查詢。
     *
     * @param index 索引名稱
     * @param type  型別名稱
     * @param ids   批量查詢的id陣列
     * @return 結果物件
     */
    MultiGetResponse multiGet(String index, String type, String[] ids);

    /**
     * 更新。
     *
     * @param index       索引名稱
     * @param type        型別名稱
     * @param id          id
     * @param jsonContent 更新內容
     * @param create      不存在時是否建立,true - 建立, false - 不建立(預設值)
     * @return 結果物件
     * @throws ExecutionException
     * @throws InterruptedException
     */
    UpdateResponse update(String index, String type, String id, String jsonContent, boolean create) throws ExecutionException, InterruptedException;

    /**
     * 更新。
     * <p>
     * 如果<code>routing</code>為null,則不設定路由。
     *
     * @param index       索引名稱
     * @param type        型別名稱
     * @param id          id
     * @param jsonContent 更新內容
     * @param parent      父級
     * @param routing     路由
     * @param create      不存在時是否建立,true - 建立, false - 不建立(預設值)
     * @return 結果物件
     * @throws ExecutionException
     * @throws InterruptedException
     */
    UpdateResponse update(String index, String type, String id, String jsonContent, String parent, String routing, boolean create) throws ExecutionException, InterruptedException;

    /**
     * 批量更新。
     * <p>
     * 如果<code>routing</code>為null,則不設定路由。
     * <p>
     * 如果文件不存在,會自動建立文件。如果不希望建立文件,可以參見帶create引數的bulkUpdate方法。
     *
     * @param index    索引名稱
     * @param type     型別名稱
     * @param contents 更新內容
     * @param ids      更新的id列表
     * @param parent   父級
     * @param routing  路由
     * @return 結果物件
     */
    BulkResponse bulkUpdate(String index, String type, List<String> ids, List<String> contents, String parent, String routing);

    /**
     * 批量更新。
     * <p>
     * 如果<code>routing</code>為null,則不設定路由。
     *
     * @param index    索引名稱
     * @param type     型別名稱
     * @param contents 更新內容
     * @param ids      更新的id列表
     * @param parent   父級
     * @param routing  路由
     * @param create   不存在,則建立文件。預設false
     * @return 結果物件
     */
    BulkResponse bulkUpdate(String index, String type, List<String> ids, List<String> contents, String parent, String routing, boolean create);

    /**
     * 刪除。
     * <p>
     * 如果<code>routing</code>為null,則不設定路由。
     *
     * @param index 索引名稱
     * @param type  型別名稱
     * @param id    id
     * @return 刪除結果
     */
    DeleteResponse delete(String index, String type, String id);

    /**
     * 刪除。
     *
     * @param index   索引名稱
     * @param type    型別名稱
     * @param id      id
     * @param routing 路由
     * @param parent  關聯父級
     * @return 刪除結果
     */
    DeleteResponse delete(String index, String type, String id, String parent, String routing);

    /**
     * 批量刪除。
     *
     * @param index 索引名稱
     * @param type  型別名稱
     * @param ids   被刪除的id列表
     * @return 刪除結果
     */
    BulkResponse bulkDelete(String index, String type, List<String> ids);

    /**
     * 帶父id和路由的批量刪除。
     *
     * @param index   索引名稱
     * @param type    型別名稱
     * @param ids     id列表
     * @param parent  父id
     * @param routing 路由
     * @return 刪除結果
     */
    BulkResponse bulkDelete(String index, String type, List<String> ids, String parent, String routing);

    /**
     * 搜尋。
     *
     * @param index               索引名稱
     * @param type                型別名稱
     * @param searchSourceBuilder 設定好的結果欄位過濾器
     * @param query               設定好的查詢構建器
     * @param postFlter           設定好的查詢過濾器
     * @param from                開始位置,從0開始
     * @param size                查詢數量,預設為10
     * @param aggregationBuilders 基於查詢的聚合構建器
     * @return 搜尋結果
     */
    SearchResponse search(String index, String type,
                          SearchSourceBuilder searchSourceBuilder,
                          QueryBuilder query, QueryBuilder postFlter,
                          Integer from, Integer size,
                          AggregationBuilder[] aggregationBuilders,
                          SortBuilder[] sortBuilders);
}

介面實現參照ES官網JAVA API的文件編寫即可,search方法實現如下:

@Override
    public SearchResponse search(String index, String type,
                                 SearchSourceBuilder searchSourceBuilder,
                                 QueryBuilder query, QueryBuilder postFlter,
                                 Integer from, Integer size,
                                 AggregationBuilder[] aggregationBuilders,
                                 SortBuilder[] sortBuilders) {
        if (!StringUtils.hasLength(index))
            throw new IllegalArgumentException("Index name must not be null.");
        SearchRequestBuilder requestBuilder = getClient().prepareSearch(index);
        if (StringUtils.hasLength(type))
            requestBuilder.setTypes(type);
        if (searchSourceBuilder != null)
            requestBuilder.setSource(searchSourceBuilder);
        if (query != null)
            requestBuilder.setQuery(query);
        if (postFlter != null)
            requestBuilder.setPostFilter(postFlter);
        if (from == null)
            from = 0;
        if (size == null) // 預設為10,如果傳入0表示不顯示命中結果
            size = 10;
        requestBuilder.setFrom(from).setSize(size);
        requestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
        // 聚合
        if (aggregationBuilders != null && aggregationBuilders.length > 0) {
            for (AggregationBuilder aggregationBuilder : aggregationBuilders) {
                requestBuilder.addAggregation(aggregationBuilder);
            }
        }
        // 排序
        if (sortBuilders != null && sortBuilders.length > 0) {
            for (SortBuilder sortBuilder : sortBuilders) {
                requestBuilder.addSort(sortBuilder);
            }
        }
        // 非格式化輸出,日誌就列印一行
        Map<String, String> params = new HashMap<>();
        params.put("pretty", "false");
        ToXContent.MapParams mapParams = new ToXContent.MapParams(params);
        log.info("===================== Searching params ===================== ");
        log.info("index : " + index + ", type : " + type + ", from : " + from + ", size : " + size);
        log.info("request : " + requestBuilder.request().source().toString(mapParams));
        return requestBuilder.get();
    }

至此,ES提供的JAVA API的底層操作已經編寫完畢,供業務層呼叫。