1. 程式人生 > >Elasticsearch-對併發衝突的解決(樂觀鎖、悲觀鎖)

Elasticsearch-對併發衝突的解決(樂觀鎖、悲觀鎖)

多個執行緒去同時訪問es中的一份資料,然後各自去修改之後更新到es,由於執行緒的先後順序不同,可能會導致後續的修改覆蓋掉之前的修改,顯然一些場景下我們是不允許發生這種併發衝突的問題,例如電商庫存的修改等

在es中,如何能夠解決這種併發衝突的問題?

  • 通過_version版本號的方式進行樂觀鎖併發控制

在es內部第次一建立document的時候,它的_version預設會是1,之後進行的刪除和修改的操作_version都會增加1。可以看到刪除一個document之後,再進行同一個id的document新增操作,版本號是加1而不是初始化為1,從而可以說明document並不是正真地被物理刪除,它的一些版本號資訊一樣會存在,而是會在某個時刻一起被清除。

在es後臺,有很多類似於replica同步的請求,這些請求都是非同步多執行緒的,對於多個修改請求是亂序的,因此會使用_version樂觀鎖來控制這種併發的請求處理。當後續的修改請求先到達,對應修改成功之後_version會加1,然後檢測到之前的修改到達會直接丟棄掉該請求;而當後續的修改請求按照正常順序到達則會正常修改然後_version在前一次修改後的基礎上加1(此時_version可能就是3,會基於之前修改後的狀態)。

es提供了一個外部版本號的樂觀控制方案來替代內部的_version。例如:

?version=1&version_type=external

和內在的_version的區別在於。對於內在_version=1,只有在後續請求滿足?_version=1的時候才能夠更新成功;對於外部_version=1,只有在後續請求滿足?_version>1才能夠修改成功。

  • 通過悲觀鎖的方式進行樂觀鎖併發控制

1.全域性鎖,通過doc來進行對整個index上鎖

一個執行緒進行操作之前建立一個鎖,例如:

PUT /lockindex/locktype/global/_create
{}

同時如果有另一個執行緒要進行相關更新操作,那麼同樣執行上述程式碼是會報錯。在上述執行緒執行完DELETE對應doc之後,該執行緒就可以重新獲取到doc的鎖從而執行自己的一些列操作。

這種方式,操作很簡單,但是鎖住了整個index,導致整個系統的併發能力很低。

2.document鎖,粒度更細的鎖
需要通過指令碼來實現:

POST /fs/lock/1/_update
{
  "upsert"
: { "process_id": 123 }, "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';" "params": { "process_id": 123 } }

process_id,很重要,會在lock中,設定對對應的doc加鎖的程序的id,這樣其他程序過來的時候,才知道,這條資料已經被別人給鎖了
assert false,不是當前程序加鎖的話,則丟擲異常
ctx.op=’noop’,不做任何修改
params,裡面有個process_id,是你的要執行增刪改操作的程序的唯一id

對於同一個process_id的程序是都可以來修改doc,但是用不同的process_id去修改已經上鎖的其他process_id是會assert false拋錯。

3.共享鎖與排它鎖
共享鎖:資料是共享的,多個執行緒可以獲取同一個資料的共享鎖,然後對這個資料執行讀操作
排它鎖:只能有一個執行緒獲取排它鎖,然後執行更新操作

共享鎖與排他鎖是互斥的特性,如果有一個執行緒想要去修改一個數據,也就是獲取一個排它鎖,此時需要等待其他所有的共享鎖先釋放掉才能夠進行操作,反之亦然。

首先新增共享鎖,其他執行緒也可以來讀取資料:

judge-lock-2.groovy: if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++

POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}

如果其他執行緒也需要獲取共享鎖,那麼執行上述同樣的程式碼即可,最終只是lock_count加1了:

GET /fs/lock/1

{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 3,
  "found": true,
  "_source": {
    "lock_type": "shared",
    "lock_count": 3
  }
}

當新增排他鎖的時候:

PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }

則會報錯

對共享鎖進行解鎖:

POST /fs/lock/1/_update
{
  "script": {
    "lang": "groovy",
    "file": "unlock-shared"
  }
}

新增過多少個共享鎖,對應的執行解鎖操作相應次數即可完全解鎖。每次解鎖lock_count對應減1,當為0的時候就將/fs/lock/1刪除

對應的解除排它鎖:

DELETE /fs/lock/1