1. 程式人生 > >Elasticsearch Document Update API詳解、原理與示例

Elasticsearch Document Update API詳解、原理與示例

   本文將詳細介紹單文件(Document)的更新API,其更新API如下:

  • public final UpdateResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException
  • public final void updateAsync(UpdateRequest updateRequest, RequestOptions options, ActionListener listener)
       其核心需要關注UpdateRequest。
    1、UpdateRequest詳解
       UpdateRequest的核心類圖如圖所示:
    在這裡插入圖片描述

    我們首先來看一下UpdateRequest的核心屬性:
  • protected ShardId shardId:指定需要執行的分片資訊。
  • protected String index:索引庫,類似關係型資料庫的database。
  • private String type:型別名,類似於關係資料庫的table(表)。
  • private String id :文件ID,所謂的文件,類似於關係資料庫的行,id,類似於關係資料庫的主鍵ID。
  • private String routing:分片值,預設為id的值,elasticsearch的分片路由演算法為( hashcode(routing) % primary_sharding_count(主分片個數) )。
  • private String parent:
  • Script script:通過腳步更新文件。
  • private String[] fields:指定更新操作後,需要返回的文件的欄位資訊,預設為不返回,已廢棄,被fetchSourceContext取代。
  • private FetchSourceContext fetchSourceContext:執行更新操作後,如果命中,需要返回_source的上下文配置,與fields的區別是fetchSourceContext支援萬用字元表示式來匹配欄位名,其詳細已經在《Elasticsearch Document Get API詳解、原理與示例》中詳細介紹過。
  • private long version = Versions.MATCH_ANY:版本號。
  • private VersionType versionType = VersionType.INTERNAL:版本型別,分為內部版本、外部版本,預設為內部版本。
  • private int retryOnConflict = 0:更新衝突時重試次數。
  • private RefreshPolicy refreshPolicy = RefreshPolicy.NONE:重新整理策略。NONE:代表不重試;
  • private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT:執行操作之前需要等待啟用的副本數,已在《Elasticsearch Document Get API詳解、原理與示例》中詳細介紹。
  • private IndexRequest upsertRequest:使用該 欄位進行更新操作,如果原索引不存在,則更新,類似於saveOrUpdate操作,該操作需要與腳步執行,詳細將在後續章節中描述,
  • private boolean scriptedUpsert = false;是否是用腳步執行更新操作。
  • private boolean docAsUpsert = false; 是否使用saveOrUpdate模式,即是否使用IndexRequest upsertRequest進行更新操作。(docAsUpser=true+ doc組合,將使用saveOrUpdate模式)。
  • private boolean detectNoop = true;是否檢查空操作,下文會進行詳細介紹。
  • private IndexRequest doc;預設使用該請求進行更新操作。
       從上述我們基本可以得知更新基本有3種方式,script、upsert、doc(普通更新)。

2、深入分析Elasticsearch Update API(更新API)
2.1 Script腳步更新
   Elasticsearch可以通過指令碼(painless)進行更新,其具體語法見:https://www.elastic.co/guide/en/elasticsearch/painless/current/index.html ,,本節不會深入去學習其語法,後續會看單獨的章節對其進行詳細講解。

2.2 部分欄位更新(普通更新方式)
   更新API支援傳遞一個部分文件(_source欄位中包含型別的部門欄位),它將被合併到現有的文件中(簡單的遞迴合併,物件的內部合併,替換核心的“鍵/值”和陣列)。如果需要完全替代現有的文件,請使用(Index API)。以下部分更新為現有文件添加了一個新欄位:(下文會給出基於java的API呼叫)。

POST test/_doc/1/_update
{
    "doc" : {
        "name" : "new_name"
    }
}

   如果指定了doc和script,則script屬性優先,關於更新API一個比較好的實踐是使用腳步更新(painless),後續會重點章節詳細介紹。

2.3 檢測空更新(檢測本請求是否值得更新)
   該功能特性的意思是當提交的請求,發現與原文件的資料並未傳送變化,是否執行update操作,預設檢測。如果開啟檢測,detectNoop=true,如果檢測到資料並未發生變化,則返回結果為noop(空操作),如果detectNoop=false,每次操作都會執行,版本號將自增。

2.4 儲存或更新(Upserts)
   如果文件還不存在,upsert元素的內容將作為新文件插入。Elasticsearch支援scripted_upsert和doc_as_upsert兩種模式,以scripted_upsert優先。通過UpdateRequest#scriptedUpsert和UpdateRequest#docAsUpsert控制。

2.5 核心引數一覽表
   更新API主要核心引數一覽表

引數名 說明
retry_on_conflict Elasticsearch基於版本進行樂觀鎖控制,當版本衝突後,允許的重試次數,超過重試次數retry_on_conflict後丟擲異常。
routing 路由策略。
timeout 等待分片的超時時間。
wait_for_active_shards 在執行命令之前需要等待副本的數量。
refresh 重新整理機制。
_source
version 版本欄位,基於樂觀鎖控制。

   注意:更新API不支援除內部以外的版本控制,外部(版本型別外部和外部的)或強制(版本型別的force)版本控制不受更新API的支援,因為它會導致彈性搜尋版本號與外部系統不同步。

3、Update API使用示例
   本節將暫時不會展示使用腳步進行更新的Demo,此部分會在後續文章中單獨的章節來介紹ElasticSearch painless Script。

3.1 常規更新(更新部分欄位)

public static void testUpdate_partial() {
		RestHighLevelClient client = EsClient.getClient();
		try {
			UpdateRequest request = new UpdateRequest("twitter", "_doc", "10");
			IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "10");
			Map<String, String> source = new HashMap<>();
			source.put("user", "dingw2");
			indexRequest.source(source);
			request.doc(indexRequest);
			UpdateResponse result = client.update(request, RequestOptions.DEFAULT);
			System.out.println(result);
			testGet();
		} catch (Throwable e) {
			e.printStackTrace();
		} finally {
			EsClient.close(client);
		}
	}

   最終結果:呼叫get API能反映出user欄位已經更新為dingw2,及更新成功。

3.2 開啟detectNoop(detectNoop=true),並且並不改變資料

public static void testUpdate_noop() {
		RestHighLevelClient client = EsClient.getClient();
		try {
			UpdateRequest request = new UpdateRequest("twitter", "_doc", "10");
			request.detectNoop(true);
			request.doc(buildIndexRequest());
			
			UpdateResponse result = client.update(request, RequestOptions.DEFAULT);
			System.out.println(result);
		} catch (Throwable e) {
			e.printStackTrace();
		} finally {
			EsClient.close(client);
		}
	}

   返回結果:

{
   "_shards": {
        "total": 0,
        "successful": 0,
        "failed": 0
   },
   "_index": "twitter",
   "_type": "_doc",
   "_id": "10",
   "_version": 6,
   "result": "noop"
}

   其特徵為result為noop,並且_shards各個欄位都返回0,表示沒有在任何分片上執行該動作,並且資料的版本_version並不會傳送變化。

3.3 不開啟detectNoop(detectNoop=false),並且並不改變資料

public static void testUpdate_no_noop() {
		RestHighLevelClient client = EsClient.getClient();
		try {
			UpdateRequest request = new UpdateRequest("twitter", "_doc", "10");
			request.detectNoop(false);
			request.doc(buildIndexRequest());
			UpdateResponse result = client.update(request, RequestOptions.DEFAULT);
			System.out.println(result);
		} catch (Throwable e) {
			e.printStackTrace();
		} finally {
			EsClient.close(client);
		}
	}

   返回結果:

{
   "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
   },
   "_index": "twitter",
   "_type": "_doc",
   "_id": "10",
   "_version": 7,
   "result": "updated"
}

   其主要特徵表現為result=updated,表示執行的動作為更新,並且版本號自增1,_shards反饋的是各分片的執行情況。

3.4 saveOrUpdate更新模式(upsert)

/**
	 * 更新操作,原記錄不存在,使用saveOrUpdate模式。
	 */
	public static void testUpdate_upsert() {
		RestHighLevelClient client = EsClient.getClient();
		try {
			UpdateRequest request = new UpdateRequest("twitter", "_doc", "11");
			IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "11");
			Map<String, String> source = new HashMap<>();
			source.put("user", "dingw");
			source.put("post_date", "2009-11-17T14:12:12");
			source.put("message", "hello,update upsert。");
			
			indexRequest.source(source);
			request.doc(indexRequest);
			request.docAsUpsert(true);
			UpdateResponse result = client.update(request, RequestOptions.DEFAULT);
			System.out.println(result);
		} catch (Throwable e) {
			e.printStackTrace();
		} finally {
			EsClient.close(client);
		}
	}

   返回結果:

{
   "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
   },
   "_index": "twitter",
   "_type": "_doc",
   "_id": "11",
   "_version": 1,
   "result": "created"
}

   返回結果其核心表現為:result:created,表示是一個新增操作。
   Document API就講解到這裡了,本節詳細介紹了Document Update API的核心關鍵點以及實現要點,最後給出Demo展示如何在JAVA中使用Update API。