1. 程式人生 > >【轉】MaBatis學習---源碼分析MyBatis緩存原理

【轉】MaBatis學習---源碼分析MyBatis緩存原理

iso 負責 等待 全局 安全問題 cto suse 都去 pst

【原文】https://www.toutiao.com/i6594029178964673027/

源碼分析MyBatis緩存原理

1.簡介

在 Web 應用中,緩存是必不可少的組件。通常我們都會用 Redis 或 memcached 等緩存中間件,攔截大量奔向數據庫的請求,減輕數據庫壓力。作為一個重要的組件,MyBatis 自然也在內部提供了相應的支持。通過在框架層面增加緩存功能,可減輕數據庫的壓力,同時又可以提升查詢速度,可謂一舉兩得。MyBatis 緩存結構由一級緩存和二級緩存構成,這兩級緩存均是使用 Cache 接口的實現類。因此,在接下裏的章節中,我將首先會向大家介紹 Cache 幾種實現類的源碼,然後再分析一級和二級緩存的實現。下面先來分析 Cache 及其實現類。

2.緩存類介紹

在 MyBatis 中,Cache 是緩存接口,定義了一些基本的緩存操作,所有緩存類都應該實現該接口。MyBatis 內部提供了豐富的緩存實現類,比如具有基本緩存功能的 PerpetualCache,具有 LRU 策略的緩存 LruCache,以及可保證線程安全的緩存 SynchronizedCache 和具備阻塞功能的緩存 BlockingCache 等。除此之外,還有很多緩存實現類,這裏就不一一列舉了。需要特別說明的是,MyBatis 在實現緩存模塊的過程中,使用了裝飾模式。在以上幾種緩存實現類中,PerpetualCache 相當於裝飾模式中的 ConcreteComponent。LruCache、SynchronizedCache 和 BlockingCache 等相當於裝飾模式中的 ConcreteDecorator。它們的關系如下:

技術分享圖片

以上對 Cache 接口的實現類進行了簡單的介紹,接下來,我們一起深入到源碼中,看看這些緩存類的實現。

2.1 PerpetualCache

PerpetualCache 是一個具有基本功能的緩存類,內部使用了 HashMap 實現緩存功能。它的源碼如下:

public class PerpetualCache implements Cache {

private final String id;

private Map<Object, Object> cache = new HashMap<Object, Object>();

public PerpetualCache

(String id) {

this.id = id;

}

@Override

public String getId() {

return id;

}

@Override

public int getSize() {

return cache.size();

}

@Override

public void putObject(Object key, Object value) {

// 存儲鍵值對到 HashMap

cache.put(key, value);

}

@Override

public Object getObject(Object key) {

// 查找緩存項

return cache.get(key);

}

@Override

public Object removeObject(Object key) {

// 移除緩存項

return cache.remove(key);

}

@Override

public void clear() {

cache.clear();

}

// 省略部分代碼

}

上面是 PerpetualCache 的全部代碼,很簡單。接下來,我們通過裝飾類對該類進行裝飾,使其功能變的豐富起來。

2.2 LruCache

LruCache,顧名思義,是一種具有 LRU 策略的緩存實現類。除此之外,MyBatis 還提供了具有 FIFO 策略的緩存 FifoCache。不過並未提供 LFU 緩存,如果大家有興趣,可以自行拓展。接下來,我們來看一下 LruCache 的實現。

public class LruCache implements Cache {

private final Cache delegate;

private Map<Object, Object> keyMap;

private Object eldestKey;

public LruCache(Cache delegate) {

this.delegate = delegate;

setSize(1024);

}

public int getSize() {

return delegate.getSize();

}

public void setSize(final int size) {

/*

* 初始化 keyMap,註意,keyMap 的類型繼承自 LinkedHashMap,

* 並覆蓋了 removeEldestEntry 方法

*/

keyMap = new LinkedHashMap<Object, Object>(size, .75F, true) {

private static final long serialVersionUID = 4267176411845948333L;

// 覆蓋 LinkedHashMap 的 removeEldestEntry 方法

@Override

protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {

boolean tooBig = size() > size;

if (tooBig) {

// 獲取將要被移除緩存項的鍵值

eldestKey = eldest.getKey();

}

return tooBig;

}

};

}

@Override

public void putObject(Object key, Object value) {

// 存儲緩存項

delegate.putObject(key, value);

cycleKeyList(key);

}

@Override

public Object getObject(Object key) {

// 刷新 key 在 keyMap 中的位置

keyMap.get(key);

// 從被裝飾類中獲取相應緩存項

return delegate.getObject(key);

}

@Override

public Object removeObject(Object key) {

// 從被裝飾類中移除相應的緩存項

return delegate.removeObject(key);

}

@Override

public void clear() {

delegate.clear();

keyMap.clear();

}

private void cycleKeyList(Object key) {

// 存儲 key 到 keyMap 中

keyMap.put(key, key);

if (eldestKey != null) {

// 從被裝飾類中移除相應的緩存項

delegate.removeObject(eldestKey);

eldestKey = null;

}

}

// 省略部分代碼

}

如上,LruCache 的 keyMap 屬性是實現 LRU 策略的關鍵,該屬性類型繼承自 LinkedHashMap,並覆蓋了 removeEldestEntry 方法。LinkedHashMap 可保持鍵值對的插入順序,當插入一個新的鍵值對時,LinkedHashMap 內部的 tail 節點會指向最新插入的節點。head 節點則指向第一個被插入的鍵值對,也就是最久未被訪問的那個鍵值對。默認情況下,LinkedHashMap 僅維護鍵值對的插入順序。若要基於 LinkedHashMap 實現 LRU 緩存,還需通過構造方法將 LinkedHashMap 的 accessOrder 屬性設為 true,此時 LinkedHashMap 會維護鍵值對的訪問順序。比如,上面代碼中 getObject 方法中執行了這樣一句代碼 keyMap.get(key),目的是刷新 key 對應的鍵值對在 LinkedHashMap 的位置。LinkedHashMap 會將 key 對應的鍵值對移動到鏈表的尾部,尾部節點表示最久剛被訪問過或者插入的節點。除了需將 accessOrder 設為 true,還需覆蓋 removeEldestEntry 方法。LinkedHashMap 在插入新的鍵值對時會調用該方法,以決定是否在插入新的鍵值對後,移除老的鍵值對。在上面的代碼中,當被裝飾類的容量超出了 keyMap 的所規定的容量(由構造方法傳入)後,keyMap 會移除最長時間未被訪問的鍵,並保存到 eldestKey 中,然後由 cycleKeyList 方法將 eldestKey 傳給被裝飾類的 removeObject 方法,移除相應的緩存項目。

上面講了 LinkedHashMap 是如何實現 LRU 特性的,這個是理解 LruCache 的源碼的關鍵所在,所以大家務必搞懂。如果大家想深入了解 LinkedHashMap 的源碼,也可參考我之前寫的文章 LinkedHashMap 源碼詳細分析 。好了,關於 LruCache 就先分析這麽多了。

2.3 BlockingCache

BlockingCache 實現了阻塞特性,該特性是基於 Java 重入鎖實現的。同一時刻下,BlockingCache 僅允許一個線程訪問指定 key 的緩存項,其他線程將會被阻塞住。下面我們來看一下 BlockingCache 的源碼。

public class BlockingCache implements Cache {

private long timeout;

private final Cache delegate;

private final ConcurrentHashMap<Object, ReentrantLock> locks;

public BlockingCache(Cache delegate) {

this.delegate = delegate;

this.locks = new ConcurrentHashMap<Object, ReentrantLock>();

}

@Override

public void putObject(Object key, Object value) {

try {

// 存儲緩存項

delegate.putObject(key, value);

} finally {

// 釋放鎖

releaseLock(key);

}

}

@Override

public Object getObject(Object key) {

// 請 // 請求鎖

acquireLock(key);

Object value = delegate.getObject(key);

// 若緩存命中,則釋放鎖。需要註意的是,未命中則不釋放鎖

if (value != null) {

// 釋放鎖

releaseLock(key);

}

return value;

}

@Override

public Object removeObject(Object key) {

// 釋放鎖

releaseLock(key);

return null;

}

private ReentrantLock getLockForKey(Object key) {

ReentrantLock lock = new ReentrantLock();

// 存儲 <key, Lock> 鍵值對到 locks 中

ReentrantLock previous = locks.putIfAbsent(key, lock);

return previous == null ? lock : previous;

}

private void acquireLock(Object key) {

Lock lock = getLockForKey(key);

if (timeout > 0) {

try {

// 嘗試加鎖

boolean acquired = lock.tryLock(timeout, TimeUnit.MILLISECONDS);

if (!acquired) {

throw new CacheException("...");

}

} catch (InterruptedException e) {

throw new CacheException("...");

}

} else {

// 加鎖

lock.lock();

}

}

private void releaseLock(Object key) {

// 獲取與當前 key 對應的鎖

ReentrantLock lock = locks.get(key);

if (lock.isHeldByCurrentThread()) {

// 釋放鎖

lock.unlock();

}

}

// 省略部分代碼

}

如上,查詢緩存時,getObject 方法會先獲取與 key 對應的鎖,並加鎖。若緩存命中,getObject 方法會釋放鎖,否則將一直鎖定。getObject 方法若返回 null,表示緩存未命中。此時 MyBatis 會進行數據庫查詢,並調用 putObject 方法存儲查詢結果。同時,putObject 方法會將指定 key 對應的鎖進行解鎖,這樣被阻塞的線程即可恢復運行。

上面的描述有點啰嗦,倒是 BlockingCache 類的註釋說到比較簡單明了。這裏引用一下:

It sets a lock over a cache key when the element is not found in cache. This way, other threads will wait until this element is filled instead of hitting the database.

這段話的意思是,當指定 key 對應元素不存在於緩存中時,BlockingCache 會根據 lock 進行加鎖。此時,其他線程將會進入等待狀態,直到與 key 對應的元素被填充到緩存中。而不是讓所有線程都去訪問數據庫。

在上面代碼中,removeObject 方法的邏輯很奇怪,僅調用了 releaseLock 方法釋放鎖,卻沒有調用被裝飾類的 removeObject 方法移除指定緩存項。這樣做是為什麽呢?大家可以先思考,答案將在分析二級緩存的相關邏輯時分析。

3. CacheKey

在 MyBatis 中,引入緩存的目的是為提高查詢效率,降低數據庫壓力。既然 MyBatis 引入了緩存,那麽大家思考過緩存中的 key 和 value 的值分別是什麽嗎?大家可能很容易能回答出 value 的內容,不就是 SQL 的查詢結果嗎。那 key 是什麽呢?是字符串,還是其他什麽對象?如果是字符串的話,那麽大家首先能想到的是用 SQL 語句作為 key。但這是不對的,比如:

SELECT * FROM author where id > ?

id > 1 和 id > 10 查出來的結果可能是不同的,所以我們不能簡單的使用 SQL 語句作為 key。從這裏可以看出來,運行時參數將會影響查詢結果,因此我們的 key 應該涵蓋運行時參數。除此之外呢,如果進行分頁查詢也會導致查詢結果不同,因此 key 也應該涵蓋分頁參數。綜上,我們不能使用簡單的 SQL 語句作為 key。應該考慮使用一種復合對象,能涵蓋可影響查詢結果的因子。在 MyBatis 中,這種復合對象就是 CacheKey。下面來看一下它的定義。

public class CacheKey implements Cloneable, Serializable {

private static final int DEFAULT_MULTIPLYER = 37;

private static final int DEFAULT_HASHCODE = 17;

// 乘子,默認為37

private final int multiplier;

// CacheKey 的 hashCode,綜合了各種影響因子

private int hashcode;

// 校驗和

private long checksum;

// 影響因子個數

private int count;

// 影響因子集合

private List<Object> updateList;

public CacheKey() {

this.hashcode = DEFAULT_HASHCODE;

this.multiplier = DEFAULT_MULTIPLYER;

this.count = 0;

this.updateList = new ArrayList<Object>();

}

// 省略其他方法

}

如上,除了 multiplier 是恒定不變的 ,其他變量將在更新操作中被修改。下面看一下更新操作的代碼。

/** 每當執行更新操作時,表示有新的影響因子參與計算 */

public void update(Object object) {

int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object);

// 自增 count

count++;

// 計算校驗和

checksum += baseHashCode;

// 更新 baseHashCode

baseHashCode *= count;

// 計算 hashCode

hashcode = multiplier * hashcode + baseHashCode;

// 保存影響因子

updateList.add(object);

}

當不斷有新的影響因子參與計算時,hashcode 和 checksum 將會變得愈發復雜和隨機。這樣可降低沖突率,使 CacheKey 可在緩存中更均勻的分布。CacheKey 最終要作為鍵存入 HashMap,因此它需要覆蓋 equals 和 hashCode 方法。下面我們來看一下這兩個方法的實現。

public boolean equals(Object object) {

// 檢測是否為同一個對象

if (this == object) {

return true;

}

// 檢測 object 是否為 CacheKey

if (!(object instanceof CacheKey)) {

return false;

}

final CacheKey cacheKey = (CacheKey) object;

// 檢測 hashCode 是否相等

if (hashcode != cacheKey.hashcode) {

return false;

}

// 檢測校驗和是否相同

if (checksum != cacheKey.checksum) {

return false;

}

// 檢測 coutn 是否相同

if (count != cacheKey.count) {

return false;

}

// 如果上面的檢測都通過了,下面分別對每個影響因子進行比較

for (int i = 0; i < updateList.size(); i++) {

Object thisObject = updateList.get(i);

Object thatObject = cacheKey.updateList.get(i);

if (!ArrayUtil.equals(thisObject, thatObject)) {

return false;

}

}

return true;

}

public int hashCode() {

// 返回 hashcode 變量

return hashcode;

}

equals 方法的檢測邏輯比較嚴格,對 CacheKey 中多個成員變量進行了檢測,已保證兩者相等。hashCode 方法比較簡單,返回 hashcode 變量即可。

關於 CacheKey 就先分析到這,CacheKey 在一二級緩存中會被用到,接下來還會看到它的身影。

4.一級緩存

在進行數據庫查詢之前,MyBatis 首先會檢查以及緩存中是否有相應的記錄,若有的話直接返回即可。一級緩存是數據庫的最後一道防護,若一級緩存未命中,查詢請求將落到數據庫上。一級緩存是在 BaseExecutor 被初始化的,下面我們來看一下相關的初始化邏輯:

public abstract class BaseExecutor implements Executor {

protected PerpetualCache localCache;

// 省略其他字段

protected BaseExecutor(Configuration configuration, Transaction transaction) {

this.localCache = new PerpetualCache("LocalCache");

// 省略其他字段初始化方法

}

}

如上,一級緩存的類型為 PerpetualCache,沒有被其他緩存類裝飾過。一級緩存所存儲從查詢結果會在 MyBatis 執行更新操作(INSERT/UPDATE/DELETE),以及提交和回滾事務時被清空。下面我們來看一下查詢一級緩存的邏輯。

public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {

BoundSql boundSql = ms.getBoundSql(parameter);

// 創建 CacheKey

CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);

return query(ms, parameter, rowBounds, resultHandler, key, boundSql);

}

public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {

// 省略部分代碼

List<E> list;

try {

queryStack++;

// 查詢一級緩存

list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;

if (list != null) {

// 存儲過程相關邏輯,忽略

handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);

} else {

// 緩存未命中,則從數據庫中查詢

list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);

}

} finally {

queryStack--;

}

// 省略部分代碼

return list;

}

如上,在訪問一級緩存之前,MyBatis 首先會調用 createCacheKey 方法創建 CacheKey。下面我們來看一下 createCacheKey 方法的邏輯:

public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {

if (closed) {

throw new ExecutorException("Executor was closed.");

}

// 創建 CacheKey 對象

CacheKey cacheKey = new CacheKey();

// 將 MappedStatement 的 id 作為影響因子進行計算

cacheKey.update(ms.getId());

// RowBounds 用於分頁查詢,下面將它的兩個字段作為影響因子進行計算

cacheKey.update(rowBounds.getOffset());

cacheKey.update(rowBounds.getLimit());

// 獲取 sql 語句,並進行計算

cacheKey.update(boundSql.getSql());

List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();

TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();

for (ParameterMapping parameterMapping : parameterMappings) {

if (parameterMapping.getMode() != ParameterMode.OUT) {

Object value; // 運行時參數

// 當前大段代碼用於獲取 SQL 中的占位符 #{xxx} 對應的運行時參數,

// 前文有類似分析,這裏忽略了

// 讓運行時參數參與計算

cacheKey.update(value);

}

}

if (configuration.getEnvironment() != null) {

// 獲取 Environment id 遍歷,並讓其參與計算

cacheKey.update(configuration.getEnvironment().getId());

}

return cacheKey;

}

如上,在計算 CacheKey 的過程中,有很多影響因子參與了計算。比如 MappedStatement 的 id 字段,SQL 語句,分頁參數,運行時變量,Environment 的 id 字段等。通過讓這些影響因子參與計算,可以很好的區分不同查詢請求。所以,我們可以簡單的把 CacheKey 看做是一個查詢請求的 id。有了 CacheKey,我們就可以使用它讀寫緩存了。在上面代碼中,若一級緩存為命中,BaseExecutor 會調用 queryFromDatabase 查詢數據庫,並將查詢結果寫入緩存中。下面看一下 queryFromDatabase 的邏輯。

private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds,ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {

List<E> list;

// 向緩存中存儲一個占位符

localCache.putObject(key, EXECUTION_PLACEHOLDER);

try {

// 查詢數據庫

list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);

} finally {

// 移除占位符

localCache.removeObject(key);

}

// 存儲查詢結果

localCache.putObject(key, list);

// 存儲過程相關邏輯,忽略

if (ms.getStatementType() == StatementType.CALLABLE) {

localOutputParameterCache.putObject(key, parameter);

}

return list;

}

到此,關於一級緩存相關的邏輯就差不多分析完了。一級緩存的邏輯比較簡單,大家可以簡單過一遍。接下來分析二級緩存。

5.二級緩存

二級緩存構建在一級緩存之上,在收到查詢請求時,MyBatis 首先會查詢二級緩存。若二級緩存未命中,再去查詢一級緩存。與一級緩存不同,二級緩存和具體的命名空間綁定,一級緩存則是和 SqlSession 綁定。在按照 MyBatis 規範使用 SqlSession 的情況下,一級緩存不存在並發問題。二級緩存則不然,二級緩存可在多個命名空間間共享。這種情況下,會存在並發問題,因此需要針對性去處理。除了並發問題,二級緩存還存在事務問題,相關問題將在接下來進行分析。下面首先來看一下訪問二級緩存的邏輯。

// -☆- CachingExecutor

public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {

BoundSql boundSql = ms.getBoundSql(parameterObject);

// 創建 CacheKey

CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);

return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);

}

public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)

throws SQLException {

// 從 MappedStatement 中獲取 Cache,註意這裏的 Cache 並非是在 CachingExecutor 中創建的

Cache cache = ms.getCache();

// 如果配置文件中沒有配置 <cache>,則 cache 為空

if (cache != null) {

flushCacheIfRequired(ms);

if (ms.isUseCache() && resultHandler == null) {

ensureNoOutParams(ms, boundSql);

// 訪問二級緩存

List<E> list = (List<E>) tcm.getObject(cache, key);

// 緩存未命中

if (list == null) {

// 向一級緩存或者數據庫進行查詢

list = delegate.<E>query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);

// 緩存查詢結果

tcm.putObject(cache, key, list);

}

return list;

}

}

return delegate.<E>query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);

}

如上,註意二級緩存是從 MappedStatement 中獲取的,而非由 CachingExecutor 創建。由於 MappedStatement 存在於全局配置中,可以多個 CachingExecutor 獲取到,這樣就會出現線程安全問題。除此之外,若不加以控制,多個事務共用一個緩存實例,會導致臟讀問題。線程安全問題可以通過 SynchronizedCache 裝飾類解決,該裝飾類會在 Cache 實例構造期間被添加上。相關過程可以參考我之前寫的文章 MyBatis-源碼分析-映射文件解析過程 ,這裏就不多說了。至於臟讀問題,需要借助其他類來處理,也就是上面代碼中 tcm 變量對應的類型。下面分析一下。

/** 事務緩存管理器 */

public class TransactionalCacheManager {

// Cache 與 TransactionalCache 的映射關系表

private final Map<Cache, TransactionalCache> transactionalCaches = newHashMap<Cache, TransactionalCache>();

public void clear(Cache cache) {

// 獲取 TransactionalCache 對象,並調用該對象的 clear 方法,下同

getTransactionalCache(cache).clear();

}

public Object getObject(Cache cache, CacheKey key) {

return getTransactionalCache(cache).getObject(key);

}

public void putObject(Cache cache, CacheKey key, Object value) {

getTransactionalCache(cache).putObject(key, value);

}

public void commit() {

for (TransactionalCache txCache : transactionalCaches.values()) {

txCache.commit();

}

}

public void rollback() {

for (TransactionalCache txCache : transactionalCaches.values()) {

txCache.rollback();

}

}

private TransactionalCache getTransactionalCache(Cache cache) {

// 從映射表中獲取 TransactionalCache

TransactionalCache txCache = transactionalCaches.get(cache);

if (txCache == null) {

// TransactionalCache 也是一種裝飾類,為 Cache 增加事務功能

txCache = new TransactionalCache(cache);

transactionalCaches.put(cache, txCache);

}

return txCache;

}

}

TransactionalCacheManager 內部維護了 Cache 實例與 TransactionalCache 實例間的映射關系,該類也僅負責維護兩者的映射關系,真正做事的還是 TransactionalCache。TransactionalCache 是一種緩存裝飾器,可以為 Cache 實例增加事務功能。我在之前提到的臟讀問題正是由該類進行處理的。下面分析一下該類的邏輯。

public class TransactionalCache implements Cache {

private final Cache delegate;

private boolean clearOnCommit;

// 在事務被提交前,所有從數據庫中查詢的結果將緩存在此集合中

private final Map<Object, Object> entriesToAddOnCommit;

// 在事務被提交前,當緩存未命中時,CacheKey 將會被存儲在此集合中

private final Set<Object> entriesMissedInCache;

// 省略部分代碼

@Override

public Object getObject(Object key) {

// 查詢 delegate 所代表的緩存

Object object = delegate.getObject(key);

if (object == null) {

// 緩存未命中,則將 key 存入到 entriesMissedInCache 中

entriesMissedInCache.add(key);

}

if (clearOnCommit) {

return null;

} else {

return object;

}

}

@Override

public void putObject(Object key, Object object) {

// 將鍵值對存入到 entriesToAddOnCommit 中,而非 delegate 緩存中

entriesToAddOnCommit.put(key, object);

}

@Override

public Object removeObject(Object key) {

return null;

}

@Override

public void clear() {

clearOnCommit = true;

// 清空 entriesToAddOnCommit,但不清空 delegate 緩存

entriesToAddOnCommit.clear();

}

public void commit() {

// 根據 clearOnCommit 的值決定是否清空 delegate

if (clearOnCommit) {

delegate.clear();

}

// 刷新未緩存的結果到 delegate 緩存中

flushPendingEntries();

// 重置 entriesToAddOnCommit 和 entriesMissedInCache

reset();

}

public void rollback() {

unlockMissedEntries();

reset();

}

private void reset() {

clearOnCommit = false;

// 清空集合

entriesToAddOnCommit.clear();

entriesMissedInCache.clear();

}

private void flushPendingEntries() {

for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {

// 將 entriesToAddOnCommit 中的內容轉存到 delegate 中

delegate.putObject(entry.getKey(), entry.getValue());

}

for (Object entry : entriesMissedInCache) {

if (!entriesToAddOnCommit.containsKey(entry)) {

// 存入空值

delegate.putObject(entry, null);

}

}

}

private void unlockMissedEntries() {

for (Object entry : entriesMissedInCache) {

try {

// 調用 removeObject 進行解鎖

delegate.removeObject(entry);

} catch (Exception e) {

log.warn("...");

}

}

}

}

在 TransactionalCache 的代碼中,我們要重點關註 entriesToAddOnCommit 集合,TransactionalCache 中的很多方法都會與這個集合打交道。該集合用於存儲從查詢的結果,那為什麽要將結果保存在該集合中,而非 delegate 所表示的緩存中呢?主要是因為直接存到 delegate 會導致臟數據問題。下面通過一張圖演示一下臟數據問題發生的過程,假設兩個線程開啟兩個不同的事務,它們的執行過程如下:

技術分享圖片

如上圖,時刻2,事務 A 對記錄 A 進行了更新。時刻3,事務 A 從數據庫查詢記錄 A,並將記錄 A 寫入緩存中。時刻4,事務 B 查詢記錄 A,由於緩存中存在記錄 A,事務 B 直接從緩存中取數據。這個時候,臟數據問題就發生了。事務 B 在事務 A 未提交情況下,讀取到了事務 A 所修改的記錄。為了解決這個問題,我們可以為每個事務引入一個獨立的緩存。查詢數據時,仍從 delegate 緩存(以下統稱為共享緩存)中查詢。若緩存未命中,則查詢數據庫。存儲查詢結果時,並不直接存儲查詢結果到共享緩存中,而是先存儲到事務緩存中,也就是 entriesToAddOnCommit 集合。當事務提交時,再將事務緩存中的緩存項轉存到共享緩存中。這樣,事務 B 只能在事務 A 提交後,才能讀取到事務 A 所做的修改,解決了臟讀問題。整個過程大致如下:

技術分享圖片

如上,時刻2,事務 A 和 B 同時查詢記錄 A。此時共享緩存中還沒沒有數據,所以兩個事務均會向數據庫發起查詢請求,並將查詢結果存儲到各自的事務緩存中。時刻3,事務 A 更新記錄 A,這裏把更新後的記錄 A 記為 A′。時刻4,兩個事務再次進行查詢。此時,事務 A 讀取到的記錄為修改後的值,而事務 B 讀取到的記錄仍為原值。時刻5,事務 A 被提交,並將事務緩存 A 中的內容轉存到共享緩存中。時刻6,事務 B 再次查詢記錄 A,由於共享緩存中有相應的數據,所以直接取緩存數據即可。因此得到記錄 A′,而非記錄 A。但由於事務 A 已經提交,所以事務 B 讀取到的記錄 A′ 並非是臟數據。MyBatis 引入事務緩存解決了臟讀問題,事務間只能讀取到其他事務提交後的內容,這相當於事務隔離級別中的“讀已提交(Read Committed)”。但需要註意的時,MyBatis 緩存事務機制只能解決臟讀問題,並不能解決“不可重復讀”問題。再回到上圖,事務 B 在被提交前進行了三次查詢。前兩次查詢得到的結果為記錄 A,最後一次查詢得到的結果為 A′。最有一次的查詢結果與前兩次不同,這就會導致“不可重復讀”的問題。MyBatis 的緩存事務機制最高只支持“讀已提交”,並不能解決“不可重復讀”問題。即使數據庫使用了更高的隔離級別解決了這個問題,但因 MyBatis 緩存事務機制級別較低。此時仍然會導致“不可重復讀”問題的發生,這個在日常開發中需要註意一下。

下面寫點測試代碼驗證 MyBatis 所導致的“不可重復讀”問題,首先看一下實體類:

public class Student {

private Integer id;

private String name;

private Integer age;

// 省略 getter/setter

}

對應的數據表如下:

student

+----+----------+------+

| id | name | age |

+----+----------+------+

| 1 | coolblog | 20 |

+----+----------+------+

Dao 接口與映射文件定義如下:

public interface StudentDao {

Student findOne(@Param("id") Integer id);

int update(@Param("id") Integer id, @Param("name") String name);

}

<mapper namespace="xyz.coolblog.dao.StudentDao">

<!-- 註意要在映射文件中配置緩存 -->

<cache/>

<select id="findOne" resultType="xyz.coolblog.model.Student">

SELECT

`id`, `name`, `age`

FROM

student

WHERE

id = #{id}

</select>

<update id="update">

UPDATE

student

SET

`name` = #{name}

WHERE

id = #{id}

</update>

</mapper>

測試代碼如下:

public class TransactionalCacheTest {

private SqlSessionFactory sqlSessionFactory;

private CountDownLatch countDownLatch = new CountDownLatch(1);

@Before

public void prepare() throws IOException {

String resource = "mybatis-transactional-cache-config.xml";

InputStream inputStream = Resources.getResourceAsStream(resource);

sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);

inputStream.close();

}

@Test

public void testTransactional() throws IOException, InterruptedException, ExecutionException {

ExecutorService es = Executors.newFixedThreadPool(2);

// 開啟兩個線程

Future<String> fa = es.submit(this::transactionalA);

Future<String> fb = es.submit(this::transactionalB);

countDownLatch.countDown();

es.awaitTermination(6, TimeUnit.SECONDS);

System.out.println(fa.get());

System.out.println(" -------- 分割線 ------- ");

System.out.println(fb.get());

}

private String transactionalA() throws Exception {

SqlSession sqlSession = sqlSessionFactory.openSession();

StudentDao studentDao = sqlSession.getMapper(StudentDao.class);

countDownLatch.await();

StringBuilder sb = new StringBuilder();

sb.append("時刻1:開啟事務 A ");

sb.append("時刻2:查詢記錄 A ");

Student s1 = studentDao.findOne(1);

sb.append(s1).append(" ");

sb.append("時刻3:更新記錄 A ");

studentDao.update(1, "tianxiaobo");

sb.append("時刻4:查詢記錄 A‘ ");

Student s2 = studentDao.findOne(1);

sb.append(s2).append(" ");

// 此處睡眠1秒,讓事務 B 在事務 A 提交前,完成時刻4的查詢請求

Thread.sleep(1000);

sb.append("時刻5:提交事務 A");

sqlSession.commit();

return sb.toString();

}

private String transactionalB() throws Exception {

SqlSession sqlSession = sqlSessionFactory.openSession();

StudentDao studentDao = sqlSession.getMapper(StudentDao.class);

countDownLatch.await();

StringBuilder sb = new StringBuilder();

sb.append("時刻1:開啟事務 B ");

sb.append("時刻2:查詢數據 A ");

Student s1 = studentDao.findOne(1);

sb.append(s1).append(" ");

sb.append("時刻3:--------- ");

sb.append("時刻4:查詢數據 A ");

Student s2 = studentDao.findOne(1);

sb.append(s2).append(" ");

// 此處睡眠3秒,等待事務 A 提交

Thread.sleep(3000);

sb.append("時刻5:--------- ");

sb.append("時刻6:查詢數據 A‘ ");

Student s3 = studentDao.findOne(1);

sb.append(s3).append(" ");

sb.append("時刻7:提交事務 B");

sqlSession.commit();

return sb.toString();

}

}

最後對輸出結果進行簡單的美化,如下:

技術分享圖片

如上,事務 B 在時刻2和時刻4讀取到的記錄與數據庫中的記錄一致,表示可重復讀。但當事務 A 提交後,事務 B 在時刻6讀取到的數據則是事務 A 修改的內容,這個時候就出現了“不可重復讀”問題。以上測試是基於 MySql 數據可讀,MySQL 默認事務級別為“可重復讀”。

技術分享圖片

下面在本地開啟兩個 MySQL 客戶端,模擬上面的執行流程。最終結果如下:

技術分享圖片

從測試結果可以看出,不可重復讀問題並未發生,事務 B 三次查詢結果均相同。好了,到此關於 MyBatis 二級緩存所引發的問題就分析完了。

接下來,我們再來看一下 entriesMissedInCache 集合,這個集合是用於存儲未命中緩存的查詢請求對應的 CacheKey。單獨分析與 entriesMissedInCache 相關的邏輯沒什麽意義,要搞清 entriesMissedInCache 的實際用途,需要把它和 BlockingCache 的邏輯結合起來進行分析。在 BlockingCache,同一時刻僅允許一個線程通過 getObject 方法查詢指定 key 對應的緩存項。如果緩存未命中,getObject 方法不會釋放鎖,導致其他線程被阻塞住。其他線程要想恢復運行,必須進行解鎖,解鎖邏輯由 BlockingCache 的 putObject 和 removeObject 方法執行。其中 putObject 會在 TransactionalCache 的 flushPendingEntries 方法中被調用,removeObject 方法則由 TransactionalCache 的 unlockMissedEntries 方法調用。flushPendingEntries 和 unlockMissedEntries 最終都會遍歷 entriesMissedInCache 集合,並將集合元素傳給 BlockingCache 的相關方法。這樣可以解開指定 key 對應的鎖,讓阻塞線程恢復運行。

最後特別說明一下,本節的內容參考了 《MyBatis技術內幕》 一書中關於緩存的一些分析,這裏向這本書的作者表示感謝。如果大家不是很能看懂上面的內容,也可參考這本書的部分章節。

6.總結

本篇文章簡單介紹了一些緩存類的實現,並對一二級緩存進行了深入分析。本文僅分析了緩存的使用過程,並未對緩存的初始化,以及 CachingExecutor 和 SimpleExecutor(繼承自 BaseExecutor)創建過程進行分析。相關內容已在本系列之前的文章中已分析過,再次分析有點贅述。對於本文忽略掉的部分,希望大家可以自行探索,或者閱讀我之前寫的文章。

【轉】MaBatis學習---源碼分析MyBatis緩存原理