Elasticsearch (1): Indexing and Updating Documents from Java with Synchronous and Asynchronous Methods

I. Basic steps for indexing or updating

1) Create a RestHighLevelClient object that connects to the Elasticsearch service

        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        );

2) Build the document content as an XContentBuilder object; the Elasticsearch content helper generates the JSON from this object for storage

        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.field("user", "kimchy");
            builder.timeField("postDate", new Date());
            builder.field("message", "trying out Elasticsearch");
        }
        builder.endObject();

3) Create an IndexRequest object and pass the XContentBuilder to its source method

        IndexRequest indexRequest = new IndexRequest("posts", "doc", "16")
                .source(builder);
Here posts is the index, doc is the type, and 16 is the specified document id.

4) Other parameter settings

        indexRequest.timeout(TimeValue.timeValueSeconds(5));
        indexRequest.opType(DocWriteRequest.OpType.INDEX);

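a) With DocWriteRequest.OpType.INDEX (the default), the document is updated (its content is replaced) if it already exists, and created if it does not.

b) With DocWriteRequest.OpType.CREATE, the request is create-only: if a document with the same id already exists, the request fails with the following error: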

ElasticsearchStatusException[Elasticsearch exception 
[type=version_conflict_engine_exception, 
reason=[doc][16]: version conflict, document already exists

An index request accepts only these two op types; UPDATE and DELETE are not allowed.
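The difference between the two op types can be illustrated with a small in-memory model. This is only a sketch using the JDK: the class OpTypeModel, its enums, and the exception it throws are hypothetical stand-ins chosen for illustration, not Elasticsearch classes, and a plain map takes the place of the index.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory model of the two op types; a hypothetical illustration, not Elasticsearch code.
final class OpTypeModel {
    enum OpType { INDEX, CREATE }
    enum Result { CREATED, UPDATED }

    // Maps a document id to its source; stands in for the index.
    private final Map<String, String> docs = new ConcurrentHashMap<>();

    Result write(OpType opType, String id, String source) {
        if (opType == OpType.CREATE) {
            // CREATE succeeds only when no document with this id exists yet.
            if (docs.putIfAbsent(id, source) != null) {
                throw new IllegalStateException("version conflict, document already exists: " + id);
            }
            return Result.CREATED;
        }
        // INDEX creates the document, or replaces the existing one.
        return docs.put(id, source) == null ? Result.CREATED : Result.UPDATED;
    }

    String get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        OpTypeModel store = new OpTypeModel();
        System.out.println(store.write(OpType.INDEX, "16", "v1"));
        System.out.println(store.write(OpType.INDEX, "16", "v2"));
        try {
            store.write(OpType.CREATE, "16", "v3");
        } catch (IllegalStateException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```

In this model the first INDEX write reports CREATED, the second reports UPDATED, and the CREATE on the same id is rejected, which mirrors the behavior described in a) and b).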

II. Synchronous request: call client.index(indexRequest, RequestOptions.DEFAULT)

        try {
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            String index = indexResponse.getIndex();
            String type = indexResponse.getType();
            String id = indexResponse.getId();
            long version = indexResponse.getVersion();
            if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                // The document did not exist and was created
                System.out.println("Document created");
                System.out.println("type:" + type);
                System.out.println("id:" + id);
                System.out.println("version:" + version);
                System.out.println("index:" + index);
            } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                // The document already existed and was overwritten
                System.out.println("Document updated");
                System.out.println("index:" + index);
                System.out.println("type:" + type);
                System.out.println("id:" + id);
                System.out.println("version:" + version);
            }
        } catch (ElasticsearchException e) {
            // CONFLICT is returned, for example, when opType is CREATE and the id already exists
            if (e.status() == RestStatus.CONFLICT) {
                System.out.println("The document conflicts with an existing one");
            }
        }

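III. Asynchronous request:

1) Create the callback object for the asynchronous request: ActionListener<IndexResponse>

If the request succeeds, onResponse is called automatically; if it fails, onFailure is called.

The IndexResponse and Exception arguments passed to these methods carry the details of the outcome.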

        ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("Document created");
                    System.out.println("type:" + type);
                    System.out.println("id:" + id);
                    System.out.println("version:" + version);
                    System.out.println("index:" + index);
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("Document updated");
                    System.out.println("index:" + index);
                    System.out.println("type:" + type);
                    System.out.println("id:" + id);
                    System.out.println("version:" + version);
                }
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    // Fewer shard copies succeeded than the total; handle that case here
                }
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        // Reason this shard copy failed; log or handle it here
                        String reason = failure.reason();
                    }
                }
            }

            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
            }
        };

2) Send the asynchronous request, passing the request object and the callback object as arguments

        client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);
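Callback-style calls like this one are often adapted to a CompletableFuture so the caller can wait for, or chain on, the result. The following is a minimal sketch of that pattern using only the JDK. Callback and CallbackFutures are hypothetical names introduced here, shaped like ActionListener but not part of the Elasticsearch client, and the asynchronous call is simulated with a plain thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical callback type shaped like ActionListener<T>; not an Elasticsearch class.
interface Callback<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

final class CallbackFutures {
    // Runs a callback-style async call and exposes its outcome as a CompletableFuture.
    static <T> CompletableFuture<T> toFuture(Consumer<Callback<T>> asyncCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new Callback<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Stand-in for an async call that reports success from another thread.
        CompletableFuture<String> ok = CallbackFutures.<String>toFuture(cb ->
                new Thread(() -> cb.onResponse("CREATED")).start());
        // join() blocks until the callback has completed the future.
        System.out.println(ok.join());
    }
}
```

With the real client, the same idea would mean writing an ActionListener whose two methods complete the future; that adaptation is left as an assumption here rather than shown.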

IV. Complete example code:

1) Synchronous example:

package com.example.elasticsearch.main;



import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;

import java.util.Date;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: Weichang Zhong
 * @Date: 2018/11/6
 * @Time: 15:16
 * @Description:
 */
public class Test {
    public static void main(String[] args) throws Exception{
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        );

        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.field("user", "kimchy");
            builder.timeField("postDate", new Date());
            builder.field("message", "trying out Elasticsearch");
        }
        builder.endObject();
        IndexRequest indexRequest = new IndexRequest("posts", "doc", "20")
                .source(builder);
        indexRequest.timeout(TimeValue.timeValueSeconds(5));
        indexRequest.opType(DocWriteRequest.OpType.INDEX);
        
        try {
            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            String index = indexResponse.getIndex();
            String type = indexResponse.getType();
            String id = indexResponse.getId();
            long version = indexResponse.getVersion();
            if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                System.out.println("Document created");
                System.out.println("type:" + type);
                System.out.println("id:" + id);
                System.out.println("version:" + version);
                System.out.println("index:" + index);
            } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                System.out.println("Document updated");
                System.out.println("index:" + index);
                System.out.println("type:" + type);
                System.out.println("id:" + id);
                System.out.println("version:" + version);
            }
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                System.out.println("The document conflicts with an existing one");
            }
        } finally {
            // Release the client's resources even if the request fails
            client.close();
        }
    }
}

2) Asynchronous example:

package com.example.elasticsearch.main;



import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;

import java.util.Date;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: Weichang Zhong
 * @Date: 2018/11/6
 * @Time: 15:16
 * @Description:
 */
public class Test {
    public static void main(String[] args) throws Exception{
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        );

        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.field("user", "kimchy");
            builder.timeField("postDate", new Date());
            builder.field("message", "trying out Elasticsearch");
        }
        builder.endObject();
        IndexRequest indexRequest = new IndexRequest("posts", "doc", "16")
                .source(builder);
        indexRequest.timeout(TimeValue.timeValueSeconds(5));
        indexRequest.opType(DocWriteRequest.OpType.CREATE);

        ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("Document created");
                    System.out.println("type:" + type);
                    System.out.println("id:" + id);
                    System.out.println("version:" + version);
                    System.out.println("index:" + index);
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("Document updated");
                    System.out.println("index:" + index);
                    System.out.println("type:" + type);
                    System.out.println("id:" + id);
                    System.out.println("version:" + version);
                }
            }

            @Override
            public void onFailure(Exception e) {
                // Not every failure is an ElasticsearchException (e.g. an IOException), so check before casting
                if (e instanceof ElasticsearchException
                        && ((ElasticsearchException) e).status() == RestStatus.CONFLICT) {
                    System.out.println("The document already exists");
                } else {
                    e.printStackTrace();
                }
            }
        };
     
        client.indexAsync(indexRequest, RequestOptions.DEFAULT, listener);


//        client.close();
    }
}

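Notes:

1) If the line //        client.close(); is not commented out, the connection may be closed before the request has been sent, and the create or update then fails.
For that reason the line is commented out in the asynchronous example above, for the purposes of this experiment.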
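Rather than leaving the client open, one common option is to wait for the callback before closing. The sketch below models this with only the JDK and a CountDownLatch. FakeAsyncClient and Listener are hypothetical stand-ins for RestHighLevelClient and ActionListener, used so the example runs without an Elasticsearch server; the 5-second wait is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ActionListener<T>; not an Elasticsearch type.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

// Hypothetical stand-in for the client: runs the "request" on a background thread.
final class FakeAsyncClient implements AutoCloseable {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    void indexAsync(String document, Listener<String> listener) {
        try {
            pool.execute(() -> listener.onResponse("CREATED:" + document));
        } catch (RuntimeException e) {
            // For example, the client was already closed and rejects new work
            listener.onFailure(e);
        }
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}

final class WaitBeforeClose {
    // Sends one async request and blocks until the callback fires or the timeout elapses.
    static String indexAndWait(String document) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("TIMEOUT");
        try (FakeAsyncClient client = new FakeAsyncClient()) {
            client.indexAsync(document, new Listener<String>() {
                @Override
                public void onResponse(String response) {
                    outcome.set(response);
                    done.countDown();
                }

                @Override
                public void onFailure(Exception e) {
                    outcome.set("FAILED:" + e.getMessage());
                    done.countDown();
                }
            });
            // Wait for the callback before try-with-resources closes the client.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(indexAndWait("doc-16"));
    }
}
```

Applied to the asynchronous example, the same idea would be to count down a latch in both onResponse and onFailure, await it after indexAsync, and only then call client.close().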

2) IndexRequest indexRequest = new IndexRequest("posts", "doc", null)

When the id of the request object is set to null, a new id is generated automatically on every run.

English reference:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.4/java-rest-high-document-index.html