1. 程式人生 > >如何通過Java中的物件值來同步塊

如何通過Java中的物件值來同步塊

問題

有時,我們需要通過變數的值來同步程式碼塊。

為了解這個問題,我們將考慮一個簡單的銀行應用程式,它對客戶的每次轉賬進行以下操作:

  1. 通過此外部Web服務轉移評估現金返還金額(CashBackService
  2. 在資料庫中執行匯款(AccountService
  3. 更新現金返還評估系統中的資料(CashBackService

匯款操作如下:

public  void  withdrawMoney(UUID  userId,int  amountOfMoney){
    synchronized(userId){  
        結果 result  =  externalCashBackService。evaluateCashBack(userId,amountOfMoney);
        accountService。轉移(使用者id,amountOfMoney  +  結果。getCashBackAmount());
        externalCashBackService。cashBackComplete(使用者id,結果。getCashBackAmount());
    }
}

 

應用程式的基本元件如下圖所示:

應用程式的元件圖

我試圖儘可能清楚地做出一個例子。支付服務中的資金轉移取決於其他兩項服​​務:

  • 第一個是CashBackService與REST協議下的另一個(外部)Web應用程式互動。而且,為了計算實際的現金返還,我們需要與此應用程式同步事務。這是因為下一筆現金返還金額可能取決於使用者付款總額。
  • 第二個是AccountService與內部資料庫通訊並存儲與其使用者帳戶相關的資料。在此服務中,我們可以使用JPA事務在資料庫中將某些操作作為原子操作。

在現實生活中,我強烈建議重構這樣的系統,以避免這種情況,如果可能的話。但在我們的例子中,想象一下我們別無選擇。

我們來看看這個應用程式的草案程式碼:

@服務
公共  PaymentService {
 
    @Autowired
    private  ExternalCashBackService  externalCashBackService ;
 
    @Autowired
    私人 AccountService  帳戶服務 ;
 
    public  void  withdrawMoney(UUID  userId,int  amountOfMoney){
        synchronized(userId){  
            結果 result  =  externalCashBackService。evaluateCashBack(userId,amountOfMoney);
            accountService。轉移(使用者id,amountOfMoney  +  結果。getCashBackAmount());
            externalCashBackService。cashBackComplete(使用者id,結果。getCashBackAmount());
        }
    }
}
 
 
@服務
公共  ExternalCashBackService {
 
    @Autowired
    私人 RestTemplate  restTemplate ;
 
    public  Result  evaluateCashBack(UUID  userId,int  amountOfMoney){
        return  sendRestRequest(“evaluate”,userId,amountOfMoney);
    }
 
    public  Result  cashBackComplete(UUID  userId,int  cashBackAmount){
        return  sendRestRequest(“complete”,userId,cashBackAmount);
    }
 
    private  Result  sendRestRequest(String  action,UUID  userId,int  value){
 
        URI  externalCashBackSystemUrl  =
                URI。create(“http://cash-back-system.org/api/”  +  action);
 
        HttpHeaders  headers  =  new  HttpHeaders();
        標題。集(“接受”,的MediaType。APPLICATION_JSON_VALUE);
        RequestDto  requestDto  =  new  RequestDto(userId,value);
        HttpEntity <?>  request  =  new  HttpEntity <>(requestDto,headers);
 
        ResponseDto  responseDto  =  restTemplate。exchange(externalCashBackSystemUrl,
                                                        HttpMethod。GET,
                                                        要求,
                                                        ResponseDto。課程
                                              。getBody();  
 
        返回 新的 結果(responseDto。的getStatus(),responseDto。的getValue());
    }
}
 
@服務
公共  AccountService {
 
    @Autowired
    private  AccountRepository  accountRepository ;
 
    @Transactional(隔離 =  REPEATABLE_READ)  
    public  void  transfer(UUID  userId,int  amountOfMoney){
        帳戶 account  =  accountRepository。getOne(userId);
        帳戶。的setBalance(帳戶。所以getBalance()-  amountOfMoney);
        accountRepository。儲存(帳戶);
    }
}

 

但是,您可以擁有多個具有相同值的物件(userId 在此示例中),但同步適用於物件的例項而不是其值。

下面的程式碼不能很好地工作。因為它不正確同步; 靜態工廠方法UUID.fromString(..)在每次呼叫時都會生成UUID類的新例項,即使您傳遞了相等的字串引數。

因此,我們得到UUID了相同鍵的不同例項。如果我們從多個執行緒執行此程式碼,那麼我們很有可能遇到同步問題:

public  void  threadA(){
  paymentService。方法withdrawMoney(UUID。fromString(“ea051187-bb4b-4b07-9150-700000000000” ),1000);
}
 
public  void  threadB(){
  paymentService。方法withdrawMoney(UUID。fromString(“ea051187-bb4b-4b07-9150-700000000000” ),5000);
}

 

在這種情況下,您需要為equals物件獲取相同的引用以在其上進行同步。

解決這個問題的錯誤方法

同步方法

你可以移動synchronized一個方法:

public  synchronized  void  withdrawMoney(UUID  userId,int  amountOfMoney){
    ..
}

 

該解決方案效能不佳。您將阻止絕對所有使用者的資金轉賬。如果您需要使用相同的金鑰同步不同類中的不同操作,則此解決方案根本不會對您有所幫助。

字串實習生

為了確保包含使用者ID的類的例項在所有同步塊中都是相同的,我們可以將它序列化為String並使用它String.intern()來獲取equals字串的相同連結。

String.intern使用全域性池來儲存被攔截的字串。當您在字串上請求實習生時,如果此類字串存在,則從此池中獲取引用,否則此字串將放入池中。

您可以String.internThe Java Language Specification - 3.10.5 String Literals或有關String.intern的Oracle Java文件中找到更多詳細資訊。

public  void  withdrawMoney(UUID  userId,int  amountOfMoney){
  同步(使用者id。的toString()。實習生()){
      ..
  }
}

 

使用實習生不是一個好習慣,因為使用GC很難清理字串池。並且,您的應用程式可以通過主動使用來消耗太多資源  String.intern

此外,外部程式碼有可能在與應用程式相同的字串例項上同步。這可能導致死鎖。

一般來說,實習生的使用最好留給JDK的內部庫; Aleksey Shipilev有關於這個概念的好文章。

我們如何才能正確解決這個問題?

建立自己的同步原語

我們需要實現描述下一個圖的行為:

diag 0672834a7737bb323990aabe3bcb5ce6

首先,我們需要建立一個新的同步原語 - 自定義互斥鎖。這將由變數的值起作用,而不是由物件的引用起作用。

它會像一個“命名的互斥體  但有點寬,與使用任何物品的價值鑑定,而不僅僅是一個字串的值的能力。您可以找到同步原語的示例,以便通過其他語言(C ++,C#)中的名稱進行鎖定。現在,我們將用Java解決這個問題。

解決方案看起來像這樣:

public  void  withdrawMoney(UUID  userId,int  amountOfMoney){
  同步(XMutex。的(使用者id)){
      ..
  }
}

 

為了確保獲得相同的變數值相同的互斥鎖,我們將建立互斥鎖工廠。

public  void  withdrawMoney(UUID  userId,int  amountOfMoney){
  同步(XMutexFactory。得到(使用者id)){
      ..
  }
}
 
public  void  purchase(UUID  userId,int  amountOfMoney,VendorDescription  供應商){
  同步(XMutexFactory。得到(使用者id)){
      ..
  }
}

 

為了使用相等的鍵在每個請求上返回相同的互斥鎖例項,我們需要儲存建立的互斥鎖。如果我們將這些互斥鎖儲存在簡單中HashMap,那麼當新鍵出現時,地圖的大小將會增加。我們沒有工具來評估互斥鎖在任何地方都沒有使用的時間。

在這種情況下,我們可以使用它WeakReference來儲存地圖中互斥鎖的引用,就在它使用時。為了實現這種行為,我們可以使用WeakHashMap資料結構。幾個月前我寫了一篇關於這類參考文章的文章; 你可以在這裡更詳細地考慮它:Java中的Soft,Weak,Phantom References

我們的互斥工廠將以此為基礎WeakHashMap。互斥鎖工廠會建立一個新的互斥鎖,如果value(key) 找不到  互斥鎖的話  HashMap。然後,將建立的互斥鎖新增到  HashMap。使用它WeakHashMap允許我們HashMap 在存在任何對它的任何引用的同時儲存互斥  。並且,HashMap 當釋放所有對它的引用時,互斥鎖將自動從中刪除  。

我們需要使用同步版本WeakHashMap; 讓我們看看文件中描述的內容:

此類未同步。可以構造同步的WeakHashMap
使用Collections.synchronizedMap方法。

 

這很難過,不久之後,我們會仔細研究一下原因。但是現在,讓我們考慮一下實現的例子,這是官方文件提出的(我的意思是使用   Collections.synchronizedMap):

public  final  Map < XMutex < KeyT >,WeakReference < XMutex < KeyT >>>  weakHashMap  =
    收藏。synchronizedMap(new  WeakHashMap < XMutex < KeyT >,
                                                WeakReference < XMutex < KeyT >>>());
 
public  XMutex < KeyT >  getMutex(KeyT  key){
    validateKey(key);
    return  getExist(key)
            。orElseGet(()- >  saveNewReference(key));
}
 
private  可選< XMutex < KeyT >>  getExist(KeyT  key){
    return  可選。ofNullable(WeakHashMap中,得到(XMutex。的(關鍵)))
                   。map(WeakReference :: get);
}
 
private  XMutex < KeyT >  saveNewReference(KeyT  key){
 
    XMutex < KeyT >  互斥鎖 =  XMutex。的(鍵);
 
    WeakReference < XMutex < KeyT >>  res  =  weakHashMap。put(互斥,新的 WeakReference <>(互斥));
    如果(RES  !=   &&  資源。獲得()!=  ){
        返回 資源。get();
    }
    返回 互斥 ;
}

 

效能怎麼樣?

如果我們檢視程式碼Collections.synchronizedMap,那麼我們會在全域性互斥上找到很多同步,這是與SynchronizedMap 例項配對建立的  。

SynchronizedMap(Map < K,V >  m){
    這個。m  =  物體。requireNonNull(m);
    互斥 =  這個 ;   
}

 

並且所有其他方法  SynchronizedMap 都在互斥鎖上同步:

public  int  size(){
    synchronized(互斥){ return  m。size();}
}
public  boolean  containsKeyObject  key){
    synchronized(互斥){ return  m。containsKey(key);}
}
public  V  getObject  key){
    synchronized(互斥){ return  m。得到(關鍵);}
}
public  V  put(K  鍵,V  值){
    synchronized(互斥){ return  m。put(key,value);}
}
public  V  removeObject  key){
    synchronized(互斥){ return  m。刪除(鍵);}
}
 
...

 

此解決方案沒有最佳效能。所有這些同步都會導致我們使用互斥鎖工廠對每個操作進行永久鎖定。

將WeakReference作為鍵的ConcurrentHashMap

我們需要看一下使用的ConcurrentHashMap。它具有比Collections.synchronizedMap更好的效能。但我們有一個問題 - ConcurrentHashMap不允許使用弱引用。這意味著垃圾收集器無法刪除未使用的互斥鎖。

我找到了兩種方法來解決這個問題:

  • 首先是建立我自己的ConcurrentMap實現。這是正確的決定,但需要很長時間。
  • 第二個是使用ConcurrentReferenceHashMapSpring Framework 中的實現。這是一個很好的實現,但它有一些細微差別。我們將在下面考慮它們。

讓我們改變  XMutexFactory 實現來使用ConcurrentReferenceHashMap

公共  XMutexFactory < KeyT > {
 
  / **
   *使用預設設定建立互斥鎖工廠
   * /
  public  XMutexFactory(){
      這個。map  =  new  ConcurrentReferenceHashMap <>(DEFAULT_INITIAL_CAPACITY,
                                                  DEFAULT_LOAD_FACTOR,
                                                  DEFAULT_CONCURRENCY_LEVEL,
                                                  DEFAULT_REFERENCE_TYPE);
  }
 
  / **
   *通過鍵建立並返回互斥鎖。
   *如果此鍵的互斥鎖已存在於弱對映中,
   *然後返回互斥鎖的相同引用。
   * /
  public  XMutex < KeyT >  getMutex(KeyT  key){
      歸還 這個。地圖。compute(key,(k,v)- >(v  ==  null)? new  XMutex <>(k):v);
  }
 
}

 

這很酷!

程式碼少,但效能比以前更多。我們試著檢查一下這個解決方案的效能。

建立一個簡單的基準

為了選擇實現,我做了一個小基準測試。

Map 測試中涉及三種實現  :

  •  Collections.synchronizedMap 基於 WeakHashMap 
  •  ConcurrentHashMap 
  •  ConcurrentReferenceHashMap 

我使用  ConcurrentHashMap in基準測試來比較測量。此實現不適合在互斥鎖的工廠中使用,因為它不支援使用弱引用或軟引用。

所有基準測試都是使用JMH庫編寫的。

# 執行 完成。總 時間:000439
 
基準                                   模式     Cnt         評分         誤差   單位
ConcurrentMap。ConcurrentHashMap的            thrpt        5         0015 0004   OPS / 納秒
ConcurrentMap。ConcurrentReferenceHashMap    thrpt        5         0008 0001   OPS / 納秒
ConcurrentMap。SynchronizedMap               thrpt        5         0005 0001   OPS / 納秒
ConcurrentMap。ConcurrentHashMap的             avgt        5       565515 23638    納秒/ 運算
ConcurrentMap。ConcurrentReferenceHashMap     avgt        5      1098939 28828    納秒/ 運算
ConcurrentMap。SynchronizedMap                avgt        5      1503593 150552    納秒/ 運算
ConcurrentMap。ConcurrentHashMap的           樣品  301796       663330 11708    納秒/ 運算
ConcurrentMap。ConcurrentReferenceHashMap   樣品  180062      1110882 6928    納秒/ 運算
ConcurrentMap。SynchronizedMap              樣品  136290      1465543 5150    納秒/ 運算
ConcurrentMap。的ConcurrentHashMap                SS        5    336419150 617549053    納秒/ 運算
ConcurrentMap。ConcurrentReferenceHashMap       SS        5    922844750 468380489    納秒/ 運算
ConcurrentMap。SynchronizedMap                  SS        5   1199159700 4339391394    納秒/ 運算

 

在這個微基準測試中,我建立了一個情況,當幾個執行緒計算地圖中的值。您可以在Concurrent Map基準測試中更詳細地考慮此基準測試的原始碼 

把它放在圖表上:

基準結果

因此,ConcurrentReferenceHashMap 在這種情況下使用它是  正確的。

XSync庫入門

我將此程式碼打包到XSync庫中,您可以將其用作變數值同步的現成解決方案。

為此,您需要新增下一個依賴項:

< 依賴>
  < groupId > com.antkorwin </ groupId >
  < artifactId > xsync </ artifactId >
  < version > 1.1 </ version >
</ dependency >

 

然後,您可以建立XSync類的例項,以便在需要的型別上進行同步。對於Spring Framework,您可以將它們作為bean:

@豆
public  XSync < UUID >  xSync(){
    返回 新的 XSync <>();
}

 

現在,您可以使用它:

@Autowired
私有 XSync < UUID >  xSync ;
 
public  void  withdrawMoney(UUID  userId,int  amountOfMoney){
  xSync。execute(userId,()- > {
      結果 result  =  externalPolicySystem。validateTransfer(userId,amountOfMoney,WITHDRAW);
      accountService。轉移(userId,amountOfMoney,WITHDRAW);
  });
}
 
public  void  purchase(UUID  userId,int  amountOfMoney,VendorDescription  供應商){
  xSync。execute(userId,()- > {
      ..
  });
}

 

併發測試

為了確保這段程式碼執行良好,我寫了幾個併發測試。

有一個這樣的測試的例子:

public  void  testSyncBySingleKeyInConcurrency(){
   //安排
   XSync < UUID >  xsync  =  new  XSync <>();
   String  id  =  UUID。randomUUID()。toString();
   NonAtomicInt  var  =  new  NonAtomicInt(0);
 
   //這裡有一個魔力: 
   //我們建立了一個並行流並嘗試增加 
   //每個流中的相同非原子整數變數
   IntStream。範圍(0,THREAD_CNT)
            。盒裝()
            。並行()
            。的forEach(Ĵ  - >  XSYNC。執行(UUID。fromString(ID),VAR :: 增量));  
 
   //斷言
   等待()。atMost(5,TIMEUNIT。SECONDS)
          。直到(var :: getValue,equalTo(THREAD_CNT));
 
   斷言。assertThat(VAR。的getValue())。isEqualTo(THREAD_CNT);
}
 
/ **
 *執行不是執行緒安全的整數變數:
 * /
@Getter
@AllArgsConstructor
私有  NonAtomicInt {  
    私有 int  值 ;
 
    public  int  increment(){
        返回 值++ ;
    }
}

 

讓我們看看這個測試的結果:

併發測試結果