java 並發(concurrent)包源碼分析
參考連接:
http://www.cnblogs.com/luoxn28/p/6059881.html
http://www.cnblogs.com/java-zhao/p/5140158.html
持續更新中。。。。。
並發是一種能並行運行多個程序或並行運行一個程序中多個部分的能力。如果程序中一個耗時的任務能以異步或並行的方式運行,那麽整個程序的吞吐量和可交互性將大大改善。現代的PC都有多個CPU或一個CPU中有多個核,是否能合理運用多核的能力將成為一個大規模應用程序的關鍵。
Java多線程相關類的實現都在Java的並發包concurrent,concurrent包主要包含3部分內容,第一個是atomic包,裏面主要是一些原子類,比如AtomicInteger、AtomicIntegerArray等;第二個是locks包,裏面主要是鎖相關的類,比如ReentrantLock、Condition等;第三個就是屬於concurrent包的內容,主要包括線程池相關類(Executors)、阻塞集合類(BlockingQueue)、並發Map類(ConcurrentHashMap)等。
/*************************************************************************************************************************************************************/
atomic包
/*************************************************************************************************************************************************************/
JDK1.5中引入了底層的支持,在int、long和對象的引用等類型上都公開了CAS的操作,並且JVM把它們編譯為底層硬件提供的最有效的方法,在運行CAS的平臺上,運行時把它們編譯為相應的機器指令。在java.util.concurrent.atomic包下面的所有的原子變量類型中,比如AtomicInteger,都使用了這些底層的JVM支持為數字類型的引用類型提供一種高效的CAS操作。
Unsafe中的操作一般都是基於CAS來實現的,CAS就是Compare and Swap的意思,比較並操作。很多的cpu直接支持CAS指令。CAS是一項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。CAS有3個操作數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則什麽都不做。
下面就以AtomicInteger為例。
在沒有AtomicInteger之前,對於一個Integer的線程安全操作,是需要使用同步鎖來實現的,當然現在也可以通過ReentrantLock來實現,但是最好最方便的實現方式是采用AtomicInteger。
測試代碼:
1 package com.collection.test; 2 3 import java.util.concurrent.atomic.AtomicInteger; 4 5 /** 6 * 原子類的測試 7 */ 8 public class AtomicTest { 9 private static AtomicInteger atomicInteger = new AtomicInteger(); 10 11 //獲取當前值 12 public static void getCurrentValue(){ 13 System.out.println(atomicInteger.get());//-->0 14 } 15 16 //設置value值 17 public static void setValue(){ 18 atomicInteger.set(12);//直接用12覆蓋舊值 19 System.out.println(atomicInteger.get());//-->12 20 } 21 22 //根據方法名稱getAndSet就知道先get,則最後返回的就是舊值,如果get在後,就是返回新值 23 public static void getAndSet(){ 24 System.out.println(atomicInteger.getAndSet(15));//-->12 25 } 26 27 public static void getAndIncrement(){ 28 System.out.println(atomicInteger.getAndIncrement());//-->15 29 } 30 31 public static void getAndDecrement(){ 32 System.out.println(atomicInteger.getAndDecrement());//-->16 33 } 34 35 public static void getAndAdd(){ 36 System.out.println(atomicInteger.getAndAdd(10));//-->15 37 } 38 39 public static void incrementAndGet(){ 40 System.out.println(atomicInteger.incrementAndGet());//-->26 41 } 42 43 public static void decrementAndGet(){ 44 System.out.println(atomicInteger.decrementAndGet());//-->25 45 } 46 47 public static void addAndGet(){ 48 System.out.println(atomicInteger.addAndGet(20));//-->45 49 } 50 51 public static void main(String[] args) { 52 AtomicTest test = new AtomicTest(); 53 test.getCurrentValue(); 54 test.setValue(); 55 //返回舊值系列 56 test.getAndSet(); 57 test.getAndIncrement(); 58 test.getAndDecrement(); 59 test.getAndAdd(); 60 //返回新值系列 61 test.incrementAndGet(); 62 test.decrementAndGet(); 63 test.addAndGet(); 64 65 } 66 }
AtomicInteger類的源代碼
1 private volatile int value;// 初始化值 2 3 /** 4 * 創建一個AtomicInteger,初始值value為initialValue 5 */ 6 public AtomicInteger(int initialValue) { 7 value = initialValue; 8 } 9 10 /** 11 * 創建一個AtomicInteger,初始值value為0 12 */ 13 public AtomicInteger() { 14 } 15 16 /** 17 * 返回value 18 */ 19 public final int get() { 20 return value; 21 } 22 23 /** 24 * 為value設值(基於value),而其他操作是基於舊值<--get() 25 */ 26 public final void set(int newValue) { 27 value = newValue; 28 } 29 30 public final boolean compareAndSet(int expect, int update) { 31 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 32 } 33 34 /** 35 * 基於CAS為舊值設定新值,采用無限循環,直到設置成功為止 36 * 37 * @return 返回舊值 38 */ 39 public final int getAndSet(int newValue) { 40 for (;;) { 41 int current = get();// 獲取當前值(舊值) 42 if (compareAndSet(current, newValue))// CAS新值替代舊值 43 return current;// 返回舊值 44 } 45 } 46 47 /** 48 * 當前值+1,采用無限循環,直到+1成功為止 49 * @return the previous value 返回舊值 50 */ 51 public final int getAndIncrement() { 52 for (;;) { 53 int current = get();//獲取當前值 54 int next = current + 1;//當前值+1 55 if (compareAndSet(current, next))//基於CAS賦值 56 return current; 57 } 58 } 59 60 /** 61 * 當前值-1,采用無限循環,直到-1成功為止 62 * @return the previous value 返回舊值 63 */ 64 public final int getAndDecrement() { 65 for (;;) { 66 int current = get(); 67 int next = current - 1; 68 if (compareAndSet(current, next)) 69 return current; 70 } 71 } 72 73 /** 74 * 當前值+delta,采用無限循環,直到+delta成功為止 75 * @return the previous value 返回舊值 76 */ 77 public final int getAndAdd(int delta) { 78 for (;;) { 79 int current = get(); 80 int next = current + delta; 81 if (compareAndSet(current, next)) 82 return current; 83 } 84 } 85 86 /** 87 * 當前值+1, 采用無限循環,直到+1成功為止 88 * @return the updated value 返回新值 89 */ 90 public final int incrementAndGet() { 91 for (;;) { 92 int current = get(); 93 int next = current + 1; 94 if (compareAndSet(current, next)) 95 return next;//返回新值 96 } 97 } 98 99 /** 100 * 當前值-1, 采用無限循環,直到-1成功為止 101 * @return the updated value 返回新值 102 */ 103 public final int decrementAndGet() { 104 for (;;) { 105 int current = get(); 106 int next = current - 1; 107 if (compareAndSet(current, next)) 108 return next;//返回新值 109 } 110 } 111 112 /** 113 * 當前值+delta,采用無限循環,直到+delta成功為止 114 * @return the updated value 返回新值 115 */ 116 public final int addAndGet(int delta) { 117 for (;;) { 118 int current = get(); 119 int next = current + delta; 120 if (compareAndSet(current, next)) 121 return next;//返回新值 122 } 123 } 124 125 /** 126 * 獲取當前值 127 */ 128 public int intValue() { 129 return get(); 130 }
註意:
- value是volatile的,關於volatile的相關內容見:http://www.cnblogs.com/java-zhao/p/5125698.html
- 單步操作:例如set()是直接對value進行操作的,不需要CAS,因為單步操作就是原子操作。
- 多步操作:例如getAndSet(int newValue)是兩步操作-->先獲取值,在設置值,所以需要原子化,這裏采用CAS實現。
- 對於方法是返回舊值還是新值,直接看方法是以get開頭(返回舊值)還是get結尾(返回新值)就好
- CAS:比較CPU內存上的值是不是當前值current,如果是就換成新值update,如果不是,說明獲取值之後到設置值之前,該值已經被別人先一步設置過了,此時如果自己再設置值的話,需要在別人修改後的值的基礎上去操作,否則就會覆蓋別人的修改,所以這個時候會直接返回false,再進行無限循環,重新獲取當前值,然後再基於CAS進行加減操作。
- 如果還是不懂CAS,類比數據庫的樂觀鎖。
1 // setup to use Unsafe.compareAndSwapInt for updates 2 private static final Unsafe unsafe = Unsafe.getUnsafe(); 3 private static final long valueOffset; 4 5 static { 6 try { 7 valueOffset = unsafe.objectFieldOffset 8 (AtomicInteger.class.getDeclaredField("value")); 9 } catch (Exception ex) { throw new Error(ex); } 10 } 11 12 private volatile int value;
這是AtomicInteger的所有屬性,其中value存的是當前值,而當前值存放的內存地址可以通過valueOffset來確定。實際上是“value字段相對Java對象的起始地址的偏移量”
1 public final boolean compareAndSet(int expect, int update) { 2 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 3 }
CAS方法:通過對比“valueOffset上的value”與expect是否相同,來決定是否修改value值為update值。
/*************************************************************************************************************************************************************/
lock包
/*************************************************************************************************************************************************************/
/*************************************************************************************************************************************************************/
concurrent源生包
/*************************************************************************************************************************************************************/
java 並發(concurrent)包源碼分析