Elasticsearch+hbase 實現hbase中資料的快速查詢(二)
接下來是Elasticsearch (版本5.x)中資料的CRUD 操作,為此,根據ES官網上的資料總結了一個工具類.
具體如下:
(1)maven 新增依賴
(2)工具類程式碼:
public class ESClientUtils {
protected static Logger logger = Logger.getLogger(ESClientUtils.class);
protected static TransportClient client;
private static String es_node1_host = "";
private static String es_node1_port = "";
static {
String configName = "/hbase_elasticsearch.properties";
PropertiesLoader propertiesLoader = new PropertiesLoader(configName);
try {
es_node1_host = propertiesLoader.getProperty("es_node1_host");
es_node1_port = propertiesLoader.getProperty("es_node1_port" );
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* @Title: setUp
* @Description: TODO(建立client連線)
* @param: @throws Exception
* @return: void
* @throws
*/
@Before
public static void setUp() throws Exception {
Settings esSettings = Settings.builder()
.put("cluster.name", "utan-es") //設定ES例項的名稱
.put("client.transport.sniff", true) //自動嗅探整個叢集的狀態,把叢集中其他ES節點的ip新增到本地的客戶端列表中
.build();
/**
* 這裡的連線方式指的是沒有安裝x-pack外掛,如果安裝了x-pack則參考{@link ElasticsearchXPackClient}
* 1. java客戶端的方式是以tcp協議在9300埠上進行通訊
* 2. http客戶端的方式是以http協議在9200埠上進行通訊
*/
/*client = new PreBuiltTransportClient(esSettings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.0.0.12"), Integer.parseInt("9300")));*/
client = new PreBuiltTransportClient(esSettings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(es_node1_host), Integer.parseInt(es_node1_port)));
System.out.println("ElasticsearchClient 連線成功");
}
public ESClientUtils() throws Exception{
setUp();
}
@After
public static void closeClient() throws Exception {
if (client != null) {
client.close();
}
}
/**
* @Title: getQueryResponse
* @Description: TODO(查詢資料,精確查詢)
* @param: @throws Exception
* @return: void
* @throws
*/
public static GetResponse getQueryResponse(String indexName, String type, String id) throws Exception{
setUp();
// 搜尋資料
GetResponse response = client.prepareGet(indexName, type, id).execute().actionGet();
// 輸出結果
System.out.println(response.getSourceAsString());
// 關閉client
closeClient();
return response;
}
/**
* @Title: getQueryResSource
* @Description: TODO(查詢資料,返回Source,與getQueryResponse搭配使用)
* @param: @param response
* @param: @return
* @param: @throws Exception
* @return: Map<String,Object>
* @throws
*/
public static Map<String, Object> getQueryResSource(GetResponse response) throws Exception{
setUp();
//查詢新增的索引
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
Map<String, GetField> fields = response.getFields();
for (Entry<String, GetField> fieldsEach : fields.entrySet()) {
System.out.println("fieldsEach.key:"+fieldsEach.getKey() + " " + "fieldsEach.value:"+fieldsEach.getValue().toString());
}
Map<String, Object> source = response.getSource();
for (Entry<String, Object> sourceEach : source.entrySet()) {
System.out.println("sourceEach.key:"+sourceEach.getKey() + " " + "sourceEach.value:"+sourceEach.getValue());
}
// 關閉client
closeClient();
return source;
}
/**
* @Title: getSearchResponse
* @Description: TODO(查詢資料,多條件查詢)
* @param: @throws Exception
* @return: void
* @throws
*/
public static SearchResponse getSearchResponse(String indexName, String type, Map<String, Object> queryCondition, Map<String, Object> filterCondition) throws Exception{
setUp();
//TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("", "");
MatchQueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("", "");
for (Entry<String, Object> conditionEach : queryCondition.entrySet()) {
matchQueryBuilder = QueryBuilders.matchQuery(conditionEach.getKey(), conditionEach.getValue());
}
SearchResponse response = client.prepareSearch(indexName)
.setTypes(type)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(matchQueryBuilder) // Query
//.setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // Filter
.setFrom(0).setSize(60).setExplain(false) //true,有結果解釋;false,沒有
.get();
closeClient();
// 輸出結果
SearchHit[] hits = response.getHits().hits();
for(int i=0;i<hits.length;i++){
SearchHit hiti = hits[i];
System.out.println(hiti.getSourceAsString());
}
System.out.println(response.toString());
return response;
}
/**
* @Title: getMultiSearchResponse
* @Description: TODO(多條件搜尋)
* @param: @param indexName
* @param: @param type
* @param: @param queryCondition
* @param: @param filterCondition
* @param: @return
* @param: @throws Exception
* @return: MultiSearchResponse
* @throws
*/
public static MultiSearchResponse getMultiSearchResponse(String indexName, String type, Map<String, Object> queryCondition, Map<String, Object> filterCondition) throws Exception{
setUp();
SearchRequestBuilder srb1 = client
.prepareSearch().setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1);
SearchRequestBuilder srb2 = client
.prepareSearch().setQuery(QueryBuilders.matchQuery("name", "kimchy")).setSize(1);
MultiSearchResponse sr = client.prepareMultiSearch()
.add(srb1)
.add(srb2)
.get();
// You will get all individual responses from MultiSearchResponse#getResponses()
long nbHits = 0;
for (MultiSearchResponse.Item item : sr.getResponses()) {
SearchResponse response = item.getResponse();
nbHits += response.getHits().getTotalHits();
}
closeClient();
return sr;
}
public static void createIndex(String indexName) throws Exception{
setUp();
client.admin().indices().prepareCreate(indexName).get();
closeClient();
}
/**
* @Title: insertData
* @Description: TODO(插入一條)
* @param: @param indexName
* @param: @param json
* @param: @return
* @param: @throws Exception
* @return: IndexResponse
* @throws
*/
public static IndexResponse insertData(String indexName, String type, String id, JSONObject json) throws Exception{
setUp();
IndexResponse response = client.prepareIndex(indexName, type, id)
.setSource(json)
.get();
//查詢新增的索引
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
closeClient();
return response;
}
/**
* @Title: batchInsertData
* @Description: TODO(批量插入)
* @param: @param indexName
* @param: @param type
* @param: @param jsonArr
* @param: @return
* @param: @throws Exception
* @return: BulkResponse
* @throws
*/
public static BulkResponse batchInsertData(String indexName, String type, JSONArray jsonArr) throws Exception{
setUp();
BulkRequestBuilder bulkRequest = client.prepareBulk();
for(int i=0;i<jsonArr.size();i++){
JSONObject jsonObj = jsonArr.getJSONObject(i);
//插入單條
bulkRequest.add(client.prepareIndex(indexName, type)
.setSource(jsonObj)
);
}
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
//
//處理失敗
return null;
}
closeClient();
return bulkResponse;
}
/**
* @Title: updateData
* @Description: TODO(更新單條資料)
* @param: @param indexName
* @param: @param type
* @param: @param id
* @param: @param json
* @param: @return
* @param: @throws Exception
* @return: UpdateResponse
* @throws
*/
public static UpdateResponse updateData(String indexName, String type, String id, JSONObject json) throws Exception{
setUp();
UpdateResponse updateResponse = client.prepareUpdate(indexName, type, id)
.setDoc(json.toString(),XContentType.JSON).get();
closeClient();
return updateResponse;
}
/**
* @Title: deleteIndex
* @Description: TODO(刪除索引)
* @param: @param indexName
* @param: @return
* @param: @throws Exception
* @return: boolean
* @throws
*/
public static boolean deleteIndex(String indexName) throws Exception{
setUp();
IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName);
IndicesExistsResponse inExistsResponse = client.admin().indices()
.exists(inExistsRequest).actionGet();
if(inExistsResponse.isExists()){
DeleteIndexResponse dResponse = client.admin().indices().prepareDelete(indexName)
.execute().actionGet();
if(dResponse.isAcknowledged()){
closeClient();
return true;
}
closeClient();
return false;
}
closeClient();
return false;
}
/**
* @Title: deleteData
* @Description: TODO(這裡用一句話描述這個方法的作用)
* @param: @return
* @param: @throws Exception
* @return: DeleteResponse
* @throws
*/
public static DeleteResponse deleteData(String indexName, String type, String id) throws Exception{
setUp();
DeleteResponse response = client.prepareDelete(indexName, type, id).get();
closeClient();
return response;
}
/**
* @throws Exception
* @Title: DeleteByQueryAction
* @Description: TODO(通過查詢條件刪除資料)
* @param: @param indexName
* @param: @param type
* @return: void
* @throws
*/
public static void DeleteByQueryAction(String indexName, String type) throws Exception{
}
public static void main(String[] args) {
String indexName = "testindex";
String type = "person";
String id = "1";
Map<String, Object> queryCondition = new HashMap<String, Object>();
queryCondition.put("user", "李");
Map<String, Object> filterCondition = new HashMap<String, Object>();
try {
/*******************************精確查詢單條資料********************************/
//getQueryResponse(indexName, type, id);
/*******************************精確查詢單條資料********************************/
/*******************************匹配查詢資料********************************/
getSearchResponse(indexName, type, queryCondition, filterCondition);
/*******************************匹配查詢資料********************************/
} catch (Exception e) {
e.printStackTrace();
}
/*******************************插入單條資料********************************/
/*JSONObject dataJson = new JSONObject();
dataJson.put("user", "張三");
dataJson.put("sex", "男");
dataJson.put("desc", "工程師");
try {
insertData(indexName, type, id, dataJson);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
/*******************************插入單條資料********************************/
/*******************************批量插入資料********************************/
/*JSONObject jsonObj1 = new JSONObject();
JSONObject jsonObj2 = new JSONObject();
jsonObj1.put("user", "張三");
jsonObj1.put("sex", "男");
jsonObj1.put("desc", "工程師");
jsonObj2.put("user", "李四");
jsonObj2.put("sex", "女");
jsonObj2.put("desc", "醫生");
JSONArray jsonArray = new JSONArray();
jsonArray.add(jsonObj1);
jsonArray.add(jsonObj2);
//批量插入
try {
batchInsertData(indexName, type, jsonArray);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
/*******************************批量插入資料********************************/
/*******************************更新單條資料********************************/
/*JSONObject updatedataJson = new JSONObject();
updatedataJson.put("user", "張三");
updatedataJson.put("sex", "女");
updatedataJson.put("desc", "工程師");
try {
updateData(indexName, type, id, updatedataJson);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} */
/*******************************更新單條資料********************************/
/*******************************刪除索引********************************/
/*try {
deleteIndex(indexName);
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}*/
/*******************************刪除索引********************************/
/*******************************刪除單條資料********************************/
/*try {
deleteData(indexName, type, id);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} */
/*******************************刪除單條資料********************************/
}
}
查詢hbase 資料
只要把ES中索引查出來,然後使用索引資料,查詢hbase資料就可以了.
相關推薦
Elasticsearch對Hbase中的資料建索引實現海量資料快速查詢
一、將專案匯入myeclipse中方法1:將下載好的檔案(是解壓es_hbase6資料夾而不是Test-master)解壓到你myeclipse的Workspaces目錄中,然後在myeclipse中右鍵點選Import匯入專案方法2:將下載好的檔案解壓到你的Windows桌
Elasticsearch+hbase 實現hbase中資料的快速查詢(二)
接下來是Elasticsearch (版本5.x)中資料的CRUD 操作,為此,根據ES官網上的資料總結了一個工具類. 具體如下: (1)maven 新增依賴 (2)工具類程式碼: public class ESClientUtils { pro
Elasticsearch+hbase 實現hbase中資料的快速查詢(三)
前2篇介紹了Elasticsearch的安裝和工具類,雖然這樣能用,但是還留有幾個問題,對此有些困擾. 多條件查詢 工具類裡面有個get精確查詢和search搜尋,但是那個只用來查詢單一條件,如果查詢介面上需要查詢多個條件,那這個顯然不夠用.在網路上搜索了
Elasticsearch+hbase 實現hbase中資料的快速查詢(一)
之前雖做了solr-hbase構建二級索引以及快速查詢,但是考慮到以後生成的資料可能會很多,一旦到了億級以上,solr查詢效率會漸漸慢下來.老闆不滿意,又聽了幾位專家的建議,採用Elasticsearch+hbase 來實現hbase中資料的快速查詢. 首先,
JAVA對資料庫進行操作,實現資料庫中資料的插入,查詢,更改,刪除操作
轉載自:http://www.cnblogs.com/sodawoods-blogs/p/4415858.html (—)通過mysql workbench 建立一個數據庫,在這裡命名為company,然後建一個tb_employee表 (二)以下是java程式碼對錶
HBase刪除表中資料
1、使用hbase shell中delete命令刪除表中特定的單元格資料,命令格式如下: delete 'tablename','row','column name','time stramp' 刪除emp表中第二行personal data:name列
惠州學院-資料庫實驗2-資料庫中資料的查詢
計算機科學系實驗報告(首頁) 課程名稱 資料庫系統概論 班級 14計科2班 實驗名稱 資料庫中資料的查詢
POI實現Excel中資料的讀取
所需依賴包:poi-3.17.jar、poi-ooxml-3.17.jar、poi-ooxml-schemas-3.17.jar、xmlbeans-2.6.0.jar、commons-collections4-4.1.jar。 依賴包下載地址:http://mvnrepository.com/a
springboot實現資料庫中資料匯出Excel功能
功能介紹 網上查找了一堆的資料匯出程式碼,可能是自己基礎比較薄弱的原因還是別的什麼原因,導致一直沒有執行成功,就算是執行成功的,結果也是差強人意。在此總結一下自己借鑑別人已經經過自己整合
Android實現列表仿聯絡人快速查詢和關鍵字搜尋
列表按字母順序排序,右側實現根據首字母快速查詢,輸入框輸入首字母或元素第一個字也可以實現快速查詢。 頁面佈局如下: <LinearLayoutxmlns:andro
在ArcGIS中如何快速查詢所要檢視的要素?
(1)開啟mxd地圖文件。 (2)在【工具】工具條中單擊【查詢】按鈕,開啟【查詢】對話方塊,單擊【要素】標籤,切換到【要素】選項卡。 (3)在【查詢】下拉框中輸入要查詢的要素名稱“**”;單擊【範圍】
使用hibernate實現mysql中limit的查詢
給大家分享個知識點,hibernate 的hql不支援limit的使用。以後大家如果有限制查詢從第幾條至第幾條時。就用 List<SiteInvestment> investments = new ArrayList<SiteIn
Vue.js框架--Vuex實現元件裡資料持久化(二十八)
主要操作技能: Vuex 是一個專為 Vue.js 應用程式開發的狀態管理模式 新聞頁面每次切換路由時,再次訪問就會請求資料;那麼如何直接從vuex中持久化資料呢?  
Keras之predict二分類:DL之binary_crossentropy&Relu——DIY二分類資料集實現輸入新資料點預測二分類
Keras之predict二分類:DL之binary_crossentropy&Relu——DIY二分類資料集實現輸入新資料點預測二分類 輸出結果 實現程式碼 #Keras之predict二分類:DL之binary_cro
資料結構-查詢二叉樹
查詢二叉樹:始終滿足任一節點的左兒子小於該節點,右兒子大於該節點, 由於其特性,查詢二叉樹的中序遍歷始終為從小到大; 需要注意的是二叉查詢樹的刪除,有2種策略,在刪除節點不多時可以採用懶惰刪除,即標記該節點刪除而非真正的delete,第二種是當刪除節點只有一個兒子或則為葉子
Elasticsearch+Hbase實現海量資料秒回查詢
---------------------------------------------------------------------------------------------[版權申明:本文系作者原創,轉載請註明出處] 文章出處:http://blog.csdn.
Elasticsearch+Hbase實現海量數據秒回查詢
ont 都是 lease tor lines app req 空格 java開發 ---------------------------------------------------------------------------------------------[版權
從hbase表1中讀取資料,最終結果寫入到hbase表2 ,如何通過MapReduce實現 ?
需要一: 將hbase中‘student’表中的info:name和info:age兩列資料取出並寫入到hbase中‘user’表中的basic:XM和basic:NL class ReadStudentMapper extends Table
HBase 寫優化之 BulkLoad 實現資料快速入庫
1、為何要 BulkLoad 匯入?傳統的 HTableOutputFormat 寫 HBase 有什麼問題? 我們先看下 HBase 的寫流程: 通常 MapReduce 在寫HBase時使用的是 TableOutputFormat 方式,在reduce中直接生成put
如何用Elasticsearch實現類似SQL中的IN查詢實例
red ast last .cn lte style sea ges logs 我想實現類似如下sql語句的效果: select * from table1 where rw_id in (‘7a482589-e52e-0887-4dd5-5821aab77eea‘,‘c