1. 程式人生 > >ElasticSearch學習筆記之三十 JAVA Client 之 Document APIs

ElasticSearch學習筆記之三十 JAVA Client 之 Document APIs

ElasticSearch學習筆記之三十 JAVA Client 之 文件請求概述

Document APIs(文件APIS)

Java High Level REST Client支援下面的文件APIS

單文件APIs

  • Index API
  • Get API
  • Delete API
  • Update API

多文件操作 APIs

  • Bulk API
  • Multi-Get API

Index API

Index Request(索引請求)

一個IndexRequest就像下面案例一樣:

IndexRequest request = new IndexRequest(
        "posts"
, // Index "doc", // Type "1"); // Document id //JSON字串請求體 String jsonString = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; request.source(jsonString, XContentType.JSON);

Providing the document source(構建文件請求體)

除了上面所示的字串示例之外,還可以以不同的方式構建文件請求體。

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap); 

Map構建文件請求體會自動轉為JSON格式。

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.timeField("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);  

ElasticSearch內建XContentBuilder可以用來幫我們構建JSON請求體。

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch"); 

用鍵值對構建請求體也會自動轉為JSON格式。

Optional arguments(功能引數)

下面的案例展示功能配置

request.routing("routing"); //Routing value
request.parent("parent"); //Parent value
//等待主分片響應的超時時間
request.timeout(TimeValue.timeValueSeconds(1)); 
//字串配置主分片響應的超時時間
request.timeout("1s"); 
//設定重新整理策略為WriteRequest.RefreshPolicy
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
//字元設定
request.setRefreshPolicy("wait_for");                            
//版本設定
request.version(2); 
//設定版本型別
request.versionType(VersionType.EXTERNAL); 
//設定操作型別為DocWriteRequest.OpType
request.opType(DocWriteRequest.OpType.CREATE); 
//字串設定,可以配置create or update (default)
request.opType("create"); 
//設定在索引文件之前要執行的攝取管道的名稱
request.setPipeline("pipeline"); 

Synchronous Execution(同步執行)

IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

Asynchronous Execution(非同步執行)

索引請求的非同步執行需要將IndexRequest例項和ActionListener例項傳遞給非同步方法:

client.indexAsync(request/*需要執行的IndexRequest*/, RequestOptions.DEFAULT, listener/*執行完成之後的回撥*/); 

非同步執行不會堵塞並且立即返回,一旦完成,如果執行成功完成,則使用onResponse方法回撥ActionListener,如果執行失敗,則使用onFailure方法回撥ActionListener。

IndexResponse 典型的ActionListener例如:

ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
    //呼叫成功時回撥,返回資訊作為引數傳入
    @Override
    public void onResponse(IndexResponse indexResponse) {
        
    }

	//呼叫失敗時回撥,錯誤資訊作為引數傳入
    @Override
    public void onFailure(Exception e) {
        
    }
};

Index Response(索引返回)

IndexResponse獲取返回的響應資訊方式如下:

String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
//文件建立成功操作
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    //文件更新成功操作
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
//檢查成功的分片是不是等於總分片
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
     	//獲取分片失敗的原因
        String reason = failure.reason(); 
    }
}

如果版本衝突,我們會得到這樣的ElasticsearchException:

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1);
try {
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
	//版本衝突錯誤
    if (e.status() == RestStatus.CONFLICT) {
        
    }
}

當我們把opType 設定為 create 但是存在相同的 index, type 和 id :

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        
    }
}

完整案例如下:

配置Maven pom.xml依賴(改為對應ES版本):

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch</artifactId>
	<version>6.2.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.2.4</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.2.4</version>
</dependency>

這裡,我們使用 ObjectMapperJavaBean轉為JSON請求體,對應有以下依賴:

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.6</version>
</dependency>
        //建立連線
        //建立連線
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("192.168.199.18", 9200, "http"),
                        new HttpHost("192.168.199.118", 9200, "http")));


        //建立請求
        IndexRequest request = new IndexRequest(
                "posts", // Index
                "doc",  // Type
                "1");   // Document id

        //構建請求體(這裡演示ObjectMapper將JavaBean轉為JSON請求體)
        JavaBean bean = new JavaBean();

        ObjectMapper objectMapper = new ObjectMapper();//create once reuse
        // generate json
        String json = null;
        try {
            json = objectMapper.writeValueAsString(bean);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        request.source(json, XContentType.JSON);

        //設定操作型別為DocWriteRequest.OpType
        request.opType(DocWriteRequest.OpType.CREATE);


        ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
            //呼叫成功時回撥,返回資訊作為引數傳入
            @Override
            public void onResponse(IndexResponse indexResponse) {
                System.out.println("非同步回撥成功");
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                //文件建立成功操作
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println(index+ type+ id+ version);
                    //文件更新成功操作
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {

                }
            }

            //呼叫失敗時回撥,錯誤資訊作為引數傳入
            @Override
            public void onFailure(Exception e) {
                e.printStackTrace();
            }
        };

        //非同步操作
        //client.indexAsync(request/*需要執行的IndexRequest*/, listener/*執行完成之後的回撥*/);

		//同步操作
        try {
            client.index(request);
        } catch (IOException e) {
            e.printStackTrace();
        }
        //關閉連線
        if(client !=null){
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }