1. 程式人生 > >Elasticsearch+hbase 實現hbase中資料的快速查詢(二)

Elasticsearch+hbase 實現hbase中資料的快速查詢(二)

接下來是Elasticsearch (版本5.x)中資料的CRUD 操作,為此,根據ES官網上的資料總結了一個工具類.
具體如下:
(1)maven 新增依賴
(2)工具類程式碼:

public class ESClientUtils {
    protected static Logger logger = Logger.getLogger(ESClientUtils.class);
    protected static TransportClient client;
    private static String es_node1_host = "";    
    private
static String es_node1_port = ""; static { String configName = "/hbase_elasticsearch.properties"; PropertiesLoader propertiesLoader = new PropertiesLoader(configName); try { es_node1_host = propertiesLoader.getProperty("es_node1_host"); es_node1_port = propertiesLoader.getProperty("es_node1_port"
); } catch (Exception e) { e.printStackTrace(); } } /** * @Title: setUp * @Description: TODO(建立client連線) * @param: @throws Exception * @return: void * @throws */ @Before public static void setUp() throws
Exception { Settings esSettings = Settings.builder() .put("cluster.name", "utan-es") //設定ES例項的名稱 .put("client.transport.sniff", true) //自動嗅探整個叢集的狀態,把叢集中其他ES節點的ip新增到本地的客戶端列表中 .build(); /** * 這裡的連線方式指的是沒有安裝x-pack外掛,如果安裝了x-pack則參考{@link ElasticsearchXPackClient} * 1. java客戶端的方式是以tcp協議在9300埠上進行通訊 * 2. http客戶端的方式是以http協議在9200埠上進行通訊 */ /*client = new PreBuiltTransportClient(esSettings.EMPTY) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.0.0.12"), Integer.parseInt("9300")));*/ client = new PreBuiltTransportClient(esSettings.EMPTY) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(es_node1_host), Integer.parseInt(es_node1_port))); System.out.println("ElasticsearchClient 連線成功"); } public ESClientUtils() throws Exception{ setUp(); } @After public static void closeClient() throws Exception { if (client != null) { client.close(); } } /** * @Title: getQueryResponse * @Description: TODO(查詢資料,精確查詢) * @param: @throws Exception * @return: void * @throws */ public static GetResponse getQueryResponse(String indexName, String type, String id) throws Exception{ setUp(); // 搜尋資料 GetResponse response = client.prepareGet(indexName, type, id).execute().actionGet(); // 輸出結果 System.out.println(response.getSourceAsString()); // 關閉client closeClient(); return response; } /** * @Title: getQueryResSource * @Description: TODO(查詢資料,返回Source,與getQueryResponse搭配使用) * @param: @param response * @param: @return * @param: @throws Exception * @return: Map<String,Object> * @throws */ public static Map<String, Object> getQueryResSource(GetResponse response) throws Exception{ setUp(); //查詢新增的索引 // Index name String _index = response.getIndex(); // Type name String _type = response.getType(); // Document ID (generated or not) String _id = response.getId(); // Version (if it's the first time you index this document, you will get: 1) long _version = response.getVersion(); Map<String, GetField> fields = response.getFields(); for (Entry<String, GetField> fieldsEach : fields.entrySet()) { System.out.println("fieldsEach.key:"+fieldsEach.getKey() + " " + "fieldsEach.value:"+fieldsEach.getValue().toString()); } Map<String, Object> source = response.getSource(); for (Entry<String, Object> sourceEach : source.entrySet()) { System.out.println("sourceEach.key:"+sourceEach.getKey() + " " + "sourceEach.value:"+sourceEach.getValue()); } // 關閉client closeClient(); return source; } /** * @Title: getSearchResponse * @Description: TODO(查詢資料,多條件查詢) * @param: @throws Exception * @return: void * @throws */ public static SearchResponse getSearchResponse(String indexName, String type, Map<String, Object> queryCondition, Map<String, Object> filterCondition) throws Exception{ setUp(); //TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("", ""); MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("", ""); for (Entry<String, Object> conditionEach : queryCondition.entrySet()) { matchQueryBuilder = QueryBuilders.matchQuery(conditionEach.getKey(), conditionEach.getValue()); } SearchResponse response = client.prepareSearch(indexName) .setTypes(type) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setQuery(matchQueryBuilder) // Query //.setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // Filter .setFrom(0).setSize(60).setExplain(false) //true,有結果解釋;false,沒有 .get(); closeClient(); // 輸出結果 SearchHit[] hits = response.getHits().hits(); for(int i=0;i<hits.length;i++){ SearchHit hiti = hits[i]; System.out.println(hiti.getSourceAsString()); } System.out.println(response.toString()); return response; } /** * @Title: getMultiSearchResponse * @Description: TODO(多條件搜尋) * @param: @param indexName * @param: @param type * @param: @param queryCondition * @param: @param filterCondition * @param: @return * @param: @throws Exception * @return: MultiSearchResponse * @throws */ public static MultiSearchResponse getMultiSearchResponse(String indexName, String type, Map<String, Object> queryCondition, Map<String, Object> filterCondition) throws Exception{ setUp(); SearchRequestBuilder srb1 = client .prepareSearch().setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1); SearchRequestBuilder srb2 = client .prepareSearch().setQuery(QueryBuilders.matchQuery("name", "kimchy")).setSize(1); MultiSearchResponse sr = client.prepareMultiSearch() .add(srb1) .add(srb2) .get(); // You will get all individual responses from MultiSearchResponse#getResponses() long nbHits = 0; for (MultiSearchResponse.Item item : sr.getResponses()) { SearchResponse response = item.getResponse(); nbHits += response.getHits().getTotalHits(); } closeClient(); return sr; } public static void createIndex(String indexName) throws Exception{ setUp(); client.admin().indices().prepareCreate(indexName).get(); closeClient(); } /** * @Title: insertData * @Description: TODO(插入一條) * @param: @param indexName * @param: @param json * @param: @return * @param: @throws Exception * @return: IndexResponse * @throws */ public static IndexResponse insertData(String indexName, String type, String id, JSONObject json) throws Exception{ setUp(); IndexResponse response = client.prepareIndex(indexName, type, id) .setSource(json) .get(); //查詢新增的索引 // Index name String _index = response.getIndex(); // Type name String _type = response.getType(); // Document ID (generated or not) String _id = response.getId(); // Version (if it's the first time you index this document, you will get: 1) long _version = response.getVersion(); // status has stored current instance statement. RestStatus status = response.status(); closeClient(); return response; } /** * @Title: batchInsertData * @Description: TODO(批量插入) * @param: @param indexName * @param: @param type * @param: @param jsonArr * @param: @return * @param: @throws Exception * @return: BulkResponse * @throws */ public static BulkResponse batchInsertData(String indexName, String type, JSONArray jsonArr) throws Exception{ setUp(); BulkRequestBuilder bulkRequest = client.prepareBulk(); for(int i=0;i<jsonArr.size();i++){ JSONObject jsonObj = jsonArr.getJSONObject(i); //插入單條 bulkRequest.add(client.prepareIndex(indexName, type) .setSource(jsonObj) ); } BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // //處理失敗 return null; } closeClient(); return bulkResponse; } /** * @Title: updateData * @Description: TODO(更新單條資料) * @param: @param indexName * @param: @param type * @param: @param id * @param: @param json * @param: @return * @param: @throws Exception * @return: UpdateResponse * @throws */ public static UpdateResponse updateData(String indexName, String type, String id, JSONObject json) throws Exception{ setUp(); UpdateResponse updateResponse = client.prepareUpdate(indexName, type, id) .setDoc(json.toString(),XContentType.JSON).get(); closeClient(); return updateResponse; } /** * @Title: deleteIndex * @Description: TODO(刪除索引) * @param: @param indexName * @param: @return * @param: @throws Exception * @return: boolean * @throws */ public static boolean deleteIndex(String indexName) throws Exception{ setUp(); IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName); IndicesExistsResponse inExistsResponse = client.admin().indices() .exists(inExistsRequest).actionGet(); if(inExistsResponse.isExists()){ DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(indexName) .execute().actionGet(); if(dResponse.isAcknowledged()){ closeClient(); return true; } closeClient(); return false; } closeClient(); return false; } /** * @Title: deleteData * @Description: TODO(這裡用一句話描述這個方法的作用) * @param: @return * @param: @throws Exception * @return: DeleteResponse * @throws */ public static DeleteResponse deleteData(String indexName, String type, String id) throws Exception{ setUp(); DeleteResponse response = client.prepareDelete(indexName, type, id).get(); closeClient(); return response; } /** * @throws Exception * @Title: DeleteByQueryAction * @Description: TODO(通過查詢條件刪除資料) * @param: @param indexName * @param: @param type * @return: void * @throws */ public static void DeleteByQueryAction(String indexName, String type) throws Exception{ } public static void main(String[] args) { String indexName = "testindex"; String type = "person"; String id = "1"; Map<String, Object> queryCondition = new HashMap<String, Object>(); queryCondition.put("user", "李"); Map<String, Object> filterCondition = new HashMap<String, Object>(); try { /*******************************精確查詢單條資料********************************/ //getQueryResponse(indexName, type, id); /*******************************精確查詢單條資料********************************/ /*******************************匹配查詢資料********************************/ getSearchResponse(indexName, type, queryCondition, filterCondition); /*******************************匹配查詢資料********************************/ } catch (Exception e) { e.printStackTrace(); } /*******************************插入單條資料********************************/ /*JSONObject dataJson = new JSONObject(); dataJson.put("user", "張三"); dataJson.put("sex", "男"); dataJson.put("desc", "工程師"); try { insertData(indexName, type, id, dataJson); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }*/ /*******************************插入單條資料********************************/ /*******************************批量插入資料********************************/ /*JSONObject jsonObj1 = new JSONObject(); JSONObject jsonObj2 = new JSONObject(); jsonObj1.put("user", "張三"); jsonObj1.put("sex", "男"); jsonObj1.put("desc", "工程師"); jsonObj2.put("user", "李四"); jsonObj2.put("sex", "女"); jsonObj2.put("desc", "醫生"); JSONArray jsonArray = new JSONArray(); jsonArray.add(jsonObj1); jsonArray.add(jsonObj2); //批量插入 try { batchInsertData(indexName, type, jsonArray); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); }*/ /*******************************批量插入資料********************************/ /*******************************更新單條資料********************************/ /*JSONObject updatedataJson = new JSONObject(); updatedataJson.put("user", "張三"); updatedataJson.put("sex", "女"); updatedataJson.put("desc", "工程師"); try { updateData(indexName, type, id, updatedataJson); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } */ /*******************************更新單條資料********************************/ /*******************************刪除索引********************************/ /*try { deleteIndex(indexName); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); }*/ /*******************************刪除索引********************************/ /*******************************刪除單條資料********************************/ /*try { deleteData(indexName, type, id); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } */ /*******************************刪除單條資料********************************/ } }

查詢hbase 資料

只要把ES中索引查出來,然後使用索引資料,查詢hbase資料就可以了.

相關推薦

ElasticsearchHbase資料建索引實現海量資料快速查詢

一、將專案匯入myeclipse中方法1:將下載好的檔案(是解壓es_hbase6資料夾而不是Test-master)解壓到你myeclipse的Workspaces目錄中,然後在myeclipse中右鍵點選Import匯入專案方法2:將下載好的檔案解壓到你的Windows桌

Elasticsearch+hbase 實現hbase資料快速查詢()

接下來是Elasticsearch (版本5.x)中資料的CRUD 操作,為此,根據ES官網上的資料總結了一個工具類. 具體如下: (1)maven 新增依賴 (2)工具類程式碼: public class ESClientUtils { pro

Elasticsearch+hbase 實現hbase資料快速查詢(三)

前2篇介紹了Elasticsearch的安裝和工具類,雖然這樣能用,但是還留有幾個問題,對此有些困擾. 多條件查詢 工具類裡面有個get精確查詢和search搜尋,但是那個只用來查詢單一條件,如果查詢介面上需要查詢多個條件,那這個顯然不夠用.在網路上搜索了

Elasticsearch+hbase 實現hbase資料快速查詢(一)

之前雖做了solr-hbase構建二級索引以及快速查詢,但是考慮到以後生成的資料可能會很多,一旦到了億級以上,solr查詢效率會漸漸慢下來.老闆不滿意,又聽了幾位專家的建議,採用Elasticsearch+hbase 來實現hbase中資料的快速查詢. 首先,

JAVA對資料庫進行操作,實現資料庫資料的插入,查詢,更改,刪除操作

轉載自:http://www.cnblogs.com/sodawoods-blogs/p/4415858.html (—)通過mysql workbench 建立一個數據庫,在這裡命名為company,然後建一個tb_employee表 (二)以下是java程式碼對錶

HBase刪除表資料

1、使用hbase shell中delete命令刪除表中特定的單元格資料,命令格式如下: delete 'tablename','row','column name','time stramp' 刪除emp表中第二行personal data:name列

惠州學院-資料庫實驗2-資料庫資料查詢

    計算機科學系實驗報告(首頁) 課程名稱 資料庫系統概論 班級 14計科2班 實驗名稱 資料庫中資料的查詢

POI實現Excel資料的讀取

所需依賴包:poi-3.17.jar、poi-ooxml-3.17.jar、poi-ooxml-schemas-3.17.jar、xmlbeans-2.6.0.jar、commons-collections4-4.1.jar。 依賴包下載地址:http://mvnrepository.com/a

springboot實現資料庫資料匯出Excel功能

功能介紹        網上查找了一堆的資料匯出程式碼,可能是自己基礎比較薄弱的原因還是別的什麼原因,導致一直沒有執行成功,就算是執行成功的,結果也是差強人意。在此總結一下自己借鑑別人已經經過自己整合

Android實現列表仿聯絡人快速查詢和關鍵字搜尋

 列表按字母順序排序,右側實現根據首字母快速查詢,輸入框輸入首字母或元素第一個字也可以實現快速查詢。 頁面佈局如下: <LinearLayoutxmlns:andro

在ArcGIS如何快速查詢所要檢視的要素?

(1)開啟mxd地圖文件。 (2)在【工具】工具條中單擊【查詢】按鈕,開啟【查詢】對話方塊,單擊【要素】標籤,切換到【要素】選項卡。 (3)在【查詢】下拉框中輸入要查詢的要素名稱“**”;單擊【範圍】

使用hibernate實現mysqllimit的查詢

 給大家分享個知識點,hibernate 的hql不支援limit的使用。以後大家如果有限制查詢從第幾條至第幾條時。就用          List<SiteInvestment> investments = new ArrayList<SiteIn

Vue.js框架--Vuex實現元件裡資料持久化(十八)

主要操作技能:      Vuex 是一個專為 Vue.js 應用程式開發的狀態管理模式      新聞頁面每次切換路由時,再次訪問就會請求資料;那麼如何直接從vuex中持久化資料呢?      

Keras之predict分類:DL之binary_crossentropy&Relu——DIY分類資料實現輸入新資料點預測分類

Keras之predict二分類:DL之binary_crossentropy&Relu——DIY二分類資料集實現輸入新資料點預測二分類 輸出結果     實現程式碼 #Keras之predict二分類:DL之binary_cro

資料結構-查詢叉樹

查詢二叉樹:始終滿足任一節點的左兒子小於該節點,右兒子大於該節點, 由於其特性,查詢二叉樹的中序遍歷始終為從小到大; 需要注意的是二叉查詢樹的刪除,有2種策略,在刪除節點不多時可以採用懶惰刪除,即標記該節點刪除而非真正的delete,第二種是當刪除節點只有一個兒子或則為葉子

Elasticsearch+Hbase實現海量資料秒回查詢

---------------------------------------------------------------------------------------------[版權申明:本文系作者原創,轉載請註明出處] 文章出處:http://blog.csdn.

Elasticsearch+Hbase實現海量數據秒回查詢

ont 都是 lease tor lines app req 空格 java開發 ---------------------------------------------------------------------------------------------[版權

hbase表1讀取資料,最終結果寫入到hbase表2 ,如何通過MapReduce實現

需要一: 將hbase中‘student’表中的info:name和info:age兩列資料取出並寫入到hbase中‘user’表中的basic:XM和basic:NL class ReadStudentMapper extends Table

HBase 寫優化之 BulkLoad 實現資料快速入庫

1、為何要 BulkLoad 匯入?傳統的 HTableOutputFormat 寫 HBase 有什麼問題? 我們先看下 HBase 的寫流程: 通常 MapReduce 在寫HBase時使用的是 TableOutputFormat 方式,在reduce中直接生成put

如何用Elasticsearch實現類似SQL的IN查詢實例

red ast last .cn lte style sea ges logs 我想實現類似如下sql語句的效果: select * from table1 where rw_id in (‘7a482589-e52e-0887-4dd5-5821aab77eea‘,‘c