
Non-blocking Synchronization Algorithms and the CAS (Compare and Swap) Lock-Free Algorithm

The Cost of Locks

Locks are the simplest way to handle concurrency, but they are also the most expensive. A kernel-level lock requires the operating system to perform a context switch; acquiring and releasing locks causes frequent context switches and scheduling delays, and a thread waiting for a lock is suspended until the lock is released. During a context switch, the instructions and data the CPU previously cached become invalid, which hurts performance badly. Having the operating system arbitrate a lock among threads is like two sisters fighting over a toy, with the operating system as the parent who decides who gets it; that is slow. User-level locks avoid these problems, but in practice they are only effective when there is no real contention.

Before JDK 1.5, Java relied on the synchronized keyword for synchronization. By using a consistent locking protocol to coordinate access to shared state, it guarantees that whichever thread holds the lock guarding a set of variables accesses them exclusively. If several threads try to acquire the lock at the same time, some of them are suspended; when a suspended thread resumes, it must wait for other threads to finish their time slices before it can be scheduled again, and this suspend/resume cycle carries a large overhead. Locks have other drawbacks as well: while a thread is waiting for a lock it can do nothing else; if a thread holding a lock is delayed, every thread that needs that lock stalls; and if a blocked thread has high priority while the lock holder has low priority, the result is priority inversion.
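For comparison, a lock-based counter of the kind described above might look like the following minimal sketch (the class name is illustrative, not taken from this article): every increment must acquire the object's monitor, and contending threads are suspended until it is released.

// A counter protected by an exclusive (pessimistic) lock: each call must
// acquire the monitor; threads that lose the race are suspended.
public class SynchronizedCounter {
    private long value = 0;

    public synchronized void increment() {
        value++;
    }

    public synchronized long get() {
        return value;
    }
}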

Optimistic Locking vs. Pessimistic Locking

An exclusive lock is a pessimistic lock, and synchronized is an exclusive lock: it assumes the worst case and proceeds only when it is certain that no other thread can interfere, which forces every other thread that needs the lock to suspend and wait for the holder to release it. A more efficient alternative is optimistic locking: instead of locking, each operation assumes there is no conflict and simply goes ahead; if it fails because of a conflict, it retries until it succeeds.

The Problem with volatile

Compared with locks, volatile variables are a lighter-weight synchronization mechanism, because using them involves no context switching or thread scheduling. But volatile variables also have limitations: they cannot be used to build atomic compound operations, so a volatile variable cannot be used when its new value depends on its old value. (See the separate post on volatile.)

volatile only guarantees that a variable's value is visible to all threads; it does not guarantee atomicity. Why? See my other article, "Why can't volatile guarantee atomicity while Atomic classes can?"
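A minimal sketch of that difference (the class and field names are illustrative): the volatile counter is visible to every thread, but count++ is still a three-step read-modify-write, so concurrent updates can be lost, whereas AtomicInteger performs the same update as a single atomic operation.

import java.util.concurrent.atomic.AtomicInteger;

public class VolatileVsAtomic {
    private volatile int volatileCount = 0;          // visible to all threads, but ++ is not atomic
    private final AtomicInteger atomicCount = new AtomicInteger();

    public void unsafeIncrement() {
        volatileCount++;   // read, add, write: two threads can read the same old value and one update is lost
    }

    public void safeIncrement() {
        atomicCount.incrementAndGet();  // one atomic, CAS-based read-modify-write
    }
}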

Atomic Operations in Java

An atomic operation is one that completes in a single step and cannot be interrupted. Atomic operations are thread-safe in a multithreaded environment and need no additional synchronization. In Java, the following operations are atomic:

  • all assignments of primitive types except for long and double
  • all assignments of references
  • all operations of the java.util.concurrent.atomic.* classes
  • all assignments to volatile longs and doubles

That raises a question: why is assignment to a long not an atomic operation? For example:

long foo = 65465498L;

In fact, Java may write such a long in two steps, the first 32 bits and then the remaining 32 bits, which is not thread-safe. Changing the declaration to the following makes it thread-safe:

private volatile long foo;

This is because the Java memory model requires reads and writes of volatile long (and double) variables to be atomic, so the write can no longer be torn into two halves.

The CAS Lock-Free Algorithm

There are several ways to implement lock-free, non-blocking algorithms, and CAS (compare and swap) is the best known. CAS is a CPU instruction; most processor architectures, including IA32 and SPARC, provide one. Its semantics are: "I think V's value should be A; if it is, set V to B, otherwise do not modify it and tell me what V's actual value is." CAS is an optimistic-locking technique: when several threads try to update the same variable with CAS at the same time, only one of them succeeds; the others fail, but instead of being suspended they are simply told they lost the race and may try again. CAS takes three operands: the memory value V, the expected old value A, and the new value B. If and only if A equals V, V is set to B; otherwise nothing is done. A C sketch of CAS looks like this:

int compare_and_swap (int* reg, int oldval, int newval)
{
  ATOMIC();                   /* pseudocode marker: everything below executes as one atomic step */
  int old_reg_val = *reg;
  if (old_reg_val == oldval)
     *reg = newval;
  END_ATOMIC();
  return old_reg_val;         /* the caller compares this with oldval to tell whether the swap happened */
}

The Basic Assumption Behind CAS (the Optimistic-Locking Algorithm)

In pseudocode, the CAS retry pattern can be expressed as:

do {
    back up the old value;
    build the new value from the old value;
} while (!CAS(memory address, backed-up old value, new value))

[Figure: ConcurrencyCAS]

(Explanation of the figure above: a CPU tries to update a value, but if the value is no longer the one it originally read, the operation fails, because clearly some other operation has already changed it.)

In other words, when the comparison finds the two values equal, the shared data has not been modified, so it is replaced with the new value and execution continues; if they are not equal, the shared data has already been modified, so the work done so far is discarded and the operation is retried from the start. It is easy to see that CAS is based on the assumption that the shared data will usually not be modified concurrently, much like the commit-retry pattern in databases. When synchronization conflicts are rare, this assumption brings a large performance gain.
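As a concrete illustration of this retry pattern in Java, here is a minimal counter in the spirit of the NonblockingCounter mentioned later in this article, built on AtomicInteger.compareAndSet (a sketch, not the exact class from the original source):

import java.util.concurrent.atomic.AtomicInteger;

// Minimal CAS-based counter: read the old value, compute the new value,
// and commit only if nobody changed the value in between; otherwise retry.
public class NonblockingCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    public int get() {
        return value.get();
    }

    public int increment() {
        int old;
        int next;
        do {
            old = value.get();       // back up the old value
            next = old + 1;          // build the new value from the old one
        } while (!value.compareAndSet(old, next)); // retry if another thread won the race
        return next;
    }
}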

The Cost of CAS (the CPU Cache-Miss Problem)

As described above, CAS is a CPU-level instruction, a single atomic step, so it is very fast. CAS also avoids asking the operating system to arbitrate a lock; everything is settled inside the CPU. But is CAS therefore free? No: there are cache misses. This issue is somewhat involved, and understanding it requires a look at the CPU's hardware architecture:

[Figure: an 8-core system with per-CPU caches, per-die interconnects, and a system interconnect]

The figure above shows an eight-core system. Each CPU has its own cache (the high-speed memory inside the CPU), and each die also contains an interconnect that lets its two cores communicate with each other. The system interconnect in the middle of the figure lets the four dies communicate and connects them to main memory. Data moves through the system in units of "cache lines", power-of-two-sized blocks of memory, usually 32 to 256 bytes. When a CPU reads a variable from memory into a register, it must first load the cache line containing that variable into its cache. Likewise, when a CPU stores a value from a register back to memory, it must not only load the cache line containing that location into its cache but also make sure that no other CPU holds a copy of that cache line.

For example, if CPU 0 performs a compare-and-swap (CAS) on a variable whose cache line resides in CPU 7's cache, the following simplified sequence of events occurs:

  • CPU 0 checks its local cache and does not find the cache line.
  • The request is forwarded to the interconnect shared by CPU 0 and CPU 1, which checks CPU 1's local cache and does not find the cache line.
  • The request is forwarded to the system interconnect, which checks the other three dies and learns that the cache line is held by the die containing CPU 6 and CPU 7.
  • The request is forwarded to the interconnect shared by CPU 6 and CPU 7, which checks both CPUs' caches and finds the cache line in CPU 7's cache.
  • CPU 7 sends the cache line to its interconnect and flushes the line from its own cache.
  • The interconnect of CPU 6 and CPU 7 sends the cache line to the system interconnect.
  • The system interconnect sends the cache line to the interconnect of CPU 0 and CPU 1.
  • The interconnect of CPU 0 and CPU 1 sends the cache line to CPU 0's cache.
  • CPU 0 can now perform the CAS on the variable in its cache.

That is the cost of shuttling a cache line between CPUs. In the best case, a CAS operation takes about 40 nanoseconds, more than 60 clock cycles; "best case" means the CPU performing the CAS was the last one to touch the variable, so the cache line is already in its cache. Similarly, the best-case lock operation (one "round trip" pair: acquiring and then releasing the lock) takes more than 60 nanoseconds, over 100 clock cycles; "best case" here means the data structure representing the lock is already in the cache of the CPU acquiring and releasing it. Lock operations are more expensive than CAS because the lock's data structure requires two atomic operations. A cache miss costs roughly 140 nanoseconds, more than 200 clock cycles, and a CAS that must also fetch the variable's old value from elsewhere while storing the new one takes about 300 nanoseconds, over 500 clock cycles. Think about that: in the time of one such CAS, the CPU could execute 500 ordinary instructions. This shows the limits of fine-grained locking. (These figures are taken from the book 深入理解並行程式設計.)

Below is a performance comparison of cache misses, CAS, and lock operations:

[Figure: cost comparison of cache miss, CAS, and lock operations]

JVM Support for CAS: AtomicInteger and AtomicLong.incrementAndGet()

Before JDK 1.5, CAS could not be performed without writing explicit native code. JDK 1.5 added low-level support, exposing CAS on int, long, and object references, and the JVM compiles these into the most efficient mechanism the underlying hardware provides: on platforms that support CAS, they are compiled at run time into the corresponding machine instruction; on a processor without a CAS instruction, the JVM falls back to a spin lock. It is therefore worth noting that the CAS solution is tightly bound to the platform and compiler (for example, on x86 the corresponding assembly instruction is lock cmpxchg, or lock cmpxchg8b for a 64-bit exchange; in .NET you can use the Interlocked.CompareExchange function).

The atomic variable classes, the AtomicXXX classes in java.util.concurrent.atomic, all use this low-level JVM support to provide efficient CAS operations on numeric and reference types, and most classes in java.util.concurrent use these atomic variable classes, directly or indirectly, in their implementations.

The source of AtomicLong.incrementAndGet() in Java 1.6 is:

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent.atomic;
import sun.misc.Unsafe;

/**
 * A <tt>long</tt> value that may be updated atomically.  See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * <tt>AtomicLong</tt> is used in applications such as atomically
 * incremented sequence numbers, and cannot be used as a replacement
 * for a {@link java.lang.Long}. However, this class does extend
 * <tt>Number</tt> to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    /**
     * Records whether the underlying JVM supports lockless
     * CompareAndSet for longs. While the unsafe.CompareAndSetLong
     * method works in either case, some constructions should be
     * handled at Java level to avoid locking user-visible locks.
     */
    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

    /**
     * Returns whether underlying JVM supports lockless CompareAndSet
     * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
     */
    private static native boolean VMSupportsCS8();

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicLong.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;

    /**
     * Creates a new AtomicLong with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicLong(long initialValue) {
        value = initialValue;
    }

    /**
     * Creates a new AtomicLong with initial value <tt>0</tt>.
     */
    public AtomicLong() {
    }

    /**
     * Gets the current value.
     *
     * @return the current value
     */
    public final long get() {
        return value;
    }

    /**
     * Sets to the given value.
     *
     * @param newValue the new value
     */
    public final void set(long newValue) {
        value = newValue;
    }

    /**
     * Eventually sets to the given value.
     *
     * @param newValue the new value
     * @since 1.6
     */
    public final void lazySet(long newValue) {
        unsafe.putOrderedLong(this, valueOffset, newValue);
    }

    /**
     * Atomically sets to the given value and returns the old value.
     *
     * @param newValue the new value
     * @return the previous value
     */
    public final long getAndSet(long newValue) {
        while (true) {
            long current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value <tt>==</tt> the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically sets the value to the given updated value
     * if the current value <tt>==</tt> the expected value.
     * May fail spuriously and does not provide ordering guarantees,
     * so is only rarely an appropriate alternative to <tt>compareAndSet</tt>.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful.
     */
    public final boolean weakCompareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the previous value
     */
    public final long getAndIncrement() {
        while (true) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the previous value
     */
    public final long getAndDecrement() {
        while (true) {
            long current = get();
            long next = current - 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the previous value
     */
    public final long getAndAdd(long delta) {
        while (true) {
            long current = get();
            long next = current + delta;
            if (compareAndSet(current, next))
                return current;
        }
    }

    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final long incrementAndGet() {
        for (;;) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically decrements by one the current value.
     *
     * @return the updated value
     */
    public final long decrementAndGet() {
        for (;;) {
            long current = get();
            long next = current - 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Atomically adds the given value to the current value.
     *
     * @param delta the value to add
     * @return the updated value
     */
    public final long addAndGet(long delta) {
        for (;;) {
            long current = get();
            long next = current + delta;
            if (compareAndSet(current, next))
                return next;
        }
    }

    /**
     * Returns the String representation of the current value.
     * @return the String representation of the current value.
     */
    public String toString() {
        return Long.toString(get());
    }

    public int intValue() {
        return (int)get();
    }

    public long longValue() {
        return (long)get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}

As you can see, AtomicLong.incrementAndGet is implemented with an optimistic-locking technique: it calls the CAS primitive in the sun.misc.Unsafe library and uses a CPU instruction to implement lock-free increment. That is why incrementing with AtomicLong.incrementAndGet is several times more efficient than incrementing under a synchronized lock. AtomicInteger follows exactly the same pattern:

public final int getAndIncrement() {  
        for (;;) {  
            int current = get();  
            int next = current + 1;  
            if (compareAndSet(current, next))  
                return current;  
        }  
}  
  
public final boolean compareAndSet(int expect, int update) {  
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);  
}  

Below is the benchmark code. As the results show, AtomicLong.incrementAndGet is several times faster than the synchronized version.

[Figure: benchmark output showing elapsed times for the plain, synchronized, and atomic counters]

package console;

import java.util.concurrent.atomic.AtomicLong;

public class main {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		System.out.println("START -- ");
		calc();
		calcSynchro();
		calcAtomic();
		
		testThreadsSync();
		testThreadsAtomic();
		
		testThreadsSync2();
		testThreadsAtomic2();
		
		System.out.println("-- FINISHED ");
	}

	private static void calc() {
		stopwatch sw = new stopwatch();
		sw.start();

		long val = 0;
		while (val < 10000000L) {
			val++;
		}
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" calc() elapsed (ms): " + milSecds);
	}

	private static void calcSynchro() {
		stopwatch sw = new stopwatch();
		sw.start();

		long val = 0;

		while (val < 10000000L) {
			synchronized (main.class) {
				val++;
			}
		}

		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" calcSynchro() elapsed (ms): " + milSecds);
	}

	private static void calcAtomic() {
		stopwatch sw = new stopwatch();
		sw.start();

		AtomicLong val = new AtomicLong(0);
		while (val.incrementAndGet() < 10000000L) {

		}
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" calcAtomic() elapsed (ms): " + milSecds);

	}
    

	private static void testThreadsSync(){
		
		stopwatch sw = new stopwatch();
		sw.start();
		
		Thread t1 = new Thread(new LoopSync());
		t1.start();
		
		Thread t2 = new Thread(new LoopSync());
		t2.start();
				
		while (t1.isAlive() || t2.isAlive()) {
			
        }
				
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" testThreadsSync() 1 thread elapsed (ms): " + milSecds);
		
	}
	
	private static void testThreadsAtomic(){
	
		stopwatch sw = new stopwatch();
		sw.start();
		
		Thread t1 = new Thread(new LoopAtomic());
		t1.start();
		
		Thread t2 = new Thread(new LoopAtomic());
		t2.start();
				
		while (t1.isAlive() || t2.isAlive()) {
			
        }
				
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" testThreadsAtomic() 1 thread elapsed (ms): " + milSecds);
		
	}
	
	private static void testThreadsSync2(){
		
		stopwatch sw = new stopwatch();
		sw.start();
		
		Thread t1 = new Thread(new LoopSync());
		t1.start();
		
		Thread t2 = new Thread(new LoopSync());
		t2.start();
				
		while (t1.isAlive() || t2.isAlive()) {
			
        }
				
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" testThreadsSync() 2 threads elapsed (ms): " + milSecds);
		
	}
	
	private static void testThreadsAtomic2(){
	
		stopwatch sw = new stopwatch();
		sw.start();
		
		Thread t1 = new Thread(new LoopAtomic());
		t1.start();
		
		Thread t2 = new Thread(new LoopAtomic());
		t2.start();
				
		while (t1.isAlive() || t2.isAlive()) {
			
        }
				
		sw.stop();
		long milSecds = sw.getElapsedTime();

		System.out.println(" testThreadsAtomic() 2 threads elapsed (ms): " + milSecds);
		
	}
	
	private static class LoopAtomic implements Runnable {
		public void run() {
			AtomicLong val = new AtomicLong(0);
			while (val.incrementAndGet() < 10000000L) {

			}
		}
	}
	private static class LoopSync implements Runnable {
		public void run() {
			long val = 0;

			while (val < 10000000L) {
				synchronized (main.class) {
					val++;
				}
			}
		}
	}
}


public class stopwatch {

	private long startTime = 0;
	private long stopTime = 0;
	private boolean running = false;

	public void start() {
		this.startTime = System.currentTimeMillis();
		this.running = true;
	}

	public void stop() {
		this.stopTime = System.currentTimeMillis();
		this.running = false;
	}

	public long getElapsedTime() {
		long elapsed;
		if (running) {
			elapsed = (System.currentTimeMillis() - startTime);
		} else {
			elapsed = (stopTime - startTime);
		}
		return elapsed;
	}

	public long getElapsedTimeSecs() {
		long elapsed;
		if (running) {
			elapsed = ((System.currentTimeMillis() - startTime) / 1000);
		} else {
			elapsed = ((stopTime - startTime) / 1000);
		}
		return elapsed;
	}

	// sample usage
	// public static void main(String[] args) {
	// StopWatch s = new StopWatch();
	// s.start();
	// //code you want to time goes here
	// s.stop();
	// System.out.println("elapsed time in milliseconds: " +
	// s.getElapsedTime());
	// }
}

A CAS Example: a Non-blocking Stack

Here is a CAS example slightly more involved than the non-blocking counter: a non-blocking stack, ConcurrentStack. The push() and pop() operations of ConcurrentStack are structurally similar to those of NonblockingCounter, except that the work they do is somewhat speculative: they hope the underlying assumption still holds at the moment the work is "committed". push() observes the current top node, constructs a new node to place on the stack, and then installs the new node only if the topmost node has not changed since the initial observation. If the CAS fails, it means another thread has already modified the stack, and the process starts over.

import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentStack<E> {
    AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();
    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = head.get();
            newHead.next = oldHead;
        } while (!head.compareAndSet(oldHead, newHead));
    }
    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = head.get();
            if (oldHead == null) 
                return null;
            newHead = oldHead.next;
        } while (!head.compareAndSet(oldHead,newHead));
        return oldHead.item;
    }
    static class Node<E> {
        final E item;
        Node<E> next;
        public Node(E item) { this.item = item; }
    }
}

Under light to moderate contention, non-blocking algorithms outperform blocking ones, because a CAS usually succeeds on its first attempt, and when contention does occur the cost is only a few extra loop iterations, not thread suspension and context switching. An uncontended CAS is much cheaper than an uncontended lock acquisition (this is necessarily true, since an uncontended lock acquisition involves a CAS plus extra bookkeeping), and a contended CAS has lower latency than a contended lock acquisition.

Under heavy contention (when many threads keep hammering a single memory location), lock-based algorithms begin to deliver better throughput than non-blocking ones, because a blocked thread stops contending and patiently waits its turn, avoiding further contention. But contention that heavy is uncommon: most of the time threads interleave thread-local computation with operations on shared data, giving other threads a chance to use the shared data.
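To see the stack above exercised under contention, here is a small driver sketch; it assumes the ConcurrentStack class from the previous listing is on the classpath, and the thread count and iteration count are arbitrary:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentStackDemo {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentStack<Integer> stack = new ConcurrentStack<Integer>();
        final AtomicInteger popped = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            pool.execute(new Runnable() {
                public void run() {
                    for (int i = 0; i < 100000; i++) {
                        stack.push(i);            // the CAS loop inside push() retries on contention
                        if (stack.pop() != null)  // pop() retries the same way
                            popped.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("popped " + popped.get() + " items");
    }
}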

Another CAS Example: a Non-blocking Linked List

The examples so far (a counter and a stack) are very simple non-blocking algorithms; once you are comfortable using CAS in a loop, they are easy to imitate. For more sophisticated data structures, non-blocking algorithms are much more complicated, because modifying a linked list, tree, or hash table may require updating several pointers. CAS can atomically and conditionally update a single pointer, but not two or more. So to build a non-blocking linked list, tree, or hash table, we need a way to update multiple pointers with CAS without ever leaving the data structure in an inconsistent state.

Inserting an element at the tail of a linked list normally involves updating two pointers: the "tail" pointer, which always refers to the last element in the list, and the "next" pointer of the previously last element, which must point to the newly inserted element. Because two pointers must be updated, two CAS operations are needed, and performing them as two independent CASes raises two problems to consider: what happens if the first CAS succeeds but the second fails, and what happens if another thread tries to access the list between the first and the second CAS?

For such data structures, the "trick" to building a non-blocking algorithm is to ensure that the structure is always in a consistent state, even between the moment a thread starts modifying it and the moment it finishes, and to ensure that other threads can tell not only whether the first thread has completed its update or is still in the middle of it, but also what remains to be done to complete the update if the first thread goes AWOL. If a thread finds the data structure in the middle of an update, it can "help" the thread performing the update finish it, and then carry on with its own operation. When the first thread comes back and tries to finish its own update, it will find that the work is no longer needed and simply return, because the CAS detects the helping thread's intervention (in this case, a constructive one).

This "help your neighbor" requirement is what protects the data structure from the failure of any single thread. If a thread that found the structure mid-update simply waited for the other thread to finish, it could wait forever should that other thread fail partway through. Even without failures, that approach would perform poorly, because the newly arriving thread would have to give up the processor, causing a context switch, or wait for its own time slice to expire (which is even worse).

import java.util.concurrent.atomic.AtomicReference;

public class LinkedQueue <E> {
    private static class Node <E> {
        final E item;
        final AtomicReference<Node<E>> next;
        Node(E item, Node<E> next) {
            this.item = item;
            this.next = new AtomicReference<Node<E>>(next);
        }
    }
    private AtomicReference<Node<E>> head
        = new AtomicReference<Node<E>>(new Node<E>(null, null));
    private AtomicReference<Node<E>> tail = head;
    public boolean put(E item) {
        Node<E> newNode = new Node<E>(item, null);
        while (true) {
            Node<E> curTail = tail.get();
            Node<E> residue = curTail.next.get();
            if (curTail == tail.get()) {
                if (residue == null) /* A */ {
                    if (curTail.next.compareAndSet(null, newNode)) /* C */ {
                        tail.compareAndSet(curTail, newNode) /* D */ ;
                        return true;
                    }
                } else {
                    tail.compareAndSet(curTail, residue) /* B */;
                }
            }
        }
    }
}

How Java's ConcurrentHashMap Is Implemented

ConcurrentHashMap in Java 5 is thread-safe and cleverly designed: it uses bucket-granularity (segment) locks and so avoids locking the whole map in put and get. In get in particular, it only operates on a single HashEntry, so the performance gain is obvious.
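As a quick illustration of the per-key atomic operations this design makes cheap, here is a small usage sketch built on putIfAbsent and replace; the word-count scenario and class name are made up for illustration:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class WordCountDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<String, Integer>();

        // putIfAbsent and replace are atomic, so this retry loop safely
        // increments a counter without locking the whole map.
        String word = "cas";
        for (;;) {
            Integer old = counts.get(word);
            if (old == null) {
                if (counts.putIfAbsent(word, 1) == null)
                    break;                       // we installed the first count
            } else if (counts.replace(word, old, old + 1)) {
                break;                           // nobody changed the count in between
            }
            // otherwise another thread raced us; retry
        }
        System.out.println(word + " -> " + counts.get(word));
    }
}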


The implementation uses a lock-striping mechanism, which has been discussed in detail elsewhere, along with analyses that combine the Java memory model with ConcurrentHashMap. Below is the JDK 6 source of ConcurrentHashMap:

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;
import java.io.Serializable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamField;

/**
 * A hash table supporting full concurrency of retrievals and
 * adjustable expected concurrency for updates. This class obeys the
 * same functional specification as {@link java.util.Hashtable}, and
 * includes versions of methods corresponding to each method of
 * <tt>Hashtable</tt>. However, even though all operations are
 * thread-safe, retrieval operations do <em>not</em> entail locking,
 * and there is <em>not</em> any support for locking the entire table
 * in a way that prevents all access.  This class is fully
 * interoperable with <tt>Hashtable</tt> in programs that rely on its
 * thread safety but not on its synchronization details.
 *
 * <p> Retrieval operations (including <tt>get</tt>) generally do not
 * block, so may overlap with update operations (including
 * <tt>put</tt> and <tt>remove</tt>). Retrievals reflect the results
 * of the most recently <em>completed</em> update operations holding
 * upon their onset.  For aggregate operations such as <tt>putAll</tt>
 * and <tt>clear</tt>, concurrent retrievals may reflect insertion or
 * removal of only some entries.  Similarly, Iterators and
 * Enumerations return elements reflecting the state of the hash table
 * at some point at or since the creation of the iterator/enumeration.
 * They do <em>not</em> throw {@link ConcurrentModificationException}.
 * However, iterators are designed to be used by only one thread at a time.
 *
 * <p> The allowed concurrency among update operations is guided by
 * the optional <tt>concurrencyLevel</tt> constructor argument
 * (default <tt>16</tt>), which is used as a hint for internal sizing.  The
 * table is internally partitioned to try to permit the indicated
 * number of concurrent updates without contention. Because placement
 * in hash tables is essentially random, the actual concurrency will
 * vary.  Ideally, you should choose a value to accommodate as many
 * threads as will ever concurrently modify the table. Using a
 * significantly higher value than you need can waste space and time,
 * and a significantly lower value can lead to thread contention. But
 * overestimates and underestimates within an order of magnitude do
 * not usually have much noticeable impact. A value of one is
 * appropriate when it is known that only one thread will modify and
 * all others will only read. Also, resizing this or any other kind of
 * hash table is a relatively slow operation, so, when possible, it is
 * a good idea to provide estimates of expected table sizes in
 * constructors.
 *
 * <p>This class and its views and iterators implement all of the
 * <em>optional</em> methods of the {@link Map} and {@link Iterator}
 * interfaces.
 *
 * <p> Like {@link Hashtable} but unlike {@link HashMap}, this class
 * does <em>not</em> allow <tt>null</tt> to be used as a key or value.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <K> the type of keys maintained by this map
 * @param <V> the type of mapped values
 */
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {
    private static final long serialVersionUID = 7249069246763182397L;

    /*
     * The basic strategy is to subdivide the table among Segments,
     * each of which itself is a concurrently readable hash table.  To
     * reduce footprint, all but one segments are constructed only
     * when first needed (see ensureSegment). To maintain visibility
     * in the presence of lazy construction, accesses to segments as
     * well as elements of segment's table must use volatile access,
     * which is done via Unsafe within methods segmentAt etc
     * below. These provide the functionality of AtomicReferenceArrays
     * but reduce the levels of indirection. Additionally,
     * volatile-writes of table elements and entry "next" fields
     * within locked operations use the cheaper "lazySet" forms of
     * writes (via putOrderedObject) because these writes are always
     * followed by lock releases that maintain sequential consistency
     * of table updates.
     *
     * Historical note: The previous version of this class relied
     * heavily on "final" fields, which avoided some volatile reads at
     * the expense of a large initial footprint.  Some remnants of
     * that design (including forced construction of segment 0) exist
     * to ensure serialization compatibility.
     */

    /* ---------------- Constants -------------- */

    /**
     * The default initial capacity for this table,
     * used when not otherwise specified in a constructor.
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    /**
     * The default load factor for this table, used when not
     * otherwise specified in a constructor.
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /**
     * The default concurrency level for this table, used when not
     * otherwise specified in a constructor.
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /**
     * The maximum capacity, used if a higher value is implicitly
     * specified by either of the constructors with arguments.  MUST
     * be a power of two <= 1<<30 to ensure that entries are indexable
     * using ints.
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * The minimum capacity for per-segment tables.  Must be a power
     * of two, at least two to avoid immediate resizing on next use
     * after lazy construction.
     */
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    /**
     * The maximum number of segments to allow; used to bound
     * constructor arguments. Must be power of two less than 1 << 24.
     */
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

    /**
     * Number of unsynchronized retries in size and containsValue
     * methods before resorting to locking. This is used to avoid
     * unbounded retries if tables undergo continuous modification
     * which would make it impossible to obtain an accurate result.
     */
    static final int RETRIES_BEFORE_LOCK = 2;

    /* ---------------- Fields -------------- */

    /**
     * Mask value for indexing into segments. The upper bits of a
     * key's hash code are used to choose the segment.
     */
    final int segmentMask;

    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;

    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;

    transient Set<K> keySet;
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

    /**
     * ConcurrentHashMap list entry. Note that this is never exported
     * out as a user-visible Map.Entry.
     */
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * Sets next field with volatile write semantics.  (See above
         * about use of putOrderedObject.)
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /**
     * Gets the ith element of given table (if nonnull) with volatile
     * read semantics.
     */
    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }

    /**
     * Sets the ith element of given table, with volatile write
     * semantics. (See above about use of putOrderedObject.)
     */
    static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

    /**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions.  This is critical
     * because ConcurrentHashMap uses power-of-two length hash tables,
     * that otherwise encounter collisions for hashCodes that do not
     * differ in lower or upper bits.
     */
    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

    /**
     * Segments are specialized versions of hash tables.  This
     * subclasses from ReentrantLock opportunistically, just to
     * simplify some locking and avoid separate construction.
     */
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        /*
         * Segments maintain a table of entry lists that are always
         * kept in a consistent state, so can be read (via volatile
         * reads of segments and tables) without locking.  This
         * requires replicating nodes when necessary during table
         * resizing, so the old lists can be traversed by readers
         * still using old version of table.
         *
         * This class defines only mutative methods requiring locking.
         * Except as noted, the methods of this class perform the
         * per-segment versions of ConcurrentHashMap methods.  (Other
         * methods are integrated directly into ConcurrentHashMap
         * methods.) These mutative methods use a form of controlled
         * spinning on contention via methods scanAndLock and
         * scanAndLockForPut. These intersperse tryLocks with
         * traversals to locate nodes.  The main benefit is to absorb
         * cache misses (which are very common for hash tables) while
         * obtaining locks so that traversal is faster once
         * acquired. We do not actually use the found nodes since they
         * must be re-acquired under lock anyway to ensure sequential
         * consistency of updates (and in any case may be undetectably
         * stale), but they will normally be much faster to re-locate.
         * Also, scanAndLockForPut speculatively creates a fresh node
         * to use in put if no node is found.
         */

        private static final long serialVersionUID = 2249069246763182397L;

        /**
         * The maximum number of times to tryLock in a prescan before
         * possibly blocking on acquire in preparation for a locked
         * segment operation. On multiprocessors, using a bounded
         * number of retries maintains cache acquired while locating
         * nodes.
         */
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        /**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;

        /**
         * The number of elements. Accessed only either within locks
         * or among other volatile reads that maintain visibility.
         */
        transient int count;

        /**
         * The total number of mutative operations in this segment.
         * Even though this may overflows 32 bits, it provides
         * sufficient accuracy for stability checks in CHM isEmpty()
         * and size() methods.  Accessed only either within locks or
         * among other volatile reads that maintain visibility.
         */
        transient int modCount;

        /**
         * The table is rehashed when its size exceeds this threshold.
         * (The value of this field is always <tt>(int)(capacity *
         * loadFactor)</tt>.)
         */
        transient int threshold;

        /**
         * The load factor for the hash table.  Even though this value
         * is same for all segments, it is replicated to avoid needing
         * links to outer object.
         * @serial
         */
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && first != null &&
                            tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        /**
         * Doubles size of table and repacks entries, also adding the
         * given node to new table
         */
        @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table.  Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change.
             * Statistically, at the default threshold, only about
             * one-sixth of them need cloning when a table
             * doubles. The nodes they replace will be garbage
             * collectable as soon as they are no longer referenced by
             * any reader thread that may be in the midst of
             * concurrently traversing table. Entry accesses use plain
             * array indexing because they are followed by volatile
             * table write.
             */
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

        /**
         * Scans for a node containing given key while trying to
         * acquire lock, creating and returning one if not found. Upon
         * return, guarantees that lock is held. UNlike in most
         * methods, calls to method equals are not screened: Since
         * traversal speed doesn't matter, we might as well help warm
         * up the associated code and accesses as well.
         *
         * @return a new node if key not found, else null
         */
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

        /**
         * Scans for a node containing the given key while trying to
         * acquire lock for a remove or replace operation. Upon
         * return, guarantees that lock is held.  Note that we must
         * lock even if the key is not found, to ensure sequential
         * consistency of updates.
         */
        private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
        }

        /**
         * Remove; match on key only if value null, else match both.
         */
        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        final boolean replace(K key, int hash, V oldValue, V newValue) {
            if (!tryLock())
                scanAndLock(key, hash);
            boolean replaced = false;
            try {
                HashEntry<K,V> e;
                for (e = entryForHash(this, hash); e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        if (oldValue.equals(e.value)) {
                            e.value = newValue;
                            ++modCount;
                            replaced = true;
                        }
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return replaced;
        }

        final V replace(K key, int hash, V value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V> e;
                for (e = entryForHash(this, hash); e != null; e = e.next) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        e.value = value;
                        ++modCount;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

        final void clear() {
            lock();
            try {
                HashEntry<K,V>[] tab = table;
                for (int i = 0; i < tab.length ; i++)
                    setEntryAt(tab, i, null);
                ++modCount;
                count = 0;
            } finally {
                unlock();
            }
        }
    }

    // Accessing segments

    /**
     * Gets the jth element of given segment array (if nonnull) with
     * volatile element access semantics via Unsafe.
     */
    @SuppressWarnings("unchecked")
    static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
        long u = (j << SSHIFT) + SBASE;
        return ss == null ? null :
            (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
    }

    /**
     * Returns the segment for the given index, creating it and
     * recording in segment table (via CAS) if not already present.
     *
     * @param k the index
     * @return the segment
     */
    @SuppressWarnings("unchecked")
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

    // Hash-based segment and entry accesses

    /**
     * Get the segment for the given hash
     */
    @SuppressWarnings("unchecked")
    private Segment<K,V> segmentForHash(int h) {
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
    }

    /**
     * Gets the table entry for the given segment and hash
     */
    @SuppressWarnings("unchecked")
    static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
        HashEntry<K,V>[] tab;
        return (seg == null || (tab = seg.table) == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
    }

    /* ---------------- Public operations -------------- */

    /**
     * Creates a new, empty map with the specified initial
     * capacity, load factor and concurrency level.
     *
     * @param initialCapacity the initial capacity. The implementation
     * performs internal sizing to accommodate this many elements.
     * @param loadFactor  the load factor threshold, used to control resizing.
     * Resizing may be performed when the average number of elements per
     * bin exceeds this threshold.
     * @param concurrencyLevel the estimated number of concurre