1. 程式人生 > >基於同步器AbstractQueuedSynchronizer 實現自定義鎖

基於同步器AbstractQueuedSynchronizer 實現自定義鎖

AbstractQueuedSynchronizer(AQS) 同步器 是JAVA5 實現鎖關建。利用同步器將所得語義(Semantic) 實現, 然後在自定義鎖的實現中聚合AQS同步器。這裡推薦一篇非常好的關於AQS的文章:
http://ifeve.com/introduce-abstractqueuedsynchronizer/

下面是自定鎖的一個實現,其實就是Semphore, 令牌數為2 的例子。

package com.bill.mq.common;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 一個鎖, 在同一時間,只能有2個執行緒能夠並行訪問,超過限制的其他程序進入阻塞狀態。<br/>
 * 定義一個資源初始狀態為2, 一個程序進入獲取,那麼減少1, 一個程序釋放,那麼加1。<br/>
 * 資源狀態的正確範圍在【0,1,2】三個之間。<br/>
 * 當為0時,代表再有新的執行緒對資源進行獲取時,只能進入阻塞狀態。<br/>
 * 注意在任何時候進行狀態變更的時候,均要以CAS作為原子性保障。 <br/>
 * 由於資源的數量多以1個, 同時可以有兩個執行緒佔用資源,需要實現tryAcquireShared 和 tryReleaseShared 方法。
 *
 */
		public class MyTwinsLock implements Lock {

private final Sync sync = new Sync(2);

private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be large than zero");
        }
        setState(count);
    }
    @Override
    protected int tryAcquireShared(int reduceCount) {
        for (;;) {
            int current = getState();
            int newCount = current - reduceCount;
            if (newCount < 0 || compareAndSetState(current, newCount)) {
                return newCount;
            }
        }
    }
    @Override
    protected boolean tryReleaseShared(int returnCount) {
        for (;;) {
            int current = getState();
            int newCount = current + returnCount;
            if (compareAndSetState(current, newCount)) {
                return true;
            }
        }
    }
}

@Override
public void lock() {
    sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
    return sync.tryAcquireShared(1) >= 0;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}

@Override
public void unlock() {
    sync.releaseShared(1);
}

@Override
public Condition newCondition() {
    return null;
}

}