自己動手寫帶有事務支援的分散式Key-Value儲存系統——讀寫鎖
為了實現高併發的鎖機制事務,需要使用讀寫鎖進行併發控制。Java類庫本身提供了讀寫鎖,但是其靈活性對於這個系統的需求是不夠的。本系統的讀寫鎖需要實現在單個執行緒內部能夠任意的加寫鎖,加讀鎖,並且不會阻塞,鎖的釋放可以單次單次進行,也可以一次完成(多次加鎖,一次釋放);但是多個執行緒之間仍然符合讀寫鎖的互斥性。
為了實現這樣的需求,我的設計如下
|--------------| 讀執行緒佇列 |-----------------------|-----------------------|--------------------|
| |--------------------------------------->|執行緒+讀鎖定次數|執行緒+讀鎖定次數|。。。。。。 |
| |
| 讀寫鎖 | 寫執行緒資訊 |---------------------------|
|-------------|----------------------------------------->| 寫執行緒+寫鎖定次數|
|---------------------------|
每一個讀寫鎖,儲存一個讀執行緒佇列,佇列中儲存加了讀鎖的執行緒,以及每個執行緒加讀鎖的次數;並且儲存一個寫執行緒資訊(執行緒+加寫鎖的次數)。每一次加讀鎖(共享鎖)的時候,必須是寫鎖未被鎖定,或者寫鎖的鎖定者是當前執行緒,否則就等待,加讀鎖就是將當前執行緒對應的讀鎖定次數加1;加寫鎖(排他鎖)的時候,已加寫鎖執行緒必須為空,或者為當前執行緒並且讀執行緒佇列為空,或者只有當前執行緒,否則等待,加鎖時就是將寫鎖定次數加一。其餘的操作讀者可以自行推理。如果有問題,請留言相互討論。
org.footoo.storage.transaction.lock.RandomSync
/** * Alipay.com Inc. * Copyright (c) 2004-2014 All Rights Reserved. */ package org.footoo.storage.transaction.lock; import java.util.LinkedList; import java.util.List; import org.footoo.common.log.Logger; import org.footoo.common.log.LoggerFactory; /** * <h1>同步函式,這哥們必須是執行緒安全的</h1> * <b>可以加讀鎖的前提</b> * <ul> * <li>寫鎖沒有鎖定,即加寫鎖的執行緒為null,或者為當前執行緒 * </ul> * <br> * <b>可以加寫鎖的前提</b> * <ul> * <li>寫鎖沒有鎖定,即加寫鎖的執行緒為null,或者為當前執行緒 * <li>讀鎖沒有鎖定,即加讀鎖的執行緒為null,或者是當前執行緒 * </ul> * * <br> * <h2>問題<h2> * <ul> * <li>隨機的喚醒方式會出現寫執行緒飢餓的現象,即寫執行緒鎖定了, * 可以鎖定大量的讀執行緒,導致讀執行緒得不到執行, * 詳見org.footoo.storage.SyncTest2。 * 這樣就需要一種公平的喚醒方式來提高公平性 * </ul> * * @author jeff * @version $Id: Sync.java, v 0.1 2014年3月17日 下午4:48:48 jeff Exp $ */ public class RandomSync { /** 鎖物件 */ private final Object lock = new Object(); /** 獲得了共享鎖的執行緒 */ private final List<ThreadRefCount> sharedThreads = new LinkedList<>(); /** 獲得獨佔鎖的執行緒 */ private ThreadRefCount writeThreads = null; /** 日誌工具 */ private static final Logger logger = LoggerFactory.getLogger(RandomSync.class); /** 是否已經被釋放 */ private boolean released = false; /** * 共享鎖的方式鎖 * * @throws LockReleasedException */ public void lockShared() throws LockReleasedException { synchronized (lock) { waitUtilCanRead(); //進行加讀鎖,其實就是增加相應的執行緒的鎖計數 ThreadRefCount refCount = getOrCreateRef(sharedThreads, Thread.currentThread()); refCount.incRefCount(); } } /** * 排他的方式鎖 * * @throws LockReleasedException */ public void lockWrite() throws LockReleasedException { synchronized (lock) { waitUtilCanWrite(); if (writeThreads == null) { writeThreads = new ThreadRefCount(Thread.currentThread(), 0); } //進行寫鎖的加鎖,其實就是增加寫執行緒的加鎖計數 writeThreads.incRefCount(); } } /** * 解鎖共享鎖 * * @throws LockReleasedException * @throws IllegalStateLockException */ public void unlockShared() throws LockReleasedException, IllegalStateLockException { synchronized (lock) { testSyncOk(); //獲取執行緒對應的應用計數情況 ThreadRefCount refCount = getRefCount(sharedThreads, Thread.currentThread()); //狀態不合法 if (refCount == null || refCount.getRefCount() <= 0) { throw new IllegalStateLockException("無法解鎖未處於鎖定狀態的讀鎖"); } //減去一個引用計數 refCount.decRefCount(); if (refCount.getRefCount() == 0) { sharedThreads.remove(refCount); //喚醒等待的寫執行緒 if (sharedThreads.isEmpty() && writeThreads == null) { lock.notifyAll(); } } } } /** * 解鎖寫鎖 * * @throws LockReleasedException * @throws IllegalStateLockException */ public void unlockWrite() throws LockReleasedException, IllegalStateLockException { synchronized (lock) { testSyncOk(); if (!writeThreads.getThread().equals(Thread.currentThread())) { throw new IllegalStateLockException("無法解鎖未被鎖定的寫鎖"); } writeThreads.decRefCount(); if (writeThreads.getRefCount() == 0) { //寫鎖徹底被解開 writeThreads = null; //喚醒所有等待的執行緒 lock.notifyAll(); } } } /** * 嘗試timeoutms長度時間的加鎖 * * @param timeoutms * @return * @throws LockReleasedException */ public boolean tryLockShared(int timeoutms) throws LockReleasedException { synchronized (lock) { testSyncOk(); if (!isWriteLockUnlock()) { if (timeoutms > 0) { try { lock.wait(timeoutms); } catch (InterruptedException e) { logger.warn(e, "try lock shared時等待異常"); } } else { return false; } testSyncOk(); } //醒來重新測試 if (isWriteLockUnlock()) { ThreadRefCount refCount = getOrCreateRef(sharedThreads, Thread.currentThread()); refCount.incRefCount(); return true; } else { return false; } } } /** * 不阻塞的方式加鎖 * * @return * @throws LockReleasedException */ public boolean tryLockShared() throws LockReleasedException { return tryLockShared(0); } /** * 嘗試在timeoutms毫秒內加寫鎖 * * @param timeoutms * @return * @throws LockReleasedException */ public boolean tryLockWrite(int timeoutms) throws LockReleasedException { synchronized (lock) { if (!canAddWriteLock()) { if (timeoutms > 0) { try { lock.wait(timeoutms); } catch (InterruptedException e) { logger.error(e, "try lock write 等待失敗"); } } else { return false; } testSyncOk(); } //測試狀態 if (!canAddWriteLock()) { return false; } else { if (writeThreads == null) { writeThreads = new ThreadRefCount(Thread.currentThread(), 0); } writeThreads.incRefCount(); return true; } } } /** * 非阻塞方式加鎖 * * @return * @throws LockReleasedException */ public boolean tryLockWrite() throws LockReleasedException { return tryLockWrite(0); } /** * 釋放當前執行緒的所有的資源 * * @throws LockReleasedException */ public void releaseSelf() throws LockReleasedException { synchronized (lock) { testSyncOk(); //釋放讀鎖資源 ThreadRefCount refCount = getRefCount(sharedThreads, Thread.currentThread()); if (refCount != null) { sharedThreads.remove(refCount); } //釋放寫鎖 if (writeThreads != null && writeThreads.getThread() == Thread.currentThread()) { writeThreads = null; lock.notifyAll(); } } } /** * 釋放掉這個鎖 * * @throws LockReleasedException */ public void release() throws LockReleasedException { synchronized (lock) { //不允許重複釋放 testSyncOk(); released = true; } } /** * 等待當前執行緒可以加寫鎖 * * @throws LockReleasedException */ private void waitUtilCanRead() throws LockReleasedException { synchronized (lock) { testSyncOk(); while (!isWriteLockUnlock()) { try { lock.wait(); } catch (InterruptedException e) { logger.warn(e, "執行緒等待失敗"); } //重新測試鎖狀態 testSyncOk(); } } } /** * 等待直到可以加寫鎖 * * @throws LockReleasedException */ private void waitUtilCanWrite() throws LockReleasedException { synchronized (lock) { testSyncOk(); //等待直到可以加寫鎖 while (!canAddWriteLock()) { try { lock.wait(); } catch (InterruptedException e) { logger.warn(e, "執行緒等待失敗"); } //重新測試鎖狀態 testSyncOk(); } } } /** * 是否可以加讀鎖 * * @return */ private boolean canAddWriteLock() { return isWriteLockUnlock() && isReadLockUnlock(); } /** * 是否讀鎖還沒有鎖定 * * @return */ private boolean isReadLockUnlock() { return sharedThreads.isEmpty() || (sharedThreads.size() == 1 && sharedThreads.get(0).getThread() == Thread .currentThread()); } /** * 寫鎖是否未鎖 * * @return */ private boolean isWriteLockUnlock() { synchronized (lock) { return (writeThreads == null) || (writeThreads.getThread().equals(Thread.currentThread())); } } /** * 獲取執行緒therad對應的refCount * * @param refCounts * @param thread * @return */ private ThreadRefCount getRefCount(List<ThreadRefCount> refCounts, Thread thread) { ThreadRefCount refCount = new ThreadRefCount(Thread.currentThread(), 0); int index = 0; synchronized (lock) { if ((index = refCounts.indexOf(refCount)) == -1) { refCount = null; } else { refCount = refCounts.get(index); } return refCount; } } /** * 獲取或者建立執行緒的加鎖資訊 * * @param refCounts * @param thread * @return */ private ThreadRefCount getOrCreateRef(List<ThreadRefCount> refCounts, Thread thread) { ThreadRefCount refCount = new ThreadRefCount(Thread.currentThread(), 0); int index = 0; synchronized (lock) { if ((index = refCounts.indexOf(refCount)) == -1) { refCounts.add(refCount); } else { refCount = refCounts.get(index); } return refCount; } } /** * 確保鎖沒有被釋放 * * @throws LockReleasedException */ public void testSyncOk() throws LockReleasedException { synchronized (lock) { if (released) { throw new LockReleasedException(); } } } /** * 計數執行緒鎖定的次數 * * @author jeff * @version $Id: Sync.java, v 0.1 2014年3月17日 下午5:45:26 jeff Exp $ */ private class ThreadRefCount { /** 鎖定的執行緒 */ private Thread thread; /** 鎖定的次數 */ private int refCount = 0; public ThreadRefCount(Thread thread, int refCount) { this.thread = thread; this.refCount = refCount; } /** * 增加引用計數 */ public void incRefCount() { refCount++; } /** * 減少引用計數 */ public void decRefCount() { refCount--; } /** * Getter method for property <tt>thread</tt>. * * @return property value of thread */ public Thread getThread() { return thread; } /** * Setter method for property <tt>thread</tt>. * * @param thread value to be assigned to property thread */ @SuppressWarnings("unused") public void setThread(Thread thread) { this.thread = thread; } /** * Getter method for property <tt>refCount</tt>. * * @return property value of refCount */ public int getRefCount() { return refCount; } /** * Setter method for property <tt>refCount</tt>. * * @param refCount value to be assigned to property refCount */ @SuppressWarnings("unused") public void setRefCount(int refCount) { this.refCount = refCount; } /** * @see java.lang.Object#hashCode() */ public int hashCode() { return thread.hashCode(); } /** * @see java.lang.Object#equals(java.lang.Object) */ public boolean equals(Object other) { if (this == other) { return true; } if (!(other instanceof ThreadRefCount)) { return false; } return thread.equals(((ThreadRefCount) other).getThread()); } } }
封裝後
org.footoo.storage.transaction.lock.RandomSyncRWLock
/**
* Alipay.com Inc.
* Copyright (c) 2004-2014 All Rights Reserved.
*/
package org.footoo.storage.transaction.lock;
import org.footoo.common.log.Logger;
import org.footoo.common.log.LoggerFactory;
/**
* 讀寫鎖,實際只是封裝了RandomSync
*
* @author jeff
* @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:09:45 jeff Exp $
*/
public class RandomSyncRWLock implements RWLock {
/** 實際的同步工具 */
private RandomSync sync = new RandomSync();
/** 讀鎖 */
private Lock readLockInner = new ReadLock();
/** 寫鎖 */
private Lock writeLockInner = new WriteLock();
/** 日誌工具 */
private static final Logger logger = LoggerFactory.getLogger(RandomSyncRWLock.class);
/**
* @see org.footoo.storage.transaction.lock.RWLock#readLock()
*/
@Override
public Lock readLock() {
return readLockInner;
}
/**
* @see org.footoo.storage.transaction.lock.RWLock#writeLock()
*/
@Override
public Lock writeLock() {
return writeLockInner;
}
/**
* @see org.footoo.storage.transaction.lock.RWLock#releaseSelf()
*/
@Override
public void releaseSelf() {
try {
sync.releaseSelf();
} catch (LockReleasedException e) {
logger.warn(e, "釋放當前執行緒的讀寫鎖發生異常");
}
}
/**
* @see org.footoo.storage.transaction.lock.RWLock#release()
*/
@Override
public void release() {
try {
sync.release();
} catch (LockReleasedException e) {
logger.error(e, "釋放讀寫鎖發生異常");
}
}
/**
* 讀鎖
*
* @author jeff
* @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:12:42 jeff Exp $
*/
private class ReadLock implements Lock {
@Override
public void lock() {
try {
sync.lockShared();
} catch (LockReleasedException e) {
logger.warn(e);
}
}
@Override
public void unlock() {
try {
sync.unlockShared();
} catch (LockReleasedException | IllegalStateLockException e) {
logger.warn(e);
}
}
@Override
public boolean tryLock() {
try {
return sync.tryLockShared();
} catch (LockReleasedException e) {
logger.warn(e);
return false;
}
}
@Override
public boolean tryLock(int ms) {
try {
return sync.tryLockShared(ms);
} catch (LockReleasedException e) {
logger.warn(e);
return false;
}
}
}
/**
* 寫鎖
*
* @author jeff
* @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:13:40 jeff Exp $
*/
private class WriteLock implements Lock {
@Override
public void lock() {
try {
sync.lockWrite();
} catch (LockReleasedException e) {
logger.warn(e);
}
}
@Override
public void unlock() {
try {
sync.unlockWrite();
} catch (LockReleasedException | IllegalStateLockException e) {
logger.warn(e);
}
}
@Override
public boolean tryLock() {
try {
return sync.tryLockWrite();
} catch (LockReleasedException e) {
logger.warn(e);
return false;
}
}
@Override
public boolean tryLock(int ms) {
try {
return sync.tryLockWrite(ms);
} catch (LockReleasedException e) {
logger.warn(e);
return false;
}
}
}
}
所有介面
org.footoo.storage.transaction.lock.Lock
/**
* Alipay.com Inc.
* Copyright (c) 2004-2014 All Rights Reserved.
*/
package org.footoo.storage.transaction.lock;
/**
* 鎖
*
* @author jeff
* @version $Id: Rock.java, v 0.1 2014年3月14日 下午7:10:46 jeff Exp $
*/
public interface Lock {
/**
* 鎖定
*/
public void lock();
/**
* 解鎖
*/
public void unlock();
/**
* 嘗試加鎖,立即返回
*
* @return
*/
public boolean tryLock();
/**
* 嘗試加鎖,ms毫秒後超時
*
* @param ms
* @return
*/
public boolean tryLock(int ms);
}
org.footoo.storage.transaction.lock.RWLock
/**
* Alipay.com Inc.
* Copyright (c) 2004-2014 All Rights Reserved.
*/
package org.footoo.storage.transaction.lock;
/**
* 讀寫鎖
* <ul>
* <li>能夠實現基本的讀寫鎖功能
* <li>同線程的讀寫鎖不會阻塞
* </ul>
*
* @author jeff
* @version $Id: RWRock.java, v 0.1 2014年3月14日 下午7:10:05 jeff Exp $
*/
public interface RWLock {
/**
* 獲得讀鎖
*
* @return
*/
public Lock readLock();
/**
* 寫鎖
*
* @return
*/
public Lock writeLock();
/**
* 釋放當前執行緒的所有讀寫鎖
*/
public void releaseSelf();
/**
* 釋放鎖
*/
public void release();
}