Spark2.2+ES6.4.2 (Part 32): ES API Index create (setting index settings at creation, then dynamically setting the index mapping from an Avro schema after creation)/update/delete/open/close
阿新 • Published: 2018-11-03
To operate on ES through the ES API, you first need a TransportClient object. From the TransportClient you obtain an IndicesAdminClient object, and the IndicesAdminClient methods then let you operate on ES indices: create index, update index (update index settings, update index mapping), delete index, open index and close index.
Preparation (creating the TransportClient and IndicesAdminClient)
Step 1: add the ES 6.4.2 dependencies:
```xml
<dependencies>
    <!--Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>bijection-avro_2.11</artifactId>
        <version>0.9.5</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
        <type>jar</type>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.4.2</version>
    </dependency>
</dependencies>
```
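Note: this list may include more dependencies than strictly needed. For the Elasticsearch API operations alone, the required dependency is org.elasticsearch.client (transport).
Step 2: obtain the TransportClient and IndicesAdminClient objects: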
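```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;

import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

// The methods below are static members of the author's ES utility class.

/**
 * Get an ES client API object.
 */
public static TransportClient getClient() {
    Map<String, String> esOptionsMap = getSparkESCommonOptions();
    return getClient(esOptionsMap);
}

/**
 * Get an ES client API object.
 */
public static TransportClient getClient(Map<String, String> esOptionsMap) {
    Settings settings = Settings.builder()
            .put("cluster.name", esOptionsMap.get("cluster.name"))
            // sniffing lets the client discover the other nodes of the cluster
            .put("client.transport.sniff", true)
            .build();
    TransportClient client = new PreBuiltTransportClient(settings);

    // e.g. 192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123
    String[] esNodeArr = esOptionsMap.get("es.nodes").split(",");
    try {
        for (String esNode : esNodeArr) {
            // 9300 is the transport port; es.port=9200 is the HTTP port
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(esNode.trim()), 9300));
        }
    } catch (UnknownHostException e) {
        client.close(); // release the client's thread pools before failing
        throw new RuntimeException(e);
    }
    return client;
}

// One shared client for the admin helpers: every TransportClient owns thread
// pools and connections, so creating one per call without closing it leaks them.
private static TransportClient sharedClient;

public static synchronized IndicesAdminClient getAdminClient() {
    if (sharedClient == null) {
        sharedClient = getClient(getSparkESCommonOptions());
    }
    return sharedClient.admin().indices();
}

public static IndicesAdminClient getAdminClient(Map<String, String> esOptionsMap) {
    // Creates a new TransportClient on every call; it is not closed here.
    TransportClient client = getClient(esOptionsMap);
    return client.admin().indices();
}
```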
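getClient() turns the comma-separated `es.nodes` value into transport addresses and always uses port 9300. Below is a minimal stdlib-only sketch of a more tolerant parser. It trims whitespace, skips empty entries, and accepts an optional `host:port`. `EsNodeParser` and `Node` are hypothetical names, not part of the ES client. The sketch assumes IPv4 addresses or hostnames, because a bare IPv6 address would confuse the `:` split.

```java
import java.util.ArrayList;
import java.util.List;

public class EsNodeParser {

    /** A host/port pair for one ES node (hypothetical helper type). */
    public static final class Node {
        public final String host;
        public final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /**
     * Splits an "es.nodes" value such as "192.168.1.120, 192.168.1.121:9301"
     * into nodes, trimming whitespace and skipping empty entries.
     * Entries without an explicit port get defaultPort (e.g. 9300, the transport port).
     * Assumes IPv4 addresses or hostnames (no bare IPv6).
     */
    public static List<Node> parseNodes(String esNodes, int defaultPort) {
        List<Node> nodes = new ArrayList<>();
        for (String raw : esNodes.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue; // tolerate "a,,b" and trailing commas
            }
            int colon = entry.lastIndexOf(':');
            if (colon > 0) {
                String host = entry.substring(0, colon);
                int port = Integer.parseInt(entry.substring(colon + 1));
                nodes.add(new Node(host, port));
            } else {
                nodes.add(new Node(entry, defaultPort));
            }
        }
        return nodes;
    }

    public static void main(String[] args) {
        System.out.println(parseNodes("192.168.1.120, 192.168.1.121:9301,", 9300));
    }
}
```

Each parsed `Node` could then feed `new TransportAddress(InetAddress.getByName(node.host), node.port)` in place of the hard-coded 9300.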
Note: the options map returned by getSparkESCommonOptions() (used by getClient() and getAdminClient()) contains:
```properties
cluster.name=es-application
es.nodes=192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123
es.port=9200
es.index.auto.create=true
pushdown=true
es.nodes.wan.only=true
# Read ES fields of type date as strings.
es.mapping.date.rich=false
es.scroll.size=10000
```
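The article does not show how getSparkESCommonOptions() builds its map. Below is a hypothetical stand-in that loads the options above from `.properties` text with `java.util.Properties` and copies them into a `Map<String, String>`. Where the options really come from (a file, spark-submit arguments, etc.) is an assumption. In `.properties` syntax, `#` starts a comment only at the beginning of a line. A trailing `# ...` written after a value would silently become part of that value, which is why the comment sits on its own line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class EsOptions {

    // Hypothetical: the options above inlined as .properties text.
    static final String ES_OPTIONS =
            "cluster.name=es-application\n"
            + "es.nodes=192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123\n"
            + "es.port=9200\n"
            + "es.index.auto.create=true\n"
            + "pushdown=true\n"
            + "es.nodes.wan.only=true\n"
            + "# read ES date fields as strings\n"
            + "es.mapping.date.rich=false\n"
            + "es.scroll.size=10000\n";

    /** Hypothetical stand-in for the author's getSparkESCommonOptions(). */
    public static Map<String, String> getSparkESCommonOptions() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(ES_OPTIONS));
        } catch (IOException e) {
            // load() declares IOException even though a StringReader will not fail here
            throw new IllegalStateException(e);
        }
        Map<String, String> options = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            options.put(key, props.getProperty(key));
        }
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> options = getSparkESCommonOptions();
        System.out.println(options.get("cluster.name") + " " + options.get("es.nodes"));
    }
}
```

The same map can be handed to elasticsearch-spark as read/write options (the `es.*` keys), while the TransportClient only reads `cluster.name` and `es.nodes`.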
ES API: Exists/Create Index:
Before creating an index, check whether the index and its type already exist, using these methods:
```java
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;

/**
 * Whether ES contains the given index type.
 *
 * @param indexName index
 * @param indexType type of the index
 */
public static boolean typeExists(String indexName, String indexType) {
    TypesExistsResponse typeResponse = getAdminClient()
            .prepareTypesExists(indexName)
            .setTypes(indexType)
            .execute().actionGet();
    return typeResponse.isExists();
}

/**
 * Whether ES contains the given index (or indices).<br>
 * Only index names are checked; use typeExists() to check a type.
 */
public static boolean indexExists(String... indices) {
    IndicesExistsRequest request = new IndicesExistsRequest(indices);
    IndicesExistsResponse response = getAdminClient().exists(request).actionGet();
    return response.isExists();
}
```
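indexExists() only tells you whether a name is already taken. Creating the index can still fail if the name itself is illegal. Below is a simplified client-side pre-check. It assumes the naming rules listed in the ES 6.x create-index documentation: lowercase only; none of `\ / * ? " < > |`, space, `,` or `#`; no leading `-`, `_` or `+`; not `.` or `..`; at most 255 bytes. `IndexNameCheck` is a hypothetical helper, and the server's own validation remains authoritative.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class IndexNameCheck {

    // Characters listed as forbidden in index names by the ES 6.x create-index docs.
    private static final String FORBIDDEN = "\\/*?\"<>| ,#";

    /** Returns null if the name passes these checks, otherwise the reason it fails. */
    public static String invalidReason(String name) {
        if (name == null || name.isEmpty()) {
            return "name is empty";
        }
        if (!name.equals(name.toLowerCase(Locale.ROOT))) {
            return "must be lowercase";
        }
        for (char c : name.toCharArray()) {
            if (FORBIDDEN.indexOf(c) >= 0) {
                return "contains forbidden character '" + c + "'";
            }
        }
        char first = name.charAt(0);
        if (first == '-' || first == '_' || first == '+') {
            return "cannot start with '" + first + "'";
        }
        if (name.equals(".") || name.equals("..")) {
            return "cannot be '.' or '..'";
        }
        if (name.getBytes(StandardCharsets.UTF_8).length > 255) {
            return "longer than 255 bytes";
        }
        return null;
    }

    public static void main(String[] args) {
        for (String n : new String[] {"twitter", "Twitter", "_logs", "a,b"}) {
            System.out.println(n + " -> " + invalidReason(n));
        }
    }
}
```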
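There are two ways to create an index: create an empty index without specifying a mapping or settings, or create a complex index with a specified mapping and settings.
Creating an empty index: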
```java
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;

/**
 * Create a simple index without a mapping.<br>
 * Field mappings are created dynamically when data is first inserted,
 * so the field types cannot be fully controlled.
 */
public static boolean indexCreate(String indexName) {
    CreateIndexResponse response = getAdminClient().prepareCreate(indexName).get();
    return response.isAcknowledged();
}
```
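Note: when data is inserted into such an index, ES reads the field names from the data and creates the mapping fields automatically. The problem is that the data types cannot be fully controlled: for example, a double field may be mapped as float, and a custom date format is lost.
Creating a complex index: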
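To make the note concrete, here is a rough stdlib-only model of the type that ES 6.x dynamic mapping picks for a JSON value. JSON floating-point numbers become `float` (not `double`), integers become `long`, and strings become `date` only when they match the date-detection formats; otherwise they become `text`. The date check is a deliberately simplified stand-in for ES's `strict_date_optional_time` detection, and `DynamicMappingSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class DynamicMappingSketch {

    // Simplified stand-in for ES's default date detection (strict_date_optional_time):
    // yyyy-MM-dd, optionally followed by 'T' and a time. Not the full ES grammar.
    private static final Pattern ISO_DATE = Pattern.compile("\\d{4}-\\d{2}-\\d{2}(T.+)?");

    /** Roughly the field type ES 6.x dynamic mapping picks for a JSON value. */
    public static String guessDynamicType(Object value) {
        if (value instanceof Boolean) return "boolean";
        if (value instanceof Double || value instanceof Float) return "float"; // not "double"
        if (value instanceof Long || value instanceof Integer) return "long";
        if (value instanceof String) {
            // non-date strings become "text" (ES also adds a "keyword" sub-field)
            return ISO_DATE.matcher((String) value).matches() ? "date" : "text";
        }
        return "object";
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("price", 12345.678901);            // a double in the source data
        doc.put("count", 42);
        doc.put("day", "2018-11-03");
        doc.put("created", "2018-11-03 10:00:00"); // space instead of 'T'
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            System.out.println(e.getKey() + " -> " + guessDynamicType(e.getValue()));
        }
    }
}
```

In this model `price` ends up as `float` and `created` as `text`. Specifying the mapping explicitly when the index is created avoids both outcomes.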
```java
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Create a complex index (type/mapping) with the given mapping and the default settings below.
 *
 * @param indexName index name
 * @param indexType index type name
 * @param builder   index mapping
 */
public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder) {
    Settings settings = Settings.builder()
            .put("index.mapping.ignore_malformed", true)
            .put("index.refresh_interval", "60s")
            .put("index.number_of_shards", 4)
            .put("index.number_of_replicas", 0)
            .put("index.max_result_window", 500000)
            .put("index.translog.durability", "async")
            .put("index.translog.sync_interval", "120s")
            .put("index.translog.flush_threshold_size", "2gb")
            .put("index.merge.scheduler.max_thread_count", 1)
            .build();
    return indexCreate(indexName, indexType, builder, settings);
}

/**
 * Create a complex index (type/mapping) with the given settings and mapping.
 *
 * @param indexName index name
 * @param indexType index type name
 * @param builder   index mapping
 * @param settings  index settings. Example output of
 *                  http://10.205.201.97:9200/twitter/_settings?pretty :
 * <pre>
 * "settings": {
 *   "index": {
 *     "mapping": { "ignore_malformed": "true" },
 *     "refresh_interval": "60s",
 *     "number_of_shards": "4",
 *     "translog": {
 *       "flush_threshold_size": "2048m",
 *       "sync_interval": "120s",
 *       "durability": "async"
 *     },
 *     "provided_name": "indexName",
 *     "merge": { "scheduler": { "max_thread_count": "1" } },
 *     "max_result_window": "500000",
 *     "creation_date": "1540781909323",
 *     "number_of_replicas": "0",
 *     "uuid": "5c079b5tQrGdX0fF23xtQA",
 *     "version": { "created": "6020499" }
 *   }
 * }
 * </pre>
 */
public static boolean indexCreate(String indexName, String indexType,
        XContentBuilder builder, Settings settings) {
    if (indexExists(indexName)) {
        return false;
    }
    // prepareCreate() prepares the index; setSettings() applies the settings
    // (otherwise the defaults are used); addMapping(type, mapping) also works (tested).
    CreateIndexResponse cIndexResponse = getAdminClient().prepareCreate(indexName)
            .setSettings(settings)
            .addMapping(indexType, builder)
            .get();
    return cIndexResponse.isAcknowledged();
}
```
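The code passes flat dotted keys such as `index.translog.durability`, while GET `_settings` reports them as nested JSON, as in the Javadoc example. Below is a stdlib-only sketch of that correspondence. `SettingsNesting` is a hypothetical helper. It writes every leaf as a string, mirroring how `_settings` returns values such as `"4"`. It does no JSON escaping and assumes no key is both a leaf and a parent, so it suits only simple settings like these.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class SettingsNesting {

    /** Turns dotted keys ("index.translog.durability") into nested, sorted maps. */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> nest(Map<String, String> flat) {
        Map<String, Object> root = new TreeMap<>();
        for (Map.Entry<String, String> e : flat.entrySet()) {
            String[] parts = e.getKey().split("\\.");
            Map<String, Object> node = root;
            for (int i = 0; i < parts.length - 1; i++) {
                node = (Map<String, Object>) node.computeIfAbsent(parts[i], k -> new TreeMap<String, Object>());
            }
            node.put(parts[parts.length - 1], e.getValue());
        }
        return root;
    }

    /** Minimal JSON rendering; leaves are written as strings, no escaping. */
    public static String toJson(Object value) {
        if (value instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                sb.append('"').append(e.getKey()).append("\":").append(toJson(e.getValue()));
            }
            return sb.append('}').toString();
        }
        return "\"" + value + "\"";
    }

    public static void main(String[] args) {
        Map<String, String> flat = new HashMap<>();
        flat.put("index.number_of_shards", "4");
        flat.put("index.translog.durability", "async");
        flat.put("index.translog.sync_interval", "120s");
        System.out.println(toJson(nest(flat)));
    }
}
```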
ES API: Update Index:
Updating an index means modifying its settings and its mapping:
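```java
import java.util.Map;

import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;

/**
 * Update the mapping of an ES index.<br>
 * New fields can be added; the type of an existing field cannot be changed.
 */
public static boolean indexUpdateMapping(String indexName, String indexType, XContentBuilder builder) {
    PutMappingRequest mapping = Requests.putMappingRequest(indexName)
            .type(indexType)
            .source(builder);
    PutMappingResponse pMappingResource = getAdminClient().putMapping(mapping).actionGet();
    return pMappingResource.isAcknowledged();
}

/**
 * Update the settings of an ES index.<br>
 * This changes settings of an index that already exists, but some settings
 * (e.g. index.number_of_shards) cannot be changed once the index is created,
 * so choose the settings to update according to your needs.
 */
public static boolean indexUpdatSettings(String indexName, Map<String, String> settingsMap) {
    Settings.Builder settings = Settings.builder();
    for (Map.Entry<String, String> kv : settingsMap.entrySet()) {
        settings.put(kv.getKey(), kv.getValue());
    }
    return indexUpdatSettings(indexName, settings);
}

/**
 * Update the settings of an ES index (see the note on the overload above).
 */
public static boolean indexUpdatSettings(String indexName, Settings.Builder settings) {
    UpdateSettingsResponse uIndexResponse = getAdminClient().prepareUpdateSettings(indexName)
            .setSettings(settings)
            .execute().actionGet();
    return uIndexResponse.isAcknowledged();
}
```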
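The title mentions setting the mapping dynamically from an Avro schema after the index has been created, which is the job of indexUpdateMapping(). The article does not show the conversion step, though. Below is a minimal stdlib-only sketch. It assumes the Avro schema has already been reduced to ordered field-name/primitive-type pairs, with unions such as `["null","double"]` resolved to their non-null branch and logical types ignored. The type table is an assumption; in particular, mapping Avro `string` to `keyword` rather than `text` is a choice. `AvroToEsMapping` is a hypothetical name. Turning the resulting JSON into the XContentBuilder that indexUpdateMapping() expects is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AvroToEsMapping {

    /** Assumed Avro primitive -> ES 6.x field type table (a design choice, not a standard). */
    static String esType(String avroType) {
        switch (avroType) {
            case "string":  return "keyword"; // exact-match; use "text" for full-text search
            case "int":     return "integer";
            case "long":    return "long";
            case "float":   return "float";
            case "double":  return "double";  // explicit, so it is not dynamically mapped to float
            case "boolean": return "boolean";
            case "bytes":   return "binary";
            default:
                throw new IllegalArgumentException("unsupported Avro type: " + avroType);
        }
    }

    /** Builds {"properties":{...}} from field name -> Avro primitive type, in field order. */
    public static String mappingJson(Map<String, String> avroFields) {
        StringBuilder sb = new StringBuilder("{\"properties\":{");
        boolean first = true;
        for (Map.Entry<String, String> f : avroFields.entrySet()) {
            if (!first) sb.append(',');
            first = false;
            sb.append('"').append(f.getKey()).append("\":{\"type\":\"")
              .append(esType(f.getValue())).append("\"}");
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("id", "long");
        fields.put("name", "string");
        fields.put("score", "double");
        System.out.println(mappingJson(fields));
    }
}
```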
ES API: Delete/Open/Close Index:
```java
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;

/**
 * Delete one or more ES indices.
 */
public static boolean indexDelete(String... indices) {
    DeleteIndexResponse dIndexResponse = getAdminClient().prepareDelete(indices).execute().actionGet();
    if (dIndexResponse.isAcknowledged()) {
        System.out.println("Index deleted successfully");
        return true;
    } else {
        System.out.println("Failed to delete index");
        return false;
    }
}

/**
 * Close one or more ES indices.<br>
 * curl -XPOST "http://127.0.0.1:9200/indexname/_close"
 */
public static boolean indexClose(String... indices) {
    CloseIndexResponse cIndexResponse = getAdminClient().prepareClose(indices).execute().actionGet();
    if (cIndexResponse.isAcknowledged()) {
        System.out.println("Index closed successfully");
        return true;
    }
    return false;
}

/**
 * Open one or more ES indices.<br>
 * curl -XPOST "http://127.0.0.1:9200/indexname/_open"
 */
public static boolean indexOpen(String... indices) {
    OpenIndexResponse oIndexResponse = getAdminClient().prepareOpen(indices).execute().actionGet();
    if (oIndexResponse.isAcknowledged()) {
        System.out.println("Index opened successfully");
        return true;
    }
    return false;
}
```
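The Javadoc above gives the curl form for open and close. To cross-check any of these calls with curl, the following stdlib-only sketch builds the ES 6.x REST method and path for delete, open and close. Several indices are comma-separated in the path, which matches the varargs these helpers accept. `IndexRestPaths` and `IndexOp` are hypothetical names, not part of the ES client. The sketch rejects an empty index list so it never produces a request without an index name.

```java
public class IndexRestPaths {

    /** REST method and path suffix for the index operations above (ES 6.x). */
    public enum IndexOp {
        DELETE("DELETE", ""),
        OPEN("POST", "/_open"),
        CLOSE("POST", "/_close");

        private final String method;
        private final String suffix;

        IndexOp(String method, String suffix) {
            this.method = method;
            this.suffix = suffix;
        }

        /** Builds e.g. "POST /a,b/_close"; several indices are joined with commas. */
        public String request(String... indices) {
            if (indices.length == 0) {
                throw new IllegalArgumentException("at least one index is required");
            }
            return method + " /" + String.join(",", indices) + suffix;
        }
    }

    public static void main(String[] args) {
        System.out.println(IndexOp.CLOSE.request("indexname"));
        System.out.println(IndexOp.DELETE.request("logs-1", "logs-2"));
    }
}
```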