ElasticSearch學習筆記之三十 JAVA Client 之 Document APIs
阿新 • • 發佈:2018-11-06
ElasticSearch學習筆記之三十 JAVA Client 之 文件請求概述
- Document APIs(文件APIS)
- Index API
- Index Request(索引請求)
- Providing the document source(構建文件請求體)
- Optional arguments(功能引數)
- Synchronous Execution(同步執行)
- Asynchronous Execution(非同步執行)
- Index Response(索引返回)
- 完整案例如下:
Document APIs(文件APIS)
Java High Level REST Client
支援下面的文件APIS
單文件APIs
- Index API
- Get API
- Delete API
- Update API
多文件操作 APIs
- Bulk API
- Multi-Get API
Index API
Index Request(索引請求)
一個IndexRequest
就像下面案例一樣:
IndexRequest request = new IndexRequest(
"posts" , // Index
"doc", // Type
"1"); // Document id
//JSON字串請求體
String jsonString = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);
Providing the document source(構建文件請求體)
除了上面所示的字串示例之外,還可以以不同的方式構建文件請求體。
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source(jsonMap);
用Map
構建文件請求體會自動轉為JSON
格式。
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source(builder);
ElasticSearch內建XContentBuilder
可以用來幫我們構建JSON
請求體。
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
用鍵值對構建請求體也會自動轉為JSON
格式。
Optional arguments(功能引數)
下面的案例展示功能配置
request.routing("routing"); //Routing value
request.parent("parent"); //Parent value
//等待主分片響應的超時時間
request.timeout(TimeValue.timeValueSeconds(1));
//字串配置主分片響應的超時時間
request.timeout("1s");
//設定重新整理策略為WriteRequest.RefreshPolicy
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//字元設定
request.setRefreshPolicy("wait_for");
//版本設定
request.version(2);
//設定版本型別
request.versionType(VersionType.EXTERNAL);
//設定操作型別為DocWriteRequest.OpType
request.opType(DocWriteRequest.OpType.CREATE);
//字串設定,可以配置create or update (default)
request.opType("create");
//設定在索引文件之前要執行的攝取管道的名稱
request.setPipeline("pipeline");
Synchronous Execution(同步執行)
IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);
Asynchronous Execution(非同步執行)
索引請求的非同步執行需要將IndexRequest例項和ActionListener例項傳遞給非同步方法:
client.indexAsync(request/*需要執行的IndexRequest*/, RequestOptions.DEFAULT, listener/*執行完成之後的回撥*/);
非同步執行不會堵塞並且立即返回,一旦完成,如果執行成功完成,則使用onResponse
方法回撥ActionListener,如果執行失敗,則使用onFailure
方法回撥ActionListener。
IndexResponse
典型的ActionListener
例如:
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
//呼叫成功時回撥,返回資訊作為引數傳入
@Override
public void onResponse(IndexResponse indexResponse) {
}
//呼叫失敗時回撥,錯誤資訊作為引數傳入
@Override
public void onFailure(Exception e) {
}
};
Index Response(索引返回)
從IndexResponse
獲取返回的響應資訊方式如下:
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
//文件建立成功操作
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
//文件更新成功操作
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
//檢查成功的分片是不是等於總分片
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
//獲取分片失敗的原因
String reason = failure.reason();
}
}
如果版本衝突,我們會得到這樣的ElasticsearchException
:
IndexRequest request = new IndexRequest("posts", "doc", "1")
.source("field", "value")
.version(1);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
//版本衝突錯誤
if (e.status() == RestStatus.CONFLICT) {
}
}
當我們把opType
設定為 create
但是存在相同的 index, type 和 id :
IndexRequest request = new IndexRequest("posts", "doc", "1")
.source("field", "value")
.opType(DocWriteRequest.OpType.CREATE);
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
} catch(ElasticsearchException e) {
if (e.status() == RestStatus.CONFLICT) {
}
}
完整案例如下:
配置Maven pom.xml依賴(改為對應ES版本):
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/transport -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>6.2.4</version>
</dependency>
這裡,我們使用 ObjectMapper
將JavaBean
轉為JSON
請求體,對應有以下依賴:
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
</dependency>
//建立連線
//建立連線
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.199.18", 9200, "http"),
new HttpHost("192.168.199.118", 9200, "http")));
//建立請求
IndexRequest request = new IndexRequest(
"posts", // Index
"doc", // Type
"1"); // Document id
//構建請求體(這裡演示ObjectMapper將JavaBean轉為JSON請求體)
JavaBean bean = new JavaBean();
ObjectMapper objectMapper = new ObjectMapper();//create once reuse
// generate json
String json = null;
try {
json = objectMapper.writeValueAsString(bean);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
request.source(json, XContentType.JSON);
//設定操作型別為DocWriteRequest.OpType
request.opType(DocWriteRequest.OpType.CREATE);
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
//呼叫成功時回撥,返回資訊作為引數傳入
@Override
public void onResponse(IndexResponse indexResponse) {
System.out.println("非同步回撥成功");
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
//文件建立成功操作
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
System.out.println(index+ type+ id+ version);
//文件更新成功操作
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
}
}
//呼叫失敗時回撥,錯誤資訊作為引數傳入
@Override
public void onFailure(Exception e) {
e.printStackTrace();
}
};
//非同步操作
//client.indexAsync(request/*需要執行的IndexRequest*/, listener/*執行完成之後的回撥*/);
//同步操作
try {
client.index(request);
} catch (IOException e) {
e.printStackTrace();
}
//關閉連線
if(client !=null){
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}