1. 程式人生 > >第一節 ElasticSearch瞭解及初步使用

第一節 ElasticSearch瞭解及初步使用

一、ElasticSearch簡介 ElasticSearch是一個基於Lucene的搜尋伺服器。它提供了一個分散式多使用者能力的全文搜尋引擎,基於RESTful web介面並且提供java api。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放原始碼釋出,是當前流行的企業級搜尋引擎。 1.1瞭解下Lucene apache Lucene是一套用於全文搜尋的開源程式庫,具有成熟、高效能、可擴充套件、輕量級及強大的功能。區別於搜尋引擎,Lucene是一整套工具,如果直接使用則需要開發的東西比較多。 1.1.1文件(document):索引與搜尋的主要資料載體,包含一個或多個欄位。類比於資料庫的一行資料
1.1.2欄位(field):文件的一個片段,包括欄位名稱和內容(類比於表字段) 1.1.3詞項(term):搜尋時的一個單位,代表文字中的某個詞。 1.1.4詞條(token):詞項在欄位中的一次出現,包括詞項的文字、開始和結束的位移以及型別。 1.1.5倒排索引:倒排索引源於實際應用中需要根據屬性的值來查詢記錄。這種索引表中的每一項都包括一個屬性值和具有該屬性值的各記錄的地址。由於不是由記錄來確定屬性值,而是由屬性值來確定記錄的位置,因而稱為倒排索引(inverted index)。帶有倒排索引的檔案我們稱為倒排 索引檔案。
1.1.6 fieldData/docValue:建立索引檔案的所有文件的某個欄位會被單獨儲存起來。 對於這塊,Lucene 經歷了兩階段的發展。第一階段是fieldData ,查詢時從倒排索引反向構成doc-term。 另外,還有段檔案、分析器、過濾器、分詞器、迭代器等概念就不在這裡一一描述了。
1.2ElasticSearch的基本概念 1.2.1索引:儲存資料的地方,類比於一個數據庫例項。可以向索引寫入文件或讀取文件。ElasticSearch中的索引可能由一個或多個lucene索引構成,具體細節由ElasticSearch的索引分片(shard)、複製(replica)機制及其配置決定。
1.2.2型別(type):類比於資料庫中的一張表(表名)。每一個文件都有與之對應的型別定義,所以在一個索引中可以儲存多種文件型別,併為不同的文件型別提供不同的對映。 1.2.3文件(document):類比於表中的一行資料。elasticsearch世界中的主要實體,文件由一個或多個欄位構成,從客戶端的角度看,文件是一個json串。 1.2.4對映(mapping):所有文件在寫入索引前都需要先進行分析。使用者可以設定一些引數,來決定如何將輸入文字分割為詞條,哪些詞條應該被過濾掉,哪些附加處理是必要的(比如移除HTML標籤等).此外,elasticsearch也提供了各種特性,如排序時所需的欄位內容資訊。這就是對映扮演的角色,儲存所有這種元資訊。 1.2.5節點(node):單個的ElasticSearch服務例項稱為節點,一般部署一個ElasticSearch節點就足以應付大多數簡單應用,但考慮到容錯性或在資料膨脹到單機無法應付這些狀況的時候,更傾向於使用多節點的叢集。 1.2.6叢集(cluster):當資料量或查詢壓力超過單機負載時,需要多個節點來協同處理,所有這些節點組成的系統稱為叢集。叢集同時也是無間斷提供服務的一種解決方案。 1.2.7分片(shard):叢集允許系統儲存的資料總量超過單機容量。為了滿足這個需求,ElasticSearch將資料散佈到多個物理lucene索引上。這些lucene索引稱為分片(shard),而散佈這些分片的過程稱為分片處理(sharding)。ElasticSearch會自動完成分片處理,並且讓這些分片呈現一個大索引的樣子。除了ElasticSearch本身自動進行分片處理外,使用者為具體的應用進行引數調優也至關重要,因為分片的數量在索引建立時就已經配置好,而且之後無法改變。 1.2.8副本(replica):副本則解決了訪問壓力過大時單機無法處理所有請求的問題。即為每個分片建立冗餘的副本,處理查詢時可以把這些副本用作最初的主分片(primary shard).即使某個分片所在的節點空壓機,ElasticSearch也可以使用鞭副本,從而不會造成資料丟失,而且支援在任意時間點新增或移除副本。 1.2.9閘道器(gateway):在ElasticSearch的工作過程中,關於叢集狀態,索引設定的各種資訊都會被收集起來,並在閘道器中被持久化。
1.3ElasticSearch的架構 1.3.1合理的預設配置:使用者在簡單安裝以後能直接使用。 1.3.2預設分散式工作模式:每個節點總是假定自己是某個叢集的一部分或將是某個叢集的一部分,一旦工作啟動節點便會加入某個叢集。 1.3.3對等架:可以避免單點故障。節點會自動連線到叢集中的其他節點,進行相互的資料交換和監控操作。包括自動複製。 1.3.4易於擴充新節點 1.3.5沒有對索引中的資料結構強加任何限制,從而可以隨時調整資料模型。 1.3.6準實時搜尋和版本同步。考慮到ElasticSearch的分散式特性,查詢延遲和節點之間臨時的資料不同步是難避免的。 1.3.7當ElasticSearch節點啟動時,它使用廣播技術(也可配置為單播)來發現同一個叢集中的其他節點並與它們連線。叢集中會有一個節點被選為管理節點。該節點負責叢集的狀態管理以及在叢集拓撲變化時做出反應,分發索引分片至叢集的相應節點上。 1.3.8故障檢測:管理節點會監控所有的可用節點,檢查他們是否正在工作(傳送ping請求,某未響應則節點 從叢集中移除) 1.3.9與ElasticSearch通訊:REST API或java api 1.3.10索引資料:ElasticSearch允許使用REST API向伺服器傳送文件(HTTP協議)。或使用bulk API或UDP bulk API來一次性發送多個文件(UDP協議)。再者是使用外掛傳送資料稱為河流(river),河流執行在ElasticSearch節點上,能夠從外部系統獲取資料。建索引只會發生在主分片上。 1.3.11查詢資料:可以使用查詢語言DSL(基於JSON的可用於構建複雜查詢的語言),查詢型別包括:詞項查詢、短語查詢、範圍查詢、模糊查詢、萬用字元查詢等。通過組合簡單查詢構建複雜查詢。查詢分為兩個階段:分散階段和合並階段。分散階段將查詢分發到包含相關文件的多個分片中去執行查詢,合併階段則從眾多分片中收集返回結果,然後進行合併,排序及後續處理等,最後返回給客戶端。 1.3.12索引配置:ElasticSearch提供了一些功能使得使用者能手動配置,例如使用者可以通過對映來配置自定義的文件結構,設定索引的分片和副本數,定製文字分析過程等等。 1.3.13系統管理和監控:ElasticSearch提供了系統管理與監控相關的API。

二、部署測試 2.1下載 可以通過: https://www.elastic.co/cn/downloads/elasticsearch 地址下載各版本。 JAVA API: https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html 2.2安裝 1.首先要保障具有java執行環境,最好是java 1.8及以上版本 2.解壓ElasticSearch到指定目錄即可:比如(E:\software\elasticsearch-5.3.0) 3.直接進入bin目錄,window下執行elasticsearch.bat即可啟動 4.啟動完成後,在瀏覽器中輸入: http://localhost:9200/ 有一段json返回值,說明啟動成功 5.接下來就可以使用http request工具(比如谷歌的postman或火狐的poaster)向伺服器新增或查詢文件。 新增: http://localhost:9200/es/t_user/2 -d {JSON資料} 查詢:http://localhost:9200/es/_search?pretty
三、JAVA API的使用 3.1新建maven工程並引入 elasticsearch的jar包 測試示例程式碼: public class ElsUtil {   private static Logger logger = LoggerFactory.getLogger(ElsUtil.class);   private static TransportClient client;   public static TransportClient getClient(){   Settings settings = Settings.builder()   .put("cluster.name", "myApplication").build();   try { TransportClient client = new PreBuiltTransportClient(settings) .addTransportAddress(new InetSocketTransportAddress( InetAddress.getByName("localhost"), 9300)); ElsUtil.client = client; return client; } catch (UnknownHostException e) { e.printStackTrace(); } return null; } public static void closeClient(){ if(client != null) { client.close(); } } public static void index(){ Map<String,Object> map = Maps.newHashMap(); map.put("id","1"); map.put("name","java customer"); map.put("desc","這個是使用java api新增的7人"); IndexResponse response = client.prepareIndex("es", "t_user") .setVersion(7l).setVersionType(VersionType.EXTERNAL)//可以指定外部版本號,作為此資料的版本號 .setSource(map).setId(String.valueOf(map.get("id"))) .execute().actionGet(); if(response != null) { System.out.println(response.getVersion()); } } public static void updateRequest(){ List<Map<String,Object>> list = getDataList(); for(Map<String,Object> map:list) { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("es"); updateRequest.type("t_user"); updateRequest.id("5"); Map<String,Object> updMap = new HashMap<>(); updMap.put("name","只更新name"); updateRequest.doc(updMap); client.update(updateRequest).actionGet(); } } public static void updatePrepare(){ List<Map<String,Object>> list = getDataList(); Map<String,Object> updMap = new HashMap<>(); updMap.put("name","只更新11nam測試lll9"); updMap.put("desc","更新11name7788"); for(Map<String,Object> map:list) { //client.prepareUpdate("es", "t_user", "5") // .setDoc(updMap).get();//可以同時更新多個欄位 //.setScript(new Script("ctx._source.name = \"prepare更新\""))//只能更新一個欄位 //.setScript(new Script("ctx._source.desc = \"這個是特殊更新的\"")) //.get();//加上才會真正的更新 client.prepareUpdate("es", "t_user", "7") .setVersion(5l).setVersionType(VersionType.INTERNAL) .setScript(new Script("ctx._source.name = \"只更新11nam測試l7667377ll9\"")) .get(); } } public static void updatePrepareAdd(){ List<Map<String,Object>> list = getDataList(); Map<String,Object> updMap = new HashMap<>(); updMap.put("name","只更新11nam測試更新不了就新加"); updMap.put("desc","更新11name7788新加"); updMap.put("version",6); for(Map<String,Object> map:list) { IndexRequest indexRequest = new IndexRequest("es", "t_user", "6") .source(updMap); UpdateRequest updateRequest = new UpdateRequest("es", "t_user", "6") .script(new Script("ctx._source.id = \"6\""))//更新原不包含的欄位時 會新新增 .upsert(indexRequest); UpdateResponse response = client.update(updateRequest).actionGet(); System.out.println(response.getVersion()); } } public static void delete(){ //版本號(插入,刪除): //VersionType.INTERNAL 內部版本號,只有等於當前版本號才可以進行操作 //VersionType.EXTERNAL 外部版本號,只有大於當前版本號才可以進行操作,且update不支援此型別 DeleteResponse response = client.prepareDelete("es", "t_user", "7") .setVersion(7l) .setVersionType(VersionType.EXTERNAL)//外部版本號,或內部版本號 .get(); } public static void deleteByQuery(){ DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = DeleteByQueryAction.INSTANCE.newRequestBuilder(client); deleteByQueryRequestBuilder.source().setIndices("es").setTypes("t_user"); //為什麼提供了.source(“es”)設定setIndices,而沒有提供setTypes?? //同步刪除 //BulkByScrollResponse response = // deleteByQueryRequestBuilder.filter(QueryBuilders.matchQuery("name", "java")) // .get(); //long deleted = response.getDeleted(); //logger.info("delete:{}",deleted); //非同步刪除 deleteByQueryRequestBuilder.filter(QueryBuilders.matchQuery("name", "java")) .execute(new ActionListener<BulkByScrollResponse>() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { long deleted = bulkByScrollResponse.getDeleted(); logger.info("非同步delete:{}",deleted); } @Override public void onFailure(Exception e) { logger.info("非同步delete"); logger.error("非同步刪除錯誤:{}",e); } }); } public static void getAsQuery(){ GetResponse response = client.prepareGet("es","t_user","2").get(); if(response.isExists()) { logger.info("getAsQueryResponseMap:{}",response.getSourceAsMap()); logger.info("getAsQueryResponse:{}",response.getSourceAsString()); } //多個ID查詢 MultiGetResponse multiGetResponse = client.prepareMultiGet() .add("es","t_user","2") .add(new MultiGetRequest.Item("es","t_user","5")).get(); for (MultiGetItemResponse itemResponse : multiGetResponse) { GetResponse getResponse = itemResponse.getResponse(); if (getResponse.isExists()) { logger.info("MultiGetResponse:{}",getResponse.getSourceAsString()); } } } public static void query(){ SearchResponse searchResponse = client.prepareSearch().setIndices("es").setTypes("t_user") .setQuery(QueryBuilders.disMaxQuery() .add(QueryBuilders.termQuery("name","更新")) .add(QueryBuilders.matchQuery("name","更新")) .add(QueryBuilders.prefixQuery("name","java"))) .setFrom(0).setSize(2)//分頁 .execute().actionGet(); logger.info("查詢的結果:{}",searchResponse); } /** * 批處理查詢 */ public static void multiQuery(){ MultiSearchResponse multiSearchResponse = client.prepareMultiSearch() .add(client.prepareSearch("es").setQuery(QueryBuilders.disMaxQuery() .add(QueryBuilders.termQuery("name","更新")) .add(QueryBuilders.matchQuery("name","更新")) .add(QueryBuilders.prefixQuery("name","李"))).request()) .add(client.prepareSearch("es") .setQuery(QueryBuilders.queryStringQuery("四")).request()) .execute().actionGet(); logger.info("查詢的結果:{}",multiSearchResponse); } /** * 批量處理 */ public static void bulkHandler(){ BulkResponse bulkResponse = client.prepareBulk() .add(client.prepareIndex("es","t_user","9").setSource(getDataList().get(0)).request()) .add(client.prepareDelete("es","t_customer","9").request()) .execute().actionGet(); if(bulkResponse.hasFailures()){ logger.info("批處理的錯誤結果:{}",bulkResponse); } } public static void highlightQuery(){ HighlightBuilder highlightBuilder = new HighlightBuilder() .field("name").requireFieldMatch(false) .highlightQuery(QueryBuilders.matchQuery("name","1add")) .preTags(Const.HIGHLIGHT_PRE_TAGS) .postTags(Const.HIGHLIGHT_POST_TAGS); SearchResponse searchResponse = client.prepareSearch().setIndices("es").setTypes("t_user") .setQuery(QueryBuilders.disMaxQuery() .add(QueryBuilders.termQuery("name","更新")) .add(QueryBuilders.matchQuery("name","更新")) .add(QueryBuilders.prefixQuery("name","java"))) .highlighter(highlightBuilder) .setFrom(0).setSize(5)//分頁 .execute().actionGet(); logger.info("查詢的結果:{}",searchResponse); } public static void countQuery(){ SearchResponse searchResponse = client.prepareSearch().setIndices("es").setTypes("t_user") .setQuery(QueryBuilders.disMaxQuery() .add(QueryBuilders.termQuery("name","更新")) .add(QueryBuilders.matchQuery("name","更新")) .add(QueryBuilders.prefixQuery("name","java"))) .setSize(0)//不要資料 .execute().actionGet(); logger.info("查詢的結果:{}",searchResponse.getHits().getTotalHits()); logger.info("查詢的結果:{}",searchResponse); } public static List<Map<String,Object>> getDataList(){ List<Map<String,Object>> list = new ArrayList<>(); Map<String,Object> map = Maps.newHashMap(); map.put("id","1"); map.put("name","java 1add是的"); map.put("desc","1這個是使用java api新增的人修改了一次"); list.add(map); return list; }}