1. 程式人生 > >Spark2.2+ES6.4.2(三十二):ES API之ndex的create(建立index時設定setting,並建立index後根據avro模板動態設定index的mapping)/update/delete/open/close

Spark2.2+ES6.4.2(三十二):ES API之ndex的create(建立index時設定setting,並建立index後根據avro模板動態設定index的mapping)/update/delete/open/close

要想通過ES API對es的操作,必須獲取到TransportClient物件,讓後根據TransportClient獲取到IndicesAdminClient物件後,方可以根據IndicesAdminClient物件提供的方法對ES的index進行操作:create index,update index(update index settings,update index mapping),delete index,open index,close index。

準備工作(建立TransportClient,IndicesAdminClient)

第一步:匯入ES6.4.2的依賴包: 

    <dependencies>
        <!--Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <
dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId
> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>bijection-avro_2.11</artifactId> <version>0.9.5</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-avro_2.11</artifactId> <version>3.2.0</version> <type>jar</type> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.4.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.4.2</version> </dependency> </dependencies>

備註:這裡依賴可能有點多,elastricsearch api操作的話就是依賴org.elasticsearch.client。

第二步:獲取TransportClient,IndicesAdminClient物件:

    /**
     * 獲取ES Client API物件。
     * */
    public static TransportClient getClient() {
        Map<String, String> esOptionsMap = getSparkESCommonOptions();

        return getClient(esOptionsMap);
    }

    /**
     * 獲取ES Client API物件。
     * */
    public static TransportClient getClient(Map<String, String> esOptionsMap) {
        Settings settings = Settings.builder()//
                .put("cluster.name", esOptionsMap.get("cluster.name")) //
                .put("client.transport.sniff", true)//
                .build();

        PreBuiltTransportClient preBuiltTransportClient = new PreBuiltTransportClient(settings);
        TransportClient client = preBuiltTransportClient;

        // 192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123
        String esNodeStr = esOptionsMap.get("es.nodes");
        String[] esNodeArr = esNodeStr.split(",");

        try {
            for (String esNode : esNodeArr) {
                client.addTransportAddress(new TransportAddress(InetAddress.getByName(esNode), 9300));
            }
        } catch (UnknownHostException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }

        return client;
    }

    public static IndicesAdminClient getAdminClient() {
        Map<String, String> esOptionsMap = getSparkESCommonOptions();

        return getAdminClient(esOptionsMap);
    }

    public static IndicesAdminClient getAdminClient(Map<String, String> esOptionsMap) {
        TransportClient client = getClient(esOptionsMap);
        IndicesAdminClient adminClient = client.admin().indices();
        return adminClient;
    }

備註:其中getSparkESCommonOptions()中配置物件包含:

cluster.name=es-application
es.nodes=192.168.1.120,192.168.1.121,192.168.1.122,192.168.1.123
es.port=9200
es.index.auto.create=true
pushdown=true
es.nodes.wan.only=true
es.mapping.date.rich=false #//設定讀取es中date資料型別欄位時,把它當做string來讀取。
es.scroll.size=10000

ES API之Exists/Create Index:

建立index之前,需要判斷index及其對應的型別是否存在,使用這個方法:

    /**
     * 是否ES包含某個索引型別
     * 
     * @param indexName
     *            index
     * @param indexType
     *            index對應的type
     * */
    public static boolean typeExists(String indexName, String indexType) {
        TypesExistsResponse typeResponse = getAdminClient().prepareTypesExists(indexName).setTypes(indexType).execute().actionGet();
        if (typeResponse.isExists()) {
            return true;
        }
        return false;
    }

    /**
     * 判斷ES中是否存在某個index<br>
     * 是否包含型別,待驗證,看別人呼叫時是不需要帶型別的。
     * */
    public static boolean indexExists(String... indices) {
        IndicesExistsRequest request = new IndicesExistsRequest(indices);
        IndicesExistsResponse response = getAdminClient().exists(request).actionGet();
        if (response.isExists()) {
            return true;
        }
        return false;
    }

建立index,包含兩種:不指定mapping和isettings只建立一個空的index;指定mapping和settings建立複雜的index。

建立一個空的index:

    /**
     * 建立簡單索引——沒有指定mapping<br>
     * 此時資料插入時,會讀取資料的資料的欄位名稱,自動建立mapping欄位(但是,存在問題資料型別不能完好的控制,比如double型別可能會被匹配為float,date型別的格式消失)
     * */
    public static boolean indexCreate(String indexName) {
        CreateIndexResponse response = getAdminClient().prepareCreate(indexName).get();
        return response.isAcknowledged();
    }

備註:此時資料插入時,會讀取資料的資料的欄位名稱,自動建立mapping欄位(但是,存在問題資料型別不能完好的控制,比如double型別可能會被匹配為float,date型別的格式消失)

建立複雜的index:

    /**
     * 建立複雜索引(型別/mapping),指定索引的setting和mapping,其中mappingSource是一個json資料字串。
     * 
     * @param indexName
     *            索引名
     * @param indexType
     *            索引型別名
     * @param mappingSource
     *            索引mapping
     */
    public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder) {
        Settings settings = Settings.builder() //
                .put("index.mapping.ignore_malformed", true)//
                .put("index.refresh_interval", "60s") //
                .put("index.number_of_shards", 4)//
                .put("index.number_of_replicas", 0)//
                .put("index.max_result_window", 500000)//

                .put("index.translog.durability", "async")//
                .put("index.translog.sync_interval", "120s")//
                .put("index.translog.flush_threshold_size", "2gb")//

                .put("index.merge.scheduler.max_thread_count", 1)//
                .build();

        return indexCreate(indexName, indexType, builder, settings);
    }

    /**
     * 建立複雜索引(型別/mapping),指定索引的setting和mapping,其中mappingSource是一個json資料字串。
     * 
     * @param indexName
     *            索引名
     * @param indexType
     *            索引型別名
     * @param mappingSource
     *            索引mapping
     * @param settings
     *            索引settings<br>
     *            setting http://10.205.201.97:9200/twitter/_settings?pretty<br>
     *            "settings":<br>
     *            {<br>
     *            ----"index":<br>
     *            ----{<br>
     *            --------"mapping":<br>
     *            --------{<br>
     *            ------------"ignore_malformed":"true"<br>
     *            --------},<br>
     *            --------"refresh_interval":"60s",<br>
     *            --------"number_of_shards":"4",<br>
     *            --------"translog":<br>
     *            --------{<br>
     *            ------------"flush_threshold_size":"2048m",<br>
     *            ------------"sync_interval":"120s",<br>
     *            ------------"durability":"async"<br>
     *            --------},<br>
     *            --------"provided_name":"indexName",<br>
     *            --------"merge":{<br>
     *            ------------"scheduler":<br>
     *            ------------{<br>
     *            ----------------"max_thread_count":"1"<br>
     *            ------------}<br>
     *            --------},<br>
     *            --------"max_result_window":"500000",<br>
     *            --------"creation_date":"1540781909323",<br>
     *            --------"number_of_replicas":"0",<br>
     *            --------"uuid":"5c079b5tQrGdX0fF23xtQA",<br>
     *            --------"version":{"created":"6020499"}<br>
     *            ----}<br>
     *            }<br>
     */
    public static boolean indexCreate(String indexName, String indexType, XContentBuilder builder, Settings settings) {
        if (indexExists(indexName)) {
            return false;
        }

        // CreateIndexResponse準備建立索引,增加setSetting()方法可以設定setting引數,否則將會按預設設定
        CreateIndexResponse cIndexResponse = getAdminClient().prepareCreate(indexName)//
                .setSettings(settings)// setting
                .addMapping(indexType, builder)// type,mapping 這種方式也可以,經過測試。
                .get();

        return cIndexResponse.isAcknowledged();
    }

ES API之Update Index:

所謂的修改index,也就是修改index的settings和mapping:

    /**
     * 修改ES索引的mapping屬性
     * */
    public static boolean indexUpdateMapping(String indexName, String indexType, XContentBuilder builder) {
        org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest mapping = Requests.putMappingRequest(indexName).type(indexType)
                .source(builder);
        PutMappingResponse pMappingResource = getAdminClient().putMapping(mapping).actionGet();

        return pMappingResource.isAcknowledged();
    }

    /**
     * 修改ES索引的settings屬性<br>
     * 更新索引屬性(更新索引的settings屬性,這是更改已經建立的屬性、但有些一旦建立不能更改,需要按照自己的需求來進行選擇使用)
     * */
    public static boolean indexUpdatSettings(String indexName, Map<String, String> settingsMap) {
        Builder settings = Settings.builder();//
        for (Map.Entry<String, String> kv : settingsMap.entrySet()) {
            settings.put(kv.getKey(), kv.getValue());
        }

        return indexUpdatSettings(indexName, settings);
    }

    /**
     * 修改ES索引的settings屬性<br>
     * 更新索引屬性(更新索引的settings屬性,這是更改已經建立的屬性、但有些一旦建立不能更改,需要按照自己的需求來進行選擇使用)
     * */
    public static boolean indexUpdatSettings(String indexName, Builder settings) {
        UpdateSettingsResponse uIndexResponse = getAdminClient().prepareUpdateSettings(indexName)//
                .setSettings(settings)//
                .execute().actionGet();
        return uIndexResponse.isAcknowledged();
    }

ES API之Delete/Open/Close Index:

    /**
     * 刪除ES中某個或者多個索引
     * */
    public static boolean indexDelete(String... indices) {
        DeleteIndexResponse dIndexResponse = getAdminClient().prepareDelete(indices).execute().actionGet();
        if (dIndexResponse.isAcknowledged()) {
            System.out.println("刪除索引成功");
            return true;
        } else {
            System.out.println("刪除索引失敗");
            return false;
        }
    }

    /**
     * 關閉ES中某個或者多個索引<br>
     * curl -XPOST "http://127.0.0.1:9200/indexname/_close"
     * */
    public static boolean indexClose(String... indices) {
        CloseIndexResponse cIndexResponse = getAdminClient().prepareClose(indices).execute().actionGet();
        if (cIndexResponse.isAcknowledged()) {
            System.out.println("關閉索引成功");
            return true;
        }
        return false;
    }

    /**
     * 開啟ES中某個或者多個索引<br>
     * curl -XPOST "http://127.0.0.1:9200/indexname/_open"
     * */
    public static boolean indexOpen(String... indices) {
        OpenIndexResponse oIndexResponse = getAdminClient().prepareOpen(indices).execute().actionGet();
        if (oIndexResponse.isAcknowledged()) {
            System.out.println("開啟索引成功");
            return true;
        }
        return false;
    }