1. 程式人生 > >自己動手寫帶有事務支援的分散式Key-Value儲存系統——讀寫鎖

自己動手寫帶有事務支援的分散式Key-Value儲存系統——讀寫鎖

      為了實現高併發的鎖機制事務,需要使用讀寫鎖進行併發控制。Java類庫本身提供了讀寫鎖,但是其靈活性對於這個系統的需求是不夠的。本系統的讀寫鎖需要實現在單個執行緒內部能夠任意的加寫鎖,加讀鎖,並且不會阻塞,鎖的釋放可以單次單次進行,也可以一次完成(多次加鎖,一次釋放);但是多個執行緒之間仍然符合讀寫鎖的互斥性。

     為了實現這樣的需求,我的設計如下

     |--------------|           讀執行緒佇列                       |-----------------------|-----------------------|--------------------|

     |                   |--------------------------------------->|執行緒+讀鎖定次數|執行緒+讀鎖定次數|。。。。。。   |

     |                   |

      |  讀寫鎖    |         寫執行緒資訊                           |---------------------------|

      |-------------|----------------------------------------->| 寫執行緒+寫鎖定次數|

                                                                                   |---------------------------|

    每一個讀寫鎖,儲存一個讀執行緒佇列,佇列中儲存加了讀鎖的執行緒,以及每個執行緒加讀鎖的次數;並且儲存一個寫執行緒資訊(執行緒+加寫鎖的次數)。每一次加讀鎖(共享鎖)的時候,必須是寫鎖未被鎖定,或者寫鎖的鎖定者是當前執行緒,否則就等待,加讀鎖就是將當前執行緒對應的讀鎖定次數加1;加寫鎖(排他鎖)的時候,已加寫鎖執行緒必須為空,或者為當前執行緒並且讀執行緒佇列為空,或者只有當前執行緒,否則等待,加鎖時就是將寫鎖定次數加一。其餘的操作讀者可以自行推理。如果有問題,請留言相互討論。

org.footoo.storage.transaction.lock.RandomSync

/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2014 All Rights Reserved.
 */
package org.footoo.storage.transaction.lock;

import java.util.LinkedList;
import java.util.List;

import org.footoo.common.log.Logger;
import org.footoo.common.log.LoggerFactory;

/**
 * <h1>同步函式,這哥們必須是執行緒安全的</h1>
 * <b>可以加讀鎖的前提</b>
 * <ul>
 *  <li>寫鎖沒有鎖定,即加寫鎖的執行緒為null,或者為當前執行緒
 * </ul>
 * <br>
 * <b>可以加寫鎖的前提</b>
 * <ul>
 *  <li>寫鎖沒有鎖定,即加寫鎖的執行緒為null,或者為當前執行緒
 *  <li>讀鎖沒有鎖定,即加讀鎖的執行緒為null,或者是當前執行緒
 * </ul>
 * 
 * <br>
 * <h2>問題<h2>
 * <ul>
 *  <li>隨機的喚醒方式會出現寫執行緒飢餓的現象,即寫執行緒鎖定了,
 *  可以鎖定大量的讀執行緒,導致讀執行緒得不到執行,
 *  詳見org.footoo.storage.SyncTest2。
 *  這樣就需要一種公平的喚醒方式來提高公平性
 * </ul>
 * 
 * @author jeff
 * @version $Id: Sync.java, v 0.1 2014年3月17日 下午4:48:48 jeff Exp $
 */
public class RandomSync {
    /** 鎖物件 */
    private final Object               lock          = new Object();
    /** 獲得了共享鎖的執行緒 */
    private final List<ThreadRefCount> sharedThreads = new LinkedList<>();
    /** 獲得獨佔鎖的執行緒 */
    private ThreadRefCount             writeThreads  = null;
    /** 日誌工具 */
    private static final Logger        logger        = LoggerFactory.getLogger(RandomSync.class);
    /** 是否已經被釋放 */
    private boolean                    released      = false;

    /**
     * 共享鎖的方式鎖
     * 
     * @throws LockReleasedException
     */
    public void lockShared() throws LockReleasedException {
        synchronized (lock) {
            waitUtilCanRead();
            //進行加讀鎖,其實就是增加相應的執行緒的鎖計數
            ThreadRefCount refCount = getOrCreateRef(sharedThreads, Thread.currentThread());
            refCount.incRefCount();
        }
    }

    /**
     * 排他的方式鎖
     * 
     * @throws LockReleasedException
     */
    public void lockWrite() throws LockReleasedException {
        synchronized (lock) {
            waitUtilCanWrite();
            if (writeThreads == null) {
                writeThreads = new ThreadRefCount(Thread.currentThread(), 0);
            }
            //進行寫鎖的加鎖,其實就是增加寫執行緒的加鎖計數
            writeThreads.incRefCount();
        }
    }

    /**
     * 解鎖共享鎖
     * 
     * @throws LockReleasedException
     * @throws IllegalStateLockException 
     */
    public void unlockShared() throws LockReleasedException, IllegalStateLockException {
        synchronized (lock) {
            testSyncOk();
            //獲取執行緒對應的應用計數情況
            ThreadRefCount refCount = getRefCount(sharedThreads, Thread.currentThread());
            //狀態不合法
            if (refCount == null || refCount.getRefCount() <= 0) {
                throw new IllegalStateLockException("無法解鎖未處於鎖定狀態的讀鎖");
            }
            //減去一個引用計數
            refCount.decRefCount();
            if (refCount.getRefCount() == 0) {
                sharedThreads.remove(refCount);
                //喚醒等待的寫執行緒
                if (sharedThreads.isEmpty() && writeThreads == null) {
                    lock.notifyAll();
                }
            }
        }
    }

    /**
     * 解鎖寫鎖
     * 
     * @throws LockReleasedException
     * @throws IllegalStateLockException
     */
    public void unlockWrite() throws LockReleasedException, IllegalStateLockException {
        synchronized (lock) {
            testSyncOk();

            if (!writeThreads.getThread().equals(Thread.currentThread())) {
                throw new IllegalStateLockException("無法解鎖未被鎖定的寫鎖");
            }

            writeThreads.decRefCount();
            if (writeThreads.getRefCount() == 0) {
                //寫鎖徹底被解開
                writeThreads = null;
                //喚醒所有等待的執行緒
                lock.notifyAll();
            }
        }
    }

    /**
     * 嘗試timeoutms長度時間的加鎖
     * 
     * @param timeoutms
     * @return
     * @throws LockReleasedException
     */
    public boolean tryLockShared(int timeoutms) throws LockReleasedException {
        synchronized (lock) {
            testSyncOk();
            if (!isWriteLockUnlock()) {
                if (timeoutms > 0) {
                    try {
                        lock.wait(timeoutms);
                    } catch (InterruptedException e) {
                        logger.warn(e, "try lock shared時等待異常");
                    }
                } else {
                    return false;
                }
                testSyncOk();
            }
            //醒來重新測試
            if (isWriteLockUnlock()) {
                ThreadRefCount refCount = getOrCreateRef(sharedThreads, Thread.currentThread());
                refCount.incRefCount();
                return true;
            } else {
                return false;
            }
        }
    }

    /**
     * 不阻塞的方式加鎖
     * 
     * @return
     * @throws LockReleasedException
     */
    public boolean tryLockShared() throws LockReleasedException {
        return tryLockShared(0);
    }

    /**
     * 嘗試在timeoutms毫秒內加寫鎖
     * 
     * @param timeoutms
     * @return
     * @throws LockReleasedException
     */
    public boolean tryLockWrite(int timeoutms) throws LockReleasedException {
        synchronized (lock) {
            if (!canAddWriteLock()) {
                if (timeoutms > 0) {
                    try {
                        lock.wait(timeoutms);
                    } catch (InterruptedException e) {
                        logger.error(e, "try lock write 等待失敗");
                    }
                } else {
                    return false;
                }
                testSyncOk();
            }
            //測試狀態
            if (!canAddWriteLock()) {
                return false;
            } else {
                if (writeThreads == null) {
                    writeThreads = new ThreadRefCount(Thread.currentThread(), 0);
                }
                writeThreads.incRefCount();
                return true;
            }
        }
    }

    /**
     * 非阻塞方式加鎖
     * 
     * @return
     * @throws LockReleasedException
     */
    public boolean tryLockWrite() throws LockReleasedException {
        return tryLockWrite(0);
    }

    /**
     * 釋放當前執行緒的所有的資源
     * 
     * @throws LockReleasedException
     */
    public void releaseSelf() throws LockReleasedException {
        synchronized (lock) {
            testSyncOk();

            //釋放讀鎖資源
            ThreadRefCount refCount = getRefCount(sharedThreads, Thread.currentThread());
            if (refCount != null) {
                sharedThreads.remove(refCount);
            }

            //釋放寫鎖
            if (writeThreads != null && writeThreads.getThread() == Thread.currentThread()) {
                writeThreads = null;
                lock.notifyAll();
            }
        }
    }

    /**
     * 釋放掉這個鎖
     * 
     * @throws LockReleasedException
     */
    public void release() throws LockReleasedException {
        synchronized (lock) {
            //不允許重複釋放
            testSyncOk();
            released = true;
        }
    }

    /**
     * 等待當前執行緒可以加寫鎖
     * 
     * @throws LockReleasedException
     */
    private void waitUtilCanRead() throws LockReleasedException {
        synchronized (lock) {
            testSyncOk();
            while (!isWriteLockUnlock()) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    logger.warn(e, "執行緒等待失敗");
                }
                //重新測試鎖狀態
                testSyncOk();
            }
        }
    }

    /**
     * 等待直到可以加寫鎖
     * 
     * @throws LockReleasedException
     */
    private void waitUtilCanWrite() throws LockReleasedException {
        synchronized (lock) {
            testSyncOk();
            //等待直到可以加寫鎖
            while (!canAddWriteLock()) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    logger.warn(e, "執行緒等待失敗");
                }
                //重新測試鎖狀態
                testSyncOk();
            }
        }
    }

    /**
     * 是否可以加讀鎖
     * 
     * @return
     */
    private boolean canAddWriteLock() {
        return isWriteLockUnlock() && isReadLockUnlock();
    }

    /**
     * 是否讀鎖還沒有鎖定
     * 
     * @return
     */
    private boolean isReadLockUnlock() {
        return sharedThreads.isEmpty()
               || (sharedThreads.size() == 1 && sharedThreads.get(0).getThread() == Thread
                   .currentThread());
    }

    /**
     * 寫鎖是否未鎖
     * 
     * @return
     */
    private boolean isWriteLockUnlock() {
        synchronized (lock) {
            return (writeThreads == null)
                   || (writeThreads.getThread().equals(Thread.currentThread()));
        }
    }

    /**
     * 獲取執行緒therad對應的refCount
     * 
     * @param refCounts
     * @param thread
     * @return
     */
    private ThreadRefCount getRefCount(List<ThreadRefCount> refCounts, Thread thread) {
        ThreadRefCount refCount = new ThreadRefCount(Thread.currentThread(), 0);
        int index = 0;
        synchronized (lock) {

            if ((index = refCounts.indexOf(refCount)) == -1) {
                refCount = null;
            } else {
                refCount = refCounts.get(index);
            }
            return refCount;
        }
    }

    /**
     * 獲取或者建立執行緒的加鎖資訊
     * 
     * @param refCounts
     * @param thread
     * @return
     */
    private ThreadRefCount getOrCreateRef(List<ThreadRefCount> refCounts, Thread thread) {
        ThreadRefCount refCount = new ThreadRefCount(Thread.currentThread(), 0);
        int index = 0;
        synchronized (lock) {

            if ((index = refCounts.indexOf(refCount)) == -1) {
                refCounts.add(refCount);
            } else {
                refCount = refCounts.get(index);
            }
            return refCount;
        }

    }

    /**
     * 確保鎖沒有被釋放
     * 
     * @throws LockReleasedException
     */
    public void testSyncOk() throws LockReleasedException {
        synchronized (lock) {
            if (released) {
                throw new LockReleasedException();
            }
        }
    }

    /**
     * 計數執行緒鎖定的次數
     * 
     * @author jeff
     * @version $Id: Sync.java, v 0.1 2014年3月17日 下午5:45:26 jeff Exp $
     */
    private class ThreadRefCount {
        /** 鎖定的執行緒 */
        private Thread thread;
        /** 鎖定的次數 */
        private int    refCount = 0;

        public ThreadRefCount(Thread thread, int refCount) {
            this.thread = thread;
            this.refCount = refCount;
        }

        /**
         * 增加引用計數
         */
        public void incRefCount() {
            refCount++;
        }

        /**
         * 減少引用計數
         */
        public void decRefCount() {
            refCount--;
        }

        /**
         * Getter method for property <tt>thread</tt>.
         * 
         * @return property value of thread
         */
        public Thread getThread() {
            return thread;
        }

        /**
         * Setter method for property <tt>thread</tt>.
         * 
         * @param thread value to be assigned to property thread
         */
        @SuppressWarnings("unused")
        public void setThread(Thread thread) {
            this.thread = thread;
        }

        /**
         * Getter method for property <tt>refCount</tt>.
         * 
         * @return property value of refCount
         */
        public int getRefCount() {
            return refCount;
        }

        /**
         * Setter method for property <tt>refCount</tt>.
         * 
         * @param refCount value to be assigned to property refCount
         */
        @SuppressWarnings("unused")
        public void setRefCount(int refCount) {
            this.refCount = refCount;
        }

        /** 
         * @see java.lang.Object#hashCode()
         */
        public int hashCode() {
            return thread.hashCode();
        }

        /** 
         * @see java.lang.Object#equals(java.lang.Object)
         */
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof ThreadRefCount)) {
                return false;
            }
            return thread.equals(((ThreadRefCount) other).getThread());
        }

    }

}

封裝後

org.footoo.storage.transaction.lock.RandomSyncRWLock

/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2014 All Rights Reserved.
 */
package org.footoo.storage.transaction.lock;

import org.footoo.common.log.Logger;
import org.footoo.common.log.LoggerFactory;

/**
 * 讀寫鎖,實際只是封裝了RandomSync
 * 
 * @author jeff
 * @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:09:45 jeff Exp $
 */
public class RandomSyncRWLock implements RWLock {
    /** 實際的同步工具 */
    private RandomSync          sync           = new RandomSync();
    /** 讀鎖 */
    private Lock                readLockInner  = new ReadLock();
    /** 寫鎖 */
    private Lock                writeLockInner = new WriteLock();
    /** 日誌工具 */
    private static final Logger logger         = LoggerFactory.getLogger(RandomSyncRWLock.class);

    /** 
     * @see org.footoo.storage.transaction.lock.RWLock#readLock()
     */
    @Override
    public Lock readLock() {
        return readLockInner;
    }

    /** 
     * @see org.footoo.storage.transaction.lock.RWLock#writeLock()
     */
    @Override
    public Lock writeLock() {
        return writeLockInner;
    }

    /** 
     * @see org.footoo.storage.transaction.lock.RWLock#releaseSelf()
     */
    @Override
    public void releaseSelf() {
        try {
            sync.releaseSelf();
        } catch (LockReleasedException e) {
            logger.warn(e, "釋放當前執行緒的讀寫鎖發生異常");

        }
    }

    /** 
     * @see org.footoo.storage.transaction.lock.RWLock#release()
     */
    @Override
    public void release() {
        try {
            sync.release();
        } catch (LockReleasedException e) {
            logger.error(e, "釋放讀寫鎖發生異常");
        }
    }

    /**
     * 讀鎖
     * 
     * @author jeff
     * @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:12:42 jeff Exp $
     */
    private class ReadLock implements Lock {

        @Override
        public void lock() {
            try {
                sync.lockShared();
            } catch (LockReleasedException e) {
                logger.warn(e);
            }
        }

        @Override
        public void unlock() {
            try {
                sync.unlockShared();
            } catch (LockReleasedException | IllegalStateLockException e) {
                logger.warn(e);
            }
        }

        @Override
        public boolean tryLock() {
            try {
                return sync.tryLockShared();
            } catch (LockReleasedException e) {
                logger.warn(e);
                return false;
            }
        }

        @Override
        public boolean tryLock(int ms) {
            try {
                return sync.tryLockShared(ms);
            } catch (LockReleasedException e) {
                logger.warn(e);
                return false;
            }
        }

    }

    /**
     * 寫鎖
     * 
     * @author jeff
     * @version $Id: RandomSyncRWLock.java, v 0.1 2014年3月17日 下午10:13:40 jeff Exp $
     */
    private class WriteLock implements Lock {

        @Override
        public void lock() {
            try {
                sync.lockWrite();
            } catch (LockReleasedException e) {
                logger.warn(e);
            }
        }

        @Override
        public void unlock() {
            try {
                sync.unlockWrite();
            } catch (LockReleasedException | IllegalStateLockException e) {
                logger.warn(e);

            }
        }

        @Override
        public boolean tryLock() {
            try {
                return sync.tryLockWrite();
            } catch (LockReleasedException e) {
                logger.warn(e);
                return false;
            }
        }

        @Override
        public boolean tryLock(int ms) {
            try {
                return sync.tryLockWrite(ms);
            } catch (LockReleasedException e) {
                logger.warn(e);
                return false;
            }
        }

    }

}
所有介面

org.footoo.storage.transaction.lock.Lock

/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2014 All Rights Reserved.
 */
package org.footoo.storage.transaction.lock;

/**
 * 鎖
 * 
 * @author jeff
 * @version $Id: Rock.java, v 0.1 2014年3月14日 下午7:10:46 jeff Exp $
 */
public interface Lock {
    /**
     * 鎖定
     */
    public void lock();

    /**
     * 解鎖
     */
    public void unlock();

    /**
     * 嘗試加鎖,立即返回
     * 
     * @return
     */
    public boolean tryLock();

    /**
     * 嘗試加鎖,ms毫秒後超時
     * 
     * @param ms
     * @return
     */
    public boolean tryLock(int ms);
}

org.footoo.storage.transaction.lock.RWLock

/**
 * Alipay.com Inc.
 * Copyright (c) 2004-2014 All Rights Reserved.
 */
package org.footoo.storage.transaction.lock;

/**
 * 讀寫鎖
 * <ul>
 * <li>能夠實現基本的讀寫鎖功能
 * <li>同線程的讀寫鎖不會阻塞
 * </ul>
 * 
 * @author jeff
 * @version $Id: RWRock.java, v 0.1 2014年3月14日 下午7:10:05 jeff Exp $
 */
public interface RWLock {
    /**
     * 獲得讀鎖
     * 
     * @return
     */
    public Lock readLock();

    /**
     * 寫鎖
     * 
     * @return
     */
    public Lock writeLock();

    /**
     * 釋放當前執行緒的所有讀寫鎖
     */
    public void releaseSelf();

    /**
     * 釋放鎖
     */
    public void release();

}