1. 程式人生 > >ElasticSearch學習筆記之三十二 JAVA Client 之 Exists Delete Update APIs

ElasticSearch學習筆記之三十二 JAVA Client 之 Exists Delete Update APIs

ElasticSearch學習筆記之三十二 JAVA Client 之 Exists Delete Update APIs

Exists API

如果文件存在的化exists API 會返回true

,否則false

Exists Request

Exists Request就像Get API一樣使用GetRequest. 也支援Get API的所有功能引數. 由於exists() 只返回true 或者 false,我們建議關閉fetching _sourcestored fields 從而讓我們的請求更加輕量級。

GetRequest getRequest = new GetRequest(
    "posts", //Index
    "doc",  // Type
    "1");    // Document id
 //禁用fetching _source
getRequest.
fetchSourceContext(new FetchSourceContext(false)); //禁用fetching stored fields getRequest.storedFields("_none_");

Synchronous Execution(同步執行)

boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);

Asynchronous Execution(非同步執行)

索引請求的非同步執行需要將GetRequest例項和ActionListener例項傳遞給非同步方法:

client.existsAsync(getRequest/*需要執行的GetRequest*/, RequestOptions.DEFAULT, listener/*執行完成之後的回撥*/); 

非同步執行不會堵塞並且立即返回,一旦完成,如果執行成功完成,則使用onResponse方法回撥ActionListener,如果執行失敗,則使用onFailure方法回撥ActionListener。

典型的GetResponse:

ActionListener<Boolean> listener = new ActionListener<Boolean>() {
    //呼叫成功時回撥,返回資訊作為引數傳入
    @Override
    public void onResponse(Boolean exists) {
        
    }
    //呼叫失敗時回撥,錯誤資訊作為引數傳入
    @Override
    public void onFailure(Exception e) {
        
    }
};

Delete API

Delete Request

DeleteRequest形如:

DeleteRequest request = new DeleteRequest(
        "posts",   //Index 
        "doc",   //Type  
        "1");   //Document id   

Optional arguments

下面的案例展示功能配置:

//Routing value
request.routing("routing"); 
//Parent value
request.parent("parent"); 
//等待主分片響應的超時時間
request.timeout(TimeValue.timeValueSeconds(1)); 
//字串配置主分片響應的超時時間
request.timeout("1s"); 
//設定重新整理策略為WriteRequest.RefreshPolicy
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
//字元設定
request.setRefreshPolicy("wait_for");                            
//版本設定
request.version(2); 
//設定版本型別
request.versionType(VersionType.EXTERNAL); 

Synchronous Execution(同步執行)

DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);

Asynchronous Execution(非同步執行)

索引請求的非同步執行需要將DeleteRequest例項和ActionListener例項傳遞給非同步方法:

client.deleteAsync(request/*需要執行的DeleteRequest*/, RequestOptions.DEFAULT, listener/*執行完成之後的回撥*/); 

非同步執行不會堵塞並且立即返回,一旦完成,如果執行成功完成,則使用onResponse方法回撥ActionListener,如果執行失敗,則使用onFailure方法回撥ActionListener。

DeleteResponse典型案例

ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
	//呼叫成功時回撥,返回資訊作為引數傳入
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        
    }

	//呼叫失敗時回撥,錯誤資訊作為引數傳入
    @Override
    public void onFailure(Exception e) {
        
    }
};

Delete Response

DeleteResponse獲取返回的響應資訊方式如下:

String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
//檢查成功的分片是不是等於總分片
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
    	//獲取分片失敗的原因
        String reason = failure.reason(); 
    }
}

也可以用於檢查文件是否找到:

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
//如果文件沒有找到
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    
}

如果版本衝突,我們會得到這樣的ElasticsearchException:

try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        
    }
}

Update API

Update Request

UpdateRequest形如:

UpdateRequest request = new UpdateRequest(
        "posts", //Index
        "doc",  //Type
        "1");   //Document id

Update API允許通過使用指令碼或通過傳遞部分文件來更新現有文件。

Updates with a script(指令碼更新)

指令碼案例如下:

//Map
Map<String, Object> parameters = singletonMap("count", 4); 
//用painless語言和Map引數構建指令碼
Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", parameters);  
//把指令碼設定給UpdateRequest
request.script(inline);  

或者可以作為儲存指令碼:

Script stored =
        new Script(ScriptType.STORED, null, "increment-field", parameters); 
//把指令碼設定給UpdateRequest 
request.script(stored);  

Updates with a partial document(更新部分文件)

當使用部分文件進行更新時,部分文件將與現有文件合併。

部分文件的生成有如下方式:

JSON字串資料

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON); 

Map轉JSON

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); 

XContentBuilder工具轉JSON

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder);  

鍵值對構建

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

Upserts

當文件不存在的時候我們可以使用upsert方式新插入文件:

String jsonString = "{\"created\":\"2017-01-01\"}";
//字串upsert
request.upsert(jsonString, XContentType.JSON);  

與部分文件更新類似,可以使用接受String、Map、XContentBuilder或Object鍵值對的方法來定義upsert文件的內容。

Optional arguments

下面的案例展示功能配置:

//Routing value
request.routing("routing"); 
//Parent value
request.parent("parent"); 
//等待主分片響應的超時時間
request.timeout(TimeValue.timeValueSeconds(1)); 
//字串配置主分片響應的超時時間
request.timeout("1s"); 
//設定重新整理策略為WriteRequest.RefreshPolicy
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
//字元設定
request.setRefreshPolicy("wait_for");                            
//設定重試更新操作的次數(如果要更新的文件已經由更新操作的獲取和索引階段之間的另一個操作更改)
request.retryOnConflict(3); 
//啟用源檢索,預設情況下禁用
request.fetchSource(true); 
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
//配置source include欄位
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
//配置source exclude欄位
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 
//版本設定
request.version(2); 
//禁用noop detection
request.detectNoop(false); 
//設定無論文件是否存在,指令碼都必須執行(如果文件不存在,則指令碼負責建立文件)。
request.scriptedUpsert(true); 
//指明部分更新文件假如文件不存在的時候按照upsert處理
request.docAsUpsert(true); 
//設定在更新操作之前必須處於活躍狀態的分片副本的數量。
request.waitForActiveShards(2); 
//活躍狀態的分片副本的數量可以取值ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (default)
request.waitForActiveShards(ActiveShardCount.ALL); 

Synchronous Execution(同步執行)

UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);

Asynchronous Execution(非同步執行)

索引請求的非同步執行需要將UpdateRequest例項和ActionListener例項傳遞給非同步方法

client.updateAsync(request/*需要執行的UpdateRequest*/, RequestOptions.DEFAULT, listener/*執行完成之後的回撥*/); 

非同步執行不會堵塞並且立即返回,一旦完成,如果執行成功完成,則使用onResponse方法回撥ActionListener,如果執行失敗,則使用onFailure方法回撥ActionListener。

典型的UpdateResponse :

ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
	//呼叫成功時回撥,返回資訊作為引數傳入
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        
    }
	//呼叫失敗時回撥,錯誤資訊作為引數傳入
    @Override
    public void onFailure(Exception e) {
        
    }
};

Update Response

UpdateResponse獲取返回的響應資訊方式如下:

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
//文件第一次被建立9(upsert)
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    //文件被更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    //文件被刪除
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    //處理文件未受更新影響的情況,即在文件上沒有執行操作(NOOP)
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    
}

當通過fetchSource方法在UpdateRequest中啟用源檢索時,響應包含更新的文件的源

//按GETResult返回UpdateRequest
GetResult result = updateResponse.getGetResult(); 
if (result.isExists()) {
	//source做字串返回
    String sourceAsString = result.sourceAsString(); 
    //做Map<String, Object>返回
    Map<String, Object> sourceAsMap = result.sourceAsMap(); 
    //按byte[]返回source
    byte[] sourceAsBytes = result.source(); 
//當文件的源不存在於響應中的情況(預設情況下是這樣)
} else {
    
}

我們也可以獲取分片失敗的資訊

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
//檢查成功的分片是不是等於總分片
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
    	//獲取失敗的原因
        String reason = failure.reason(); 
    }
}

UpdateRequest操作在一個不存在的文件的時候,我們會得到一個404響應碼,和一下如下的ElasticsearchException:

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
	//文件沒有找到
    if (e.status() == RestStatus.NOT_FOUND) {
        
    }
}

如果出現版本衝突, 會得到下面的ElasticsearchException :

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
	//版本衝突
    if (e.status() == RestStatus.CONFLICT) {
        
    }
}