Elasticsearch全文檢索企業開發記錄總結(二):ES客戶端搭建
阿新 • • 發佈:2019-02-12
專案依賴
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>5.4.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.24</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.7</version> </dependency>
配置ES叢集節點
public class EsConfig {
/**
* Es叢集名稱
*/
public static final String ES_CLUSTER_NAME = SystemConfig.get("es.cluster.name");
/**
* 已經解析的埠和地址
*/
public static List<URL> es_cluster_urls = loadUrls();
public static List<URL> loadUrls() {
String urls = SystemConfig.get("es.cluster.url" );
if (!StringUtils.hasLength(urls)) {
throw new IllegalStateException("Es cluster urls must not be empty.");
}
List<URL> ups = new LinkedList<>();
String[] urlArray = urls.split(",");
for (String up : urlArray) {
String url = up.split(":" )[0];
String port = up.split(":")[1];
ups.add(new URL(url, Integer.parseInt(port)));
}
return ups;
}
public static class URL {
private String url;
private int port;
public URL() {
}
public URL(String url, int port) {
this.port = port;
this.url = url;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
}
ES客戶端單例建立
public class ESClient {
private static Logger log = LoggerFactory.getLogger(ESClient.class);
private static ESClient instance = new ESClient();
private static TransportClient client;
private ESClient() {
newTransportClient();
}
public static ESClient instance() {
return instance;
}
public TransportClient getTransportClient() {
// client建立完成後,如果es連線失敗將會自動重連
if (client == null/* || client.connectedNodes().isEmpty()*/) {
newTransportClient();
}
return client;
}
private void newTransportClient() {
Settings settings = Settings.EMPTY;
if (StringUtils.hasLength(EsConfig.ES_CLUSTER_NAME))
settings = Settings.builder().put("cluster.name", EsConfig.ES_CLUSTER_NAME).build();
client = new PreBuiltTransportClient(settings);
if (EsConfig.es_cluster_urls.size() == 0) {
throw new NoNodeAvailableException("There is no available node be defined, please check you cluster url configuration.");
}
EsConfig.es_cluster_urls.forEach(url -> {
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(url.getUrl()), url.getPort()));
} catch (UnknownHostException e) {
log.error("Es cluster configuration throw UnkownHostException : ", e);
}
});
}
public void closeClient() {
instance().getTransportClient().close();
}
}
ES客戶端的增刪改查介面
public interface ESService {
/**
* 建立索引。
* <p>
* 路由如果為空,則不設定路由。
*
* @param index 索引名稱
* @param type 型別名稱
* @param id id
* @param jsonContent 索引內容
* @param parent 關聯父級的欄位
* @param routing 路由
* @return 索引結果物件
*/
IndexResponse index(String index, String type, String id, String jsonContent, String parent, String routing);
/**
* 建立索引。
*
* @param index 索引名稱
* @param type 型別名稱
* @param id id
* @param jsonContent 索引內容
* @return 索引結果物件
*/
IndexResponse index(String index, String type, String id, String jsonContent);
/**
* 批量建立索引。
*
* @param index 索引名稱,不能為空
* @param type 索引型別,不能為空
* @param ids 索引id列表,必須與索引內容列表一一對應
* @param contents 索引內容列表,必須與索引id列表一一對應
* @param parent 父級ID
* @param routing 路由
* @return 批量操作結果
*/
BulkResponse bulkIndex(String index, String type, List<String> ids, List<String> contents, String parent, String routing);
/**
* 根據id查詢。
*
* @param index 索引名稱
* @param type 型別名稱
* @param id id
* @return 結果物件
*/
GetResponse get(String index, String type, String id);
/**
* 根據id查詢,如果<code>routing</code>為null,則不設定路由。
*
* @param index 索引名稱
* @param type 型別名稱
* @param id id
* @param parent 父級
* @param routing 路由
* @return 結果物件
*/
GetResponse get(String index, String type, String id, String parent, String routing);
/**
* 批量查詢。
*
* @param index 索引名稱
* @param type 型別名稱
* @param ids 批量查詢的id陣列
* @return 結果物件
*/
MultiGetResponse multiGet(String index, String type, String[] ids);
/**
* 更新。
*
* @param index 索引名稱
* @param type 型別名稱
* @param id id
* @param jsonContent 更新內容
* @param create 不存在時是否建立,true - 建立, false - 不建立(預設值)
* @return 結果物件
* @throws ExecutionException
* @throws InterruptedException
*/
UpdateResponse update(String index, String type, String id, String jsonContent, boolean create) throws ExecutionException, InterruptedException;
/**
* 更新。
* <p>
* 如果<code>routing</code>為null,則不設定路由。
*
* @param index 索引名稱
* @param type 型別名稱
* @param id id
* @param jsonContent 更新內容
* @param parent 父級
* @param routing 路由
* @param create 不存在時是否建立,true - 建立, false - 不建立(預設值)
* @return 結果物件
* @throws ExecutionException
* @throws InterruptedException
*/
UpdateResponse update(String index, String type, String id, String jsonContent, String parent, String routing, boolean create) throws ExecutionException, InterruptedException;
/**
* 批量更新。
* <p>
* 如果<code>routing</code>為null,則不設定路由。
* <p>
* 如果文件不存在,會自動建立文件。如果不希望建立文件,可以參見帶create引數的bulkUpdate方法。
*
* @param index 索引名稱
* @param type 型別名稱
* @param contents 更新內容
* @param ids 更新的id列表
* @param parent 父級
* @param routing 路由
* @return 結果物件
*/
BulkResponse bulkUpdate(String index, String type, List<String> ids, List<String> contents, String parent, String routing);
/**
* 批量更新。
* <p>
* 如果<code>routing</code>為null,則不設定路由。
*
* @param index 索引名稱
* @param type 型別名稱
* @param contents 更新內容
* @param ids 更新的id列表
* @param parent 父級
* @param routing 路由
* @param create 不存在,則建立文件。預設false
* @return 結果物件
*/
BulkResponse bulkUpdate(String index, String type, List<String> ids, List<String> contents, String parent, String routing, boolean create);
/**
* 刪除。
* <p>
* 如果<code>routing</code>為null,則不設定路由。
*
* @param index 索引名稱
* @param type 型別名稱
* @param id id
* @return 刪除結果
*/
DeleteResponse delete(String index, String type, String id);
/**
* 刪除。
*
* @param index 索引名稱
* @param type 型別名稱
* @param id id
* @param routing 路由
* @param parent 關聯父級
* @return 刪除結果
*/
DeleteResponse delete(String index, String type, String id, String parent, String routing);
/**
* 批量刪除。
*
* @param index 索引名稱
* @param type 型別名稱
* @param ids 被刪除的id列表
* @return 刪除結果
*/
BulkResponse bulkDelete(String index, String type, List<String> ids);
/**
* 帶父id和路由的批量刪除。
*
* @param index 索引名稱
* @param type 型別名稱
* @param ids id列表
* @param parent 父id
* @param routing 路由
* @return 刪除結果
*/
BulkResponse bulkDelete(String index, String type, List<String> ids, String parent, String routing);
/**
* 搜尋。
*
* @param index 索引名稱
* @param type 型別名稱
* @param searchSourceBuilder 設定好的結果欄位過濾器
* @param query 設定好的查詢構建器
* @param postFlter 設定好的查詢過濾器
* @param from 開始位置,從0開始
* @param size 查詢數量,預設為10
* @param aggregationBuilders 基於查詢的聚合構建器
* @return 搜尋結果
*/
SearchResponse search(String index, String type,
SearchSourceBuilder searchSourceBuilder,
QueryBuilder query, QueryBuilder postFlter,
Integer from, Integer size,
AggregationBuilder[] aggregationBuilders,
SortBuilder[] sortBuilders);
}
介面實現參照ES官網JAVA API的文件編寫即可,search方法實現如下:
@Override
public SearchResponse search(String index, String type,
SearchSourceBuilder searchSourceBuilder,
QueryBuilder query, QueryBuilder postFlter,
Integer from, Integer size,
AggregationBuilder[] aggregationBuilders,
SortBuilder[] sortBuilders) {
if (!StringUtils.hasLength(index))
throw new IllegalArgumentException("Index name must not be null.");
SearchRequestBuilder requestBuilder = getClient().prepareSearch(index);
if (StringUtils.hasLength(type))
requestBuilder.setTypes(type);
if (searchSourceBuilder != null)
requestBuilder.setSource(searchSourceBuilder);
if (query != null)
requestBuilder.setQuery(query);
if (postFlter != null)
requestBuilder.setPostFilter(postFlter);
if (from == null)
from = 0;
if (size == null) // 預設為10,如果傳入0表示不顯示命中結果
size = 10;
requestBuilder.setFrom(from).setSize(size);
requestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
// 聚合
if (aggregationBuilders != null && aggregationBuilders.length > 0) {
for (AggregationBuilder aggregationBuilder : aggregationBuilders) {
requestBuilder.addAggregation(aggregationBuilder);
}
}
// 排序
if (sortBuilders != null && sortBuilders.length > 0) {
for (SortBuilder sortBuilder : sortBuilders) {
requestBuilder.addSort(sortBuilder);
}
}
// 非格式化輸出,日誌就列印一行
Map<String, String> params = new HashMap<>();
params.put("pretty", "false");
ToXContent.MapParams mapParams = new ToXContent.MapParams(params);
log.info("===================== Searching params ===================== ");
log.info("index : " + index + ", type : " + type + ", from : " + from + ", size : " + size);
log.info("request : " + requestBuilder.request().source().toString(mapParams));
return requestBuilder.get();
}
至此,ES提供的JAVA API的底層操作已經編寫完畢,供業務層呼叫。