1. 程式人生 > >Elasticsearch Java API簡介

Elasticsearch Java API簡介

tex version exist () address clas mar transport private

加入依賴

我本地的Elasticsearch的版本是2.1.0,因此加入相應的maven依賴

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.1.0</version>
</dependency>

創建Client

Elasticsearch Client分為Node Client和TransportClient。

  • Node Client:節點本身也是Elasticsearch集群的節點,也進入Elasticsearch集群和別的Elasticsearch集群中的節點一樣
  • TransportClient:輕量級的Client,使用Netty線程池,Socket連接到ES集群。本身不加入到集群,只作為請求的處理

一般我們使用TransportClient。創建Client的實例如下:

    private TransportClient client = null;

    @Before
    public void createElaCLient() throws UnknownHostException {
        //如果集群是默認名稱的話可以不設置集群名稱
        Settings settings = Settings.settingsBuilder().put("cluster.name","elasticsearch").build();
        client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("master"),9300));
    }

    /**
     * 關閉ela客戶端
     */
    @After
    public void closeElaClient(){
        if(client != null){
            client.close();
        }
    }

client.transport.sniff嗅探功能

你可以設置client.transport.sniff為true來使客戶端去嗅探整個集群的狀態,把集群中其它機器的ip地址加到客戶端中,這樣做的好處是一般你不用手動設置集群裏所有集群的ip到連接客戶端,它會自動幫你添加,並且自動發現新加入集群的機器。代碼實例如下:

    private TransportClient client = null;

    @Before
    public void createElaCLient() throws UnknownHostException {
        //如果集群是默認名稱的話可以不設置集群名稱
        Settings settings = Settings.settingsBuilder().put("cluster.name","elasticsearch").put("client.transport.sniff",true).build();
        client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("master"),9300));
    }

註意:當ES服務器監聽使用內網服務器IP而訪問使用外網IP時,不要使用client.transport.sniff為true,在自動發現時會使用內網IP進行通信,導致無法連接到ES服務器,而直接使用addTransportAddress方法進行指定ES服務器

測試Client連接到Elasticsearch集群

代碼如下:

@Test
    public void testConnection(){
        List<DiscoveryNode> discoveryList = client.connectedNodes();
        for(DiscoveryNode node : discoveryList){
            System.out.println(node.getName());
        }
    }

創建/刪除Index和Type信息

    /**
     * 創建索引
     */
    @Test
    public void createIndex(){
        if(client != null){
            client.admin().indices().create(new CreateIndexRequest("test_index")).actionGet();
        }
    }

    /**
     * 清除索引
     */
    @Test
    public void clearIndex(){
        IndicesExistsResponse indicesExistsResponse = client.admin().indices().exists(new IndicesExistsRequest("test_index")).actionGet();
        if(indicesExistsResponse.isExists()){
            client.admin().indices().delete(new DeleteIndexRequest("test_index")).actionGet();
        }
    }

    /**
     * 定義索引的映射類型(mapping)
     */
    @Test
    public void defineIndexTypeMapping(){
        try {
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject()
                    .startObject("test")
                    .startObject("properties")
                    .startObject("id").field("type","long").field("store","yes").endObject()
                    .startObject("name").field("type","string").field("store","yes").field("index","not_analyzed").endObject()
                    .endObject()
                    .endObject()
                    .endObject();
            PutMappingRequest mappingRequest = Requests.putMappingRequest("test_index").type("test").source(builder);
            client.admin().indices().putMapping(mappingRequest).actionGet();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 刪除index下的某個type
     */
    @Test
    public void deleteType(){
        if(client != null){
            client.prepareDelete().setIndex("test_index").setType("test").execute().actionGet();
        }
    }

這裏自定義了某個Type的索引映射(Mapping),默認ES會自動處理數據類型的映射:針對整型映射為long,浮點數為double,字符串映射為string,時間為date,true或false為boolean。

註意:針對字符串,ES默認會做“analyzed”處理,即先做分詞、去掉stop words等處理再index。如果你需要把一個字符串做為整體被索引到,需要把這個字段這樣設置:field(“index”, “not_analyzed”)。

索引數據

    /**
     * 批量索引
     */
    @Test
    public void indexData(){
        BulkRequestBuilder requestBuilder = client.prepareBulk();
        for(Person person : personList){
            String obj = getIndexDataFromHotspotData(person);
            if(obj != null){
                requestBuilder.add(client.prepareIndex("test_index","test",String.valueOf(person.getId())).setRefresh(true).setSource(obj));
            }
        }
        BulkResponse bulkResponse = requestBuilder.execute().actionGet();
        if(bulkResponse.hasFailures()){
            Iterator<BulkItemResponse> it = bulkResponse.iterator();
            while(it.hasNext()){
                BulkItemResponse itemResponse = it.next();
                if(itemResponse.isFailed()){
                    System.out.println(itemResponse.getFailureMessage());
                }
            }
        }
    }

    /**
     * 單個索引數據
     * @return
     */
    @Test
    public void indexHotspotData() {
        String jsonSource = getIndexDataFromHotspotData(new Person(1004,"jim"));
        if (jsonSource != null) {
            IndexRequestBuilder requestBuilder = client.prepareIndex("test_index",
                    "test").setRefresh(true);
            requestBuilder.setSource(jsonSource)
                    .execute().actionGet();
        }
    }
    public String getIndexDataFromHotspotData(Person p){
        String result = null;
        if(p != null){
            try {
                XContentBuilder builder = XContentFactory.jsonBuilder();
                builder.startObject().field("id",p.getId()).field("name",p.getName()).endObject();
                result = builder.string();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

查詢數據

ES支持分頁查詢獲取數據,也可以一次性獲取大量數據,需要使用Scroll Search,QueryBuilder是一個查詢條件

    public List<Long> searchData(QueryBuilder builder){
        List<Long> ids = new ArrayList<>();
        SearchResponse response = client.prepareSearch("test_index").setTypes("test").setQuery(builder).setSize(10).execute().actionGet();
        SearchHits hits = response.getHits();
        for(SearchHit hit : hits){
            Long id = (Long) hit.getSource().get("id");
            ids.add(id);
        }
        return ids;
    }

Elasticsearch Java API簡介