SSM框架整合ElasticSearch實現資料的增刪改查實戰案例
前言:
當資料量過大幾十萬或者上百萬條資料或者億萬條時,單純的mysql oracle 以及sql查詢已經無法滿足我們在效率上的需求,elasticSearch 是當下一款熱門的實時搜尋引擎基於lucense的搜尋伺服器,使用它可以完成近乎實時的資料查詢。
目錄
一、準備開發環境
- 該專案是基於Srping + SpringMVC+Mybatis的
- 1.JDK 1.8
- 2.elasticSearch5.5.1
- 需要用到的相關依賴pom檔案如下
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>5.5.3</version> </dependency> <!-- transport客戶端 --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>5.5.3</version> </dependency> <!--log4j--> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.7</version> </dependency>
二、常用操作
由於程式碼是從我的專案中拿出來的,沒有用到的程式碼直接刪除就可以了
1.工具類ESutils,用於client客戶端的關閉,建立
package com.rupeng.utlis; import java.net.InetAddress; import java.net.UnknownHostException; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import com.rupeng.service.config.ConfigInfo; /** * 用來建立關閉es客戶端 * @author 2016wlw2 徐塬峰 * 建立時間:2018年7月24日 上午10:32:28 */ public class EsUtils { /** * 獲取ElasticSearch */ public synchronized static Client getClient() { Settings settings = Settings.builder(). put("client.transport.sniff", true) .build();//自動嗅探其他叢集的ip 如果有則加入 InetSocketTransportAddress master = null; try { master = new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"),9300); TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(master); return client; } catch (UnknownHostException e) { e.printStackTrace(); throw new RuntimeException("elasticSearch Client init error 連線建立失敗"+e); } } /** * 用於關閉elasticSearch */ public static void closeClient(Client client){ if(null != client){ try { client.close(); } catch (Exception e) { throw new RuntimeException("連線關閉失敗"); } } }
1.SSM注入client客戶端
程式碼以及其引用的方式
/** * 實現自動注入elasticserach * @author Ray */ @Configuration public class ElasticSerachConfig { //自動注入client @Bean(name="client") public TransportClient esClint() throws UnknownHostException { Settings settings = Settings.builder().build(); InetSocketTransportAddress master=new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"),9300); TransportClient client =new PreBuiltTransportClient(settings).addTransportAddress(master); return client; } }
@Resource(name = "client")
private TransportClient client;
2.基本操作(查詢,增加,刪除,修改)
2.1基本查詢
public List<String> search (String query,int number)
{
long start =System.currentTimeMillis();
if(client==null)
{
client =ESUtils.getTransportClient();
}
// 定義查詢用到的型別,單獨定義
String Type[]=new String[]{INDEXKEYTEMPLATE.TYPE_COURSES
,INDEXKEYTEMPLATE.TYPE_NEWS,INDEXKEYTEMPLATE.TYPE_SEGMENTS};
SearchRequestBuilder responsebuilder = client.prepareSearch(INDEX_NAME).setTypes(Type);
List<String> hitsList=new ArrayList<String>();
String Key[]=new String[]{CoursesIndexKey.NAME,
NewsIndexKey.TITLE,
SegmentsIndexKey.NAME,
BbsIndexKey.TITLE
};//查詢用到的關鍵字,單獨定義
SearchResponse myresponse = responsebuilder
.setQuery(QueryBuilders.multiMatchQuery(query, Key))
.setFrom(0)//from 從哪條開始 ,可用於分頁操作
.setSize(number).setExplain(true).execute().actionGet();//number為查詢的條數
SearchHits hits = myresponse.getHits();
for (int i = 0; i < hits.getTotalHits(); i++) // getHits()當前查詢頁的結果
{
SearchHit hit = hits.getHits()[i];
String jsonStr=hit.getSourceAsString();
logger.info("索引庫的資料:" +jsonStr );
hitsList.add(jsonStr);
}
long end=System.currentTimeMillis();
logger.debug("為您找到相關結果約"+hits.getTotalHits()+"個----"+"耗時"+(end-start)/1000+"秒");
return hitsList;
}
2.2根據id來獲取資料
public String getOneById(String INDEX_TYPE,Long id)
{
if(client==null)
{
client =ESUtils.getTransportClient();
}
GetResponse getResponse = client.prepareGet(INDEX_NAME,INDEX_TYPE,String.valueOf(id))
.execute()
.actionGet();
if(getResponse!=null)
{
logger.debug(getResponse.getSourceAsString());
String jsonStr=getResponse.getSourceAsString();
return jsonStr;
}
else
{
logger.error("找不到id對應的資料");
return null;
}
}
2.3精確匹配 方法和1類似 不過替換為MatchPhraseQuery
2.4建立操作 分批次提交資料
基本思路,從資料庫獲取資料,並將資料庫裡的資料插入到elasticSearch搜尋伺服器,資料量由於比較大,所以
採用了分批次提交
public boolean createAll(List<T> pojoList,String INDEX_TYPE) throws InterruptedException, ExecutionException, NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, NoSuchFieldException
{
long start =System.currentTimeMillis();
Gson gson = new Gson();
if(client==null)
{
client =ESUtils.getTransportClient();
}
BulkRequestBuilder bulkRequest = client.prepareBulk();
for(T pojo:pojoList)
{
// 通過反射獲得方法getID 設定主鍵
// pojo.getClass().getMethod("getId", Long.class);
Field FieldId=pojo.getClass().getDeclaredField("id");
//設定可讀寫
FieldId.setAccessible(true);
Object id = FieldId.get(pojo);
IndexRequest request=client.prepareIndex(INDEX_NAME,INDEX_TYPE).setId(String.valueOf(id))
.setSource(gson.toJson(pojo),XContentType.JSON).request();
bulkRequest.add(request);
if(bulkRequest.numberOfActions()==1000)
{
bulkRequest.execute().get();//get的實現就是呼叫executeget
bulkRequest=client.prepareBulk();//建立一個物件
logger.debug("已提交1000條資料");
}
}
if(bulkRequest.numberOfActions()>0)//最後一批不足一千的再提交一次
{
bulkRequest.execute().get();//
long end=System.currentTimeMillis();
logger.debug("最後一批已提交,"+"耗時"+(end-start)/1000+"秒");
}
ESUtils.close(client);
return true;
}
2.6刪除操作
2.6.1刪除整個索引庫
/** 注意 :該操作會刪除整個索引庫
*
* @param INDEX_NAME
*/
public void deleteIndex(String INDEX_NAME)
{
if(client==null)
{
client =ESUtils.getTransportClient();
}
TransportResponse resp=client.admin().indices().prepareDelete(INDEX_NAME).get();
if(resp!=null)
{
logger.debug("索引庫刪除成功");
ESUtils.close(client);
}
else
{
logger.error("索引刪除失敗");
ESUtils.close(client);
}
ESUtils.close(client);
}
2.6.2 刪除指定id
如果需要刪除單個型別裡的多個文件,則可以使用for迴圈刪除 或者其他 只要思路正確就ok
/**
* 根據id的值刪除文件
* @param id
* @param INDEX_TYPE
* @throws UnknownHostException
*/
public void removeOneById(Long id,String INDEX_TYPE)
{
if(client==null)
{
client =ESUtils.getTransportClient();
}
DeleteResponse response = client.prepareDelete(INDEX_NAME, INDEX_TYPE, String.valueOf(id)).get();
String index = response.getIndex();
String type = response.getType();
String typeId = response.getId();
long version = response.getVersion();
logger.debug("刪除一條文件"+index + " : " + type + ": " + typeId + ": " + version);
ESUtils.close(client);
}
2.6.3 刪除指定分類
public boolean deleteType(String indexName, String type) {
client = EsUtils.getClient();
QueryBuilder builder = QueryBuilders.typeQuery(type);
DeleteByQueryAction.INSTANCE.newRequestBuilder(client).source(indexName).filter(builder).execute().actionGet();
EsUtils.closeClient(client);
return true;
}
2.7插入一條資料
/**
* 建立一條或者更新索引
* @param esMixedDataDto
* @return
*/
public boolean upsertDocument(T pojo,String INDEX_TYPE) {
client =ESUtils.getTransportClient();
Gson gson = new Gson();
Field FieldId;
try {
FieldId = pojo.getClass().getDeclaredField("id");
FieldId.setAccessible(true);
Object id = FieldId.get(pojo);
IndexResponse indexRes=client.prepareIndex(INDEX_NAME,INDEX_TYPE).setId(String.valueOf(id))
.setSource(gson.toJson(pojo),XContentType.JSON).get();
String index = indexRes.getIndex();
String type = indexRes.getType();
String typeId = indexRes.getId();
logger.debug("插入成功"+index + " : " + type + ": " + typeId + ": " );
ESUtils.close(client);
return true;
}
catch (NoSuchFieldException | SecurityException e) {
// TODO Auto-generated catch block
logger.warn("插入失敗");
e.printStackTrace();
} catch (IllegalArgumentException e) {
logger.warn("插入失敗");
e.printStackTrace();
} catch (IllegalAccessException e) {
logger.warn("插入失敗");
e.printStackTrace();
}
return false;
}
三、查詢結果高亮顯示
java程式碼實現高亮顯示
public Map<String,Object> hlSearch (String query,int number)
{
long start =System.currentTimeMillis();
if(client==null)
{
client =ESUtils.getTransportClient();//建立連線
}
String Type[]=new String[]{INDEXKEYTEMPLATE.TYPE_COURSES
,INDEXKEYTEMPLATE.TYPE_NEWS,INDEXKEYTEMPLATE.TYPE_SEGMENTS};
SearchRequestBuilder requestBuilder = client.prepareSearch(INDEX_NAME).setTypes(Type);
String Key[]=new String[]{
CoursesIndexKey.NAME,
NewsIndexKey.TITLE,
SegmentsIndexKey.NAME,
BbsIndexKey.TITLE,
};
SearchRequestBuilder searchRequestBuilder = requestBuilder
.setQuery(QueryBuilders.multiMatchQuery(query, Key))
.setFrom(20). //從那行開始
setSize(30).setExplain(true);//.actionGet()==get()
Map<String,Object> msgMap = new HashMap<String,Object>();
HighlightBuilder highlightBuilder = new HighlightBuilder().field("*").requireFieldMatch(false);
highlightBuilder.preTags("<span style=\"color:red\">");
highlightBuilder.postTags("</span>");
searchRequestBuilder.highlighter(highlightBuilder);
SearchResponse response = searchRequestBuilder.get();
List<Map<String,Object>> result = new ArrayList<>();
for (SearchHit hit:response.getHits()) // getHits()當前查詢頁的結果
{
Map<String, Object> source = hit.getSource();
//建立map 存放高亮欄位 並返回到物件中
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
//處理高亮欄位
HighlightField nameField = highlightFields.get("title");
if(nameField!=null){
Text[] fragments = nameField.fragments();
String nameTmp ="";
for(Text text:fragments){
nameTmp+=text;
source.put("title",nameTmp);
source.put("content",nameTmp);
}
source.put("title",nameTmp);
}
result.add(source);
}
//封裝資料返回
msgMap.put("itemsList",result); //搜尋結果
//msgMap.put("page","page"); //分頁
msgMap.put("took",response.getTook().getSecondsFrac()); //獲取響應需要的時間
return msgMap;
}
Controller層
@RequestMapping(value="/hlsearch.do",method=RequestMethod.POST)
public @ResponseBody AjaxResult hlsearch(String text)
{
Map<String,Object> msg=searchService.hlSearch(text, 10);
for(Map.Entry<String, Object> entry :msg.entrySet())
{
System.out.println("Key:"+entry.getKey()+"Value:"+entry.getValue());
}
return new AjaxResult("Success",msg);
}
前端程式碼(vue.js)
search:function()
{
var that=this;
that.results=[];
axios.post('./hlsearch.do?text='+this.text)
.then(function(resp){
var results=resp.data.data.itemsList;
if(results.length==0)
{
alert("未找到匹配結果");
that.message="<font size='2' color='CCCCCC'>找到約0 條結果!</font>";
}
else
{
that.message="<font size='2' color='CCCCCC'>找到約 "+results.length+" 條結果!</font>";
for(var i=0;i<results.length;i++)
{
that.results.push(resp.data.data.itemsList[i]);
}
}
})
.catch(function(error){alert("ajax錯誤"+error);})
}}
四.效果展示
五、後記:
1.前端不要用ajax請求來獲取資料,非同步獲取資料在併發環境下 會導致前端展示結果 錯亂的情況,所以
建議大家使用返回檢視的方式來展示資料,這樣效果更好
2.生產環境下 client必須關閉,否則會導致資源洩露,記憶體溢位等問題。但是client 獲取連線的時間較慢依然是一個大問題
獲取一個client平均需要2秒左右 ,這樣查詢快的意義就沒有了。所以應該儘量採用es連線池的方式來建立client 客戶端。
連線池程式碼(參考)
package com.rupeng.elasticpools;
import java.net.InetAddress;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import com.rupeng.service.config.ConfigInfo;
/**
* 每一次建立es的連線都太耗費時間 我們應該自己去封裝一個es的連線池
* @author 2016wlw2 徐塬峰 建立時間:2018年7月31日 上午10:00:32
*/
public class ElasticSearchFactory implements PooledObjectFactory<Client>{
private int elasticPool_maxTotal = Integer.parseInt(ConfigInfo.elasticPool_maxTotal.trim());
private int elasticPool_maxIdle = Integer.parseInt(ConfigInfo.elasticPool_maxIdle.trim());
private String elasticSearch_addr ="127.0.0.1";// ConfigInfo.elasticSearch_addr;
private int elasticSearch_port = Integer.parseInt(ConfigInfo.elasticSearch_port.trim());
private int elasticPool_minIdle = Integer.parseInt(ConfigInfo.elasticPool_minIdle.trim());
private int maxWaitMilis = Integer.parseInt(ConfigInfo.elasticPool_maxWaitMilis.trim());
private static GenericObjectPool<Client> pool;// 連線池
static
{
ElasticSearchFactory fa=new ElasticSearchFactory();
fa.createElasticSearchPool();
}
/**
* 建立連線池
*/
public void createElasticSearchPool()
{
ElasticSearchFactory fac = new ElasticSearchFactory();// 建立工廠
GenericObjectPoolConfig conf = new GenericObjectPoolConfig();// 配置檔案
conf.setMaxTotal(elasticPool_maxTotal);// 設定執行緒池中最大的數量
conf.setMaxIdle(elasticPool_maxIdle);// 設定最大的空閒時間
conf.setMinIdle(elasticPool_minIdle);// 設定最小空閒連線
conf.setMaxWaitMillis(maxWaitMilis);// 設定最大等待時間
try {
//先建立三個連線
fac.makeObject();
fac.makeObject();
fac.makeObject();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
pool = new GenericObjectPool<Client>(fac, conf);// 建立連線池
}
// 模擬物件建立的過程
@Override // 通過ElastcSearchConnections 來建立連線
public PooledObject<Client> makeObject() throws Exception {
Settings settings = Settings.builder().put("client.transport.sniff", true).build();// 自動嗅探其他叢集的ip
InetSocketTransportAddress master = null;
master = new InetSocketTransportAddress(InetAddress.getByName(elasticSearch_addr), elasticSearch_port);
TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(master);
System.out.println("太棒了,物件建立成功了!");
return new DefaultPooledObject<Client>(client);
}
public static GenericObjectPool<Client> getPool() {
return pool;
}
//銷燬
@Override
public void destroyObject(PooledObject<Client> p) throws Exception {
p.getObject().close();
}
@Override
public boolean validateObject(PooledObject<Client> p) {
// TODO Auto-generated method stub
return false;
}
@Override
public void activateObject(PooledObject<Client> p) throws Exception {
}
@Override
public void passivateObject(PooledObject<Client> p) throws Exception {
System.out.println("passivate Object"+p.toString());
}
}
3.elasticSearch 已不再維護transport 客戶端 儘量使用rest client客戶端來進行開發。。。。。。。
以上就是我歸納的對elasticsearch5.5.3版本的java實現,如果有哪些地方有問題,歡迎大家留言指正,感激不盡!