Elasticsearch (5): Using Bulk Operations in Java, with Caveats

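1. A BulkRequest object can execute multiple index, update, or delete operations in a single request.

    Different operation types may be mixed in one request, so index, update, and delete operations can all appear together.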

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new DeleteRequest("posts", "doc", "300"));
bulkRequest.add(new UpdateRequest("posts", "doc", "2").doc(XContentType.JSON,"other", "test").fetchSource(true));
bulkRequest.add(new IndexRequest("posts", "doc", "4").source(XContentType.JSON,"field", "baz"));

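2. Per-document requests are added with methods such as BulkRequest add(IndexRequest request). The remaining request-level parameters of a BulkRequest are set the same way as for single-document requests: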

bulkRequest.timeout(TimeValue.timeValueMinutes(2));
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);

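Note that settings for an individual document operation belong on the request passed to add. For example, to have an update operation return the updated source, call .fetchSource(true) on that request:

bulkRequest.add(new UpdateRequest("posts", "doc", "2").doc(XContentType.JSON,"other", "test").fetchSource(true));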

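3. BulkResponse receives the result of the call. It carries information about the executed operations and can be iterated to inspect the result of each one: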

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
        IndexResponse indexResponse = (IndexResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

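Note: bulkItemResponse.getOpType() returns the operation type of the request that was passed to add, not the operation that was actually performed on the document. Suppose the following request is added: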

bulkRequest.add(new IndexRequest("posts", "doc", "1").source(XContentType.JSON,"field", "foo"));

If the document does not exist, it is created. The following branch runs, and reporting a successful creation is correct:

if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
     IndexResponse indexResponse = (IndexResponse) itemResponse;
     System.out.println("id=" + indexResponse.getId() + ": document created successfully");
     System.out.println("id=" + indexResponse.getId() + " operation result: " + indexResponse.getResult());
}

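If the document already exists, however, it is overwritten (updated) rather than created. The branch above still runs, while the check

bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE

evaluates to false. To find out which operation was actually performed on the document, read the result from the response: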

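DocWriteResponse itemResponse = bulkItemResponse.getResponse();
IndexResponse indexResponse = (IndexResponse) itemResponse;
// CREATED for a new document, UPDATED when an existing one was overwritten
DocWriteResponse.Result result = indexResponse.getResult();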

Both itemResponse.getResult() and indexResponse.getResult() return the operation that was actually performed.
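To make the difference between the requested operation and the actual result concrete, here is a minimal, dependency-free sketch. The OpType and Result enums are hypothetical stand-ins that mirror DocWriteRequest.OpType and a subset of DocWriteResponse.Result; they are not the Elasticsearch classes, and describe and overwroteExisting are illustrative helper names.

```java
/** Dependency-free sketch; the enums are hypothetical stand-ins, not the Elasticsearch classes. */
class OpTypeVsResultSketch {

    // Stand-in for DocWriteRequest.OpType: the operation the caller passed to add(...).
    enum OpType { INDEX, CREATE, UPDATE, DELETE }

    // Stand-in for a subset of DocWriteResponse.Result: what actually happened to the document.
    enum Result { CREATED, UPDATED, DELETED, NOT_FOUND, NOOP }

    /** True when an INDEX request replaced an existing document instead of creating one. */
    static boolean overwroteExisting(OpType requested, Result actual) {
        return requested == OpType.INDEX && actual == Result.UPDATED;
    }

    /** Builds a log line that reports both the requested operation and the actual result. */
    static String describe(String id, OpType requested, Result actual) {
        return "id=" + id + " requested=" + requested + " actual=" + actual;
    }

    public static void main(String[] args) {
        // First index request for id 1: the document is created.
        System.out.println(describe("1", OpType.INDEX, Result.CREATED));
        // Second index request for id 1: the op type is still INDEX, the result is UPDATED.
        System.out.println(describe("1", OpType.INDEX, Result.UPDATED));
        System.out.println(overwroteExisting(OpType.INDEX, Result.UPDATED));
    }
}
```

In real code the same comparison would use bulkItemResponse.getOpType() for the requested operation and getResponse().getResult() for the actual one.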

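4. If no document with id 1 exists on the Elasticsearch server, a document with id 1 is created automatically.

Likewise, if the posts index does not exist, the index itself is created automatically from the index/type/id in the request (provided automatic index creation is enabled, which is the default):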

bulkRequest.add(new IndexRequest("posts", "doc", "1").source(XContentType.JSON,"field", "foo"));

However, if the posts index already contains documents of type doc, a request like the following fails:

bulkRequest.add(new IndexRequest("posts", "doc2", "1").source(XContentType.JSON,"field", "foo"));

The error message is:

Rejecting mapping update to [posts] as the final mapping would have more than 1 type: [doc2, doc]

Reason: an index created in Elasticsearch 6.0 or later can contain only one type, so a second type cannot be created automatically under posts.

In code, this kind of illegal operation can be detected as follows:

if (bulkItemResponse.getFailure() != null) {
    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
    if (failure.getStatus() == RestStatus.BAD_REQUEST) {
        System.out.println("id=" + bulkItemResponse.getId() + " is an illegal request!");
        continue;
    }
}

For an IndexRequest, if you want to create the document but not overwrite it when it already exists, configure the request like this:

bulkRequest.add(new IndexRequest("posts", "doc", "5").source(XContentType.JSON,"field", "foo").opType(DocWriteRequest.OpType.CREATE));

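That is, add .opType(DocWriteRequest.OpType.CREATE). If the document already exists, no exception is thrown. Instead the item is reported as a failure whose status is RestStatus.CONFLICT, which the code can check with failure.getStatus() == RestStatus.CONFLICT: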

if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
    if (bulkItemResponse.getFailure() != null && bulkItemResponse.getFailure().getStatus() == RestStatus.CONFLICT) {
       System.out.println("id=" + bulkItemResponse.getId() + " conflicts with an existing document");
       continue;
    }
    IndexResponse indexResponse = (IndexResponse) itemResponse;
    System.out.println("id=" + indexResponse.getId() + ": document created successfully");
    System.out.println("id=" + indexResponse.getId() + " operation result: " + itemResponse.getResult());
}
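The two failure checks discussed in this section (BAD_REQUEST and CONFLICT) can be combined into one classification step. The sketch below is dependency-free and only models the decision: Outcome and classify are hypothetical names, and the numeric codes 400 and 409 are the HTTP statuses behind RestStatus.BAD_REQUEST and RestStatus.CONFLICT. Treating a null status as "no failure" is an assumption made for this illustration.

```java
/** Dependency-free sketch; Outcome and classify are hypothetical names, not Elasticsearch API. */
class BulkFailureSketch {

    enum Outcome { OK, ILLEGAL_REQUEST, CONFLICT, OTHER_FAILURE }

    /**
     * Maps the HTTP status of one bulk item to an outcome.
     * Pass null for an item that has no failure attached.
     */
    static Outcome classify(Integer httpStatus) {
        if (httpStatus == null) {
            return Outcome.OK;
        }
        int status = httpStatus;
        if (status == 400) {          // RestStatus.BAD_REQUEST, e.g. a rejected mapping update
            return Outcome.ILLEGAL_REQUEST;
        }
        if (status == 409) {          // RestStatus.CONFLICT, e.g. CREATE on an existing id
            return Outcome.CONFLICT;
        }
        return Outcome.OTHER_FAILURE; // any other failure status
    }

    public static void main(String[] args) {
        System.out.println(classify(null));
        System.out.println(classify(400));
        System.out.println(classify(409));
        System.out.println(classify(503));
    }
}
```

Handling every failed item before touching the response object matters because a failed item carries no response, so the casts to IndexResponse, UpdateResponse, or DeleteResponse only make sense for the OK case.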

 

5. For a delete operation, without an extra check the following if branch is always entered, even when the document does not exist:

if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}

To detect the case where the document does not exist, check the result as well:

if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
        System.out.println("id=" + deleteResponse.getId() + ": document not found, nothing deleted!");
    } else {
        System.out.println("id=" + deleteResponse.getId() + ": document deleted successfully");
    }
}

6. Complete code example:

package com.example.elasticsearch.document;

import org.apache.http.HttpHost;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: Weichang Zhong
 * @Date: 2018/11/7
 * @Time: 16:26
 * @Description:
 */
public class SynBulkRequest {

    public static void main(String[] args) {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost("127.0.0.1", 9200, "http")
                )
        )) {

            BulkRequest bulkRequest = new BulkRequest();
            bulkRequest.add(new IndexRequest("posts", "doc", "5").source(XContentType.JSON,"field", "foo").opType(DocWriteRequest.OpType.CREATE));
            bulkRequest.add(new IndexRequest("posts2000", "doc", "2").source(XContentType.JSON,"field", "bar"));
            bulkRequest.add(new IndexRequest("posts", "doc", "3").source(XContentType.JSON,"field", "baz"));

            bulkRequest.add(new DeleteRequest("posts", "doc", "300"));
            bulkRequest.add(new UpdateRequest("posts", "doc", "2").doc(XContentType.JSON,"other", "test").fetchSource(true));
            bulkRequest.add(new IndexRequest("posts", "doc", "4").source(XContentType.JSON,"field", "baz"));

            bulkRequest.timeout(TimeValue.timeValueMinutes(2));
            bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);

            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                // A failed item has no response object, so handle every failure first
                // and skip the casts below; otherwise they would hit a null response.
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    System.out.println(failure.getCause());
                    if (failure.getStatus() == RestStatus.BAD_REQUEST) {
                        System.out.println("id=" + bulkItemResponse.getId() + " is an illegal request!");
                    } else if (failure.getStatus() == RestStatus.CONFLICT) {
                        System.out.println("id=" + bulkItemResponse.getId() + " conflicts with an existing document");
                    } else {
                        System.out.println("id=" + bulkItemResponse.getId() + " failed: " + failure.getMessage());
                    }
                    continue;
                }

                DocWriteResponse itemResponse = bulkItemResponse.getResponse();

                if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                    IndexResponse indexResponse = (IndexResponse) itemResponse;
                    System.out.println("id=" + indexResponse.getId() + ": document created successfully");
                    // getResult() reports what actually happened: CREATED or UPDATED
                    System.out.println("id=" + indexResponse.getId() + " operation result: " + itemResponse.getResult());
                } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                    UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                    System.out.println("id=" + updateResponse.getId() + ": document updated successfully");
                    System.out.println("id=" + updateResponse.getId() + " document source: " + updateResponse.getGetResult().sourceAsString());
                } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                    DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                    if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
                        System.out.println("id=" + deleteResponse.getId() + ": document not found, nothing deleted!");
                    } else {
                        System.out.println("id=" + deleteResponse.getId() + ": document deleted successfully");
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

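Note: get operations cannot be part of a bulk request. BulkRequest.add only accepts write requests (DocWriteRequest implementations such as IndexRequest, UpdateRequest, and DeleteRequest), and GetRequest is not one of them, so the following line is rejected by the compiler: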

bulkRequest.add(new GetRequest("posts", "doc", "22"));
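The restriction comes from the parameter type of add. The following dependency-free model sketches the idea; every type in it (WriteOp, IndexOp, DeleteOp, GetOp, Bulk) is a hypothetical stand-in written for this illustration, not part of the Elasticsearch client.

```java
import java.util.ArrayList;
import java.util.List;

/** Dependency-free model; every type here is a hypothetical stand-in, not Elasticsearch API. */
class BulkTypeCheckSketch {

    // Stand-in for DocWriteRequest: the common type of all write operations.
    interface WriteOp { String id(); }

    static final class IndexOp implements WriteOp {
        private final String id;
        IndexOp(String id) { this.id = id; }
        public String id() { return id; }
    }

    static final class DeleteOp implements WriteOp {
        private final String id;
        DeleteOp(String id) { this.id = id; }
        public String id() { return id; }
    }

    // Stand-in for GetRequest: a read operation that is deliberately not a WriteOp.
    static final class GetOp {
        final String id;
        GetOp(String id) { this.id = id; }
    }

    // Stand-in for BulkRequest: add(...) only accepts WriteOp.
    static final class Bulk {
        private final List<WriteOp> ops = new ArrayList<>();
        Bulk add(WriteOp op) { ops.add(op); return this; }
        int numberOfActions() { return ops.size(); }
    }

    public static void main(String[] args) {
        Bulk bulk = new Bulk()
                .add(new IndexOp("1"))
                .add(new DeleteOp("300"));
        // bulk.add(new GetOp("22")); // would not compile: GetOp is not a WriteOp
        System.out.println(bulk.numberOfActions());
    }
}
```

To read several documents in one round trip, the multi-get API (MultiGetRequest) is the usual counterpart to bulk writes.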