解讀Java8中ConcurrentHashMap是如何保證執行緒安全的
HashMap是工作中使用頻度非常高的一個K-V儲存容器。在多執行緒環境下,使用HashMap是不安全的,可能產生各種非期望的結果。
關於HashMap執行緒安全問題,可參考筆者的另一篇文章: 深入解讀HashMap執行緒安全性問題
針對HashMap在多執行緒環境下不安全這個問題,HashMap的作者認為這並不是bug,而是應該使用執行緒安全的HashMap。
目前有如下一些方式可以獲得執行緒安全的HashMap:
- Collections.synchronizedMap
- HashTable
- ConcurrentHashMap
其中,前兩種方式由於全域性鎖的問題,存在很嚴重的效能問題。所以,著名的併發程式設計大師Doug Lea在JDK1.5的java.util.concurrent包下面添加了一大堆併發工具。其中就包含ConcurrentHashMap這個執行緒安全的HashMap。
本文就來簡單介紹一下ConcurrentHashMap的實現原理。
PS:基於JDK8
0 ConcurrentHashMap在JDK7中的回顧
ConcurrentHashMap在JDK7和JDK8中的實現方式上有較大的不同。首先我們先來大概回顧一下ConcurrentHashMap在JDK7中的原理是怎樣的。
0.1 分段鎖技術
針對HashTable會鎖整個hash表的問題,ConcurrentHashMap提出了分段鎖的解決方案。
分段鎖的思想就是:鎖的時候不鎖整個hash表,而是隻鎖一部分。
如何實現呢?這就用到了ConcurrentHashMap中最關鍵的Segment。
ConcurrentHashMap中維護著一個Segment陣列,每個Segment可以看做是一個HashMap。
而Segment本身繼承了ReentrantLock,它本身就是一個鎖。
在Segment中通過HashEntry陣列來維護其內部的hash表。
每個HashEntry就代表了map中的一個K-V,用HashEntry可以組成一個連結串列結構,通過next欄位引用到其下一個元素。
上述內容在原始碼中的表示如下:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { // ... 省略 ... /** * The segments, each of which is a specialized hash table. */ final Segment<K,V>[] segments; // ... 省略 ... /** * Segment是ConcurrentHashMap的靜態內部類 * * Segments are specialized versions of hash tables.This * subclasses from ReentrantLock opportunistically, just to * simplify some locking and avoid separate construction. */ static final class Segment<K,V> extends ReentrantLock implements Serializable { // ... 省略 ... /** * The per-segment table. Elements are accessed via * entryAt/setEntryAt providing volatile semantics. */ transient volatile HashEntry<K,V>[] table; // ... 省略 ... } // ... 省略 ... /** * ConcurrentHashMap list entry. Note that this is never exported * out as a user-visible Map.Entry. */ static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; // ... 省略 ... } } 複製程式碼
所以,JDK7中,ConcurrentHashMap的整體結構可以描述為下圖這樣子。

由上圖可見,只要我們的hash值足夠分散,那麼每次put的時候就會put到不同的segment中去。 而segment自己本身就是一個鎖,put的時候,當前segment會將自己鎖住,此時其他執行緒無法操作這個segment, 但不會影響到其他segment的操作。這個就是鎖分段帶來的好處。
0.2 執行緒安全的put
ConcurrentHashMap的put方法原始碼如下:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; // 根據key的hash定位出一個segment,如果指定index的segment還沒初始化,則呼叫ensureSegment方法初始化 if ((s = (Segment<K,V>)UNSAFE.getObject// nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) //in ensureSegment s = ensureSegment(j); // 呼叫segment的put方法 return s.put(key, hash, value, false); } 複製程式碼
最終會呼叫segment的put方法,將元素put到HashEntry陣列中,這裡的註釋中只給出鎖相關的說明
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 因為segment本身就是一個鎖 // 這裡呼叫tryLock嘗試獲取鎖 // 如果獲取成功,那麼其他執行緒都無法再修改這個segment // 如果獲取失敗,會呼叫scanAndLockForPut方法根據key和hash嘗試找到這個node,如果不存在,則建立一個node並返回,如果存在則返回null // 檢視scanAndLockForPut原始碼會發現他在查詢的過程中會嘗試獲取鎖,在多核CPU環境下,會嘗試64次tryLock(),如果64次還沒獲取到,會直接呼叫lock() // 也就是說這一步一定會獲取到鎖 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 擴容 rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 釋放鎖 unlock(); } return oldValue; } 複製程式碼
0.3 執行緒安全的擴容(Rehash)
HashMap的執行緒安全問題大部分出在擴容(rehash)的過程中。
ConcurrentHashMap的擴容只針對每個segment中的HashEntry陣列進行擴容。
由上述put的原始碼可知,ConcurrentHashMap在rehash的時候是有鎖的,所以在rehash的過程中,其他執行緒無法對segment的hash表做操作,這就保證了執行緒安全。
1 JDK8中ConcurrentHashMap的初始化
以無引數建構函式為例,來看一下ConcurrentHashMap類初始化的時候會做些什麼。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); 複製程式碼
首先會執行靜態程式碼塊和初始化類變數。 主要會初始化以下這些類變數:
// Unsafe mechanics private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } 複製程式碼
這裡用到了Unsafe類,其中objectFieldOffset方法用於獲取指定Field(例如sizeCtl)在記憶體中的偏移量。
獲取的這個偏移量主要用於幹啥呢?不著急,在下文的分析中,遇到的時候再研究就好。
PS:關於Unsafe的介紹和使用,可以檢視筆者的另一篇文章Unsafe類的介紹和使用
2 內部資料結構
先來從原始碼角度看一下JDK8中是怎麼定義的儲存結構。
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. * * hash表,在第一次put資料的時候才初始化,他的大小總是2的倍數。 */ transient volatile Node<K,V>[] table; /** * 用來儲存一個鍵值對 * * Key-value entry.This class is never exported out as a * user-mutable Map.Entry (i.e., one supporting setValue; see * MapEntry below), but can be used for read-only traversals used * in bulk tasks.Subclasses of Node with a negative hash field * are special, and contain null keys and values (but are never * exported).Otherwise, keys and vals are never null. */ static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; } 複製程式碼
可以發現,JDK8與JDK7的實現由較大的不同,JDK8中不在使用Segment的概念,他更像HashMap的實現方式。
PS:關於HashMap的原理,可以參考筆者的另一篇文章 HashMap原理及內部儲存結構
這個結構可以通過下圖描述出來

3 執行緒安全的hash表初始化
由上文可知ConcurrentHashMap是用table這個成員變數來持有hash表的。
table的初始化採用了延遲初始化策略,他會在第一次執行put的時候初始化table。
put方法原始碼如下(省略了暫時不相關的程式碼):
/** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * * <p>The value can be retrieved by calling the {@code get} method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with {@code key}, or *{@code null} if there was no mapping for {@code key} * @throws NullPointerException if the specified key or value is null */ public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 計算key的hash值 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果table是空,初始化之 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 省略... } // 省略... } 複製程式碼
initTable原始碼如下
/** * Initializes table, using the size recorded in sizeCtl. */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; // #1 while ((tab = table) == null || tab.length == 0) { // sizeCtl的預設值是0,所以最先走到這的執行緒會進入到下面的else if判斷中 // #2 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // 嘗試原子性的將指定物件(this)的記憶體偏移量為SIZECTL的int變數值從sc更新為-1 // 也就是將成員變數sizeCtl的值改為-1 // #3 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 雙重檢查,原因會在下文分析 // #4 if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 預設初始容量為16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // #5 table = tab = nt; // 建立hash表,並賦值給成員變數table sc = n - (n >>> 2); } } finally { // #6 sizeCtl = sc; } break; } } return tab; } 複製程式碼
成員變數sizeCtl在ConcurrentHashMap中的其中一個作用相當於HashMap中的threshold,當hash表中元素個數超過sizeCtl時,觸發擴容; 他的另一個作用類似於一個標識,例如,當他等於-1的時候,說明已經有某一執行緒在執行hash表的初始化了,一個小於-1的值表示某一執行緒正在對hash表執行resize。
這個方法首先判斷sizeCtl是否小於0,如果小於0,直接將當前執行緒變為就緒狀態的執行緒。
當sizeCtl大於等於0時,當前執行緒會嘗試通過CAS的方式將sizeCtl的值修改為-1。修改失敗的執行緒會進入下一輪迴圈,判斷sizeCtl<0了,被yield住;修改成功的執行緒會繼續執行下面的初始化程式碼。
在new Node[]之前,要再檢查一遍table是否為空,這裡做雙重檢查的原因在於,如果另一個執行緒執行完#1程式碼後掛起,此時另一個初始化的執行緒執行完了#6的程式碼,此時sizeCtl是一個大於0的值,那麼再切回這個執行緒執行的時候,是有可能重複初始化的。關於這個問題會在下圖的併發場景中說明。
然後初始化hash表,並重新計算sizeCtl的值,最終返回初始化好的hash表。
下圖詳細說明了幾種可能導致重複初始化hash表的併發場景,我們假設Thread2最終成功初始化hash表。
- Thread1模擬的是CAS更新sizeCtl變數的併發場景
- Thread2模擬的是table的雙重檢查的必要性

由上圖可以看出,在Thread1中如果不對sizeCtl的值更新做併發控制,Thread1是有可能走到new Node[]這一步的。 在Thread3中,如果不做雙重判斷,Thread3也會走到new Node[]這一步。
4 執行緒安全的put
put操作可分為以下兩類
- 當前hash表對應當前key的index上沒有元素時
- 當前hash表對應當前key的index上已經存在元素時(hash碰撞)
4.1 hash表上沒有元素時
對應原始碼如下
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break;// no lock when adding to empty bin } static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } 複製程式碼
tabAt方法通過Unsafe.getObjectVolatile()的方式獲取陣列對應index上的元素,getObjectVolatile作用於對應的記憶體偏移量上,是具備volatile記憶體語義的。
如果獲取的是空,嘗試用cas的方式在陣列的指定index上建立一個新的Node。
4.2 hash碰撞時
對應原始碼如下
else { V oldVal = null; // 鎖f是在4.1中通過tabAt方法獲取的 // 也就是說,當發生hash碰撞時,會以連結串列的頭結點作為鎖 synchronized (f) { // 這個檢查的原因在於: // tab引用的是成員變數table,table在發生了rehash之後,原來index上的Node可能會變 // 這裡就是為了確保在put的過程中,沒有收到rehash的影響,指定index上的Node仍然是f // 如果不是f,那這個鎖就沒有意義了 if (tabAt(tab, i) == f) { // 確保put沒有發生在擴容的過程中,fh=-1時表示正在擴容 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { // 在連結串列後面追加元素 pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 如果連結串列長度超過8個,將連結串列轉換為紅黑樹,與HashMap相同,相對於JDK7來說,優化了查詢效率 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } 複製程式碼
不同於JDK7中segment的概念,JDK8中直接用連結串列的頭節點做為鎖。 JDK7中,HashMap在多執行緒併發put的情況下可能會形成環形連結串列,ConcurrentHashMap通過這個鎖的方式,使同一時間只有有一個執行緒對某一連結串列執行put,解決了併發問題。
5 執行緒安全的擴容
put方法的最後一步是統計hash表中元素的個數,如果超過sizeCtl的值,觸發擴容。
擴容的程式碼略長,可大致看一下里面的中文註釋,再參考下面的分析。 其實我們主要的目的是弄明白ConcurrentHashMap是如何解決HashMap的併發問題的。 帶著這個問題來看原始碼就好。關於HashMap存在的問題,參考本文一開始說的筆者的另一篇文章即可。
其實HashMap的併發問題多半是由於put和擴容併發導致的。
這裡我們就來看一下ConcurrentHashMap是如何解決的。
擴容涉及的程式碼如下:
/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. * 業務中使用的hash表 */ transient volatile Node<K,V>[] table; /** * The next table to use; non-null only while resizing. * 擴容時才使用的hash表,擴容完成後賦值給table,並將nextTable重置為null。 */ private transient volatile Node<K,V>[] nextTable; /** * Adds to count, and if table is too small and not already * resizing, initiates transfer. If already resizing, helps * perform transfer if work is available.Rechecks occupancy * after a transfer to see if another resize is already needed * because resizings are lagging additions. * * @param x the count to add * @param check if <0, don't check resize, if <= 1 only check if uncontended */ private final void addCount(long x, int check) { // ----- 計算鍵值對的個數 start ----- CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } // ----- 計算鍵值對的個數 end ----- // ----- 判斷是否需要擴容 start ----- if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 當上面計算出來的鍵值對個數超出sizeCtl時,觸發擴容,呼叫核心方法transfer while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 如果有已經在執行的擴容操作,nextTable是正在擴容中的新的hash表 // 如果併發擴容,transfer直接使用正在擴容的新hash表,保證了不會出現hash表覆蓋的情況 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 更新sizeCtl的值,更新成功後為負數,擴容開始 // 此時沒有併發擴容的情況,transfer中會new一個新的hash表來擴容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } // ----- 判斷是否需要擴容 end ----- } /** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) {// initiating try { @SuppressWarnings("unchecked") // 初始化新的hash表,大小為之前的2倍,並賦值給成員變數nextTable Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) {// try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; // 擴容完成時,將成員變數nextTable置為null,並將table替換為rehash後的nextTable if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { // 接下來是遍歷每個連結串列,對每個連結串列的元素進行rehash // 仍然用頭結點作為鎖,所以在擴容的時候,無法對這個連結串列執行put操作 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // setTabAt方法呼叫了Unsafe.putObjectVolatile來完成hash表元素的替換,具備volatile記憶體語義 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } } 複製程式碼
根據上述程式碼,對ConcurrentHashMap是如何解決HashMap併發問題這一疑問進行簡要說明。
- 首先new一個新的hash表(nextTable)出來,大小是原來的2倍。後面的rehash都是針對這個新的hash表操作,不涉及原hash表(table)。
- 然後會對原hash表(table)中的每個連結串列進行rehash,此時會嘗試獲取頭節點的鎖。這一步就保證了在rehash的過程中不能對這個連結串列執行put操作。
- 通過sizeCtl控制,使擴容過程中不會new出多個新hash表來。
- 最後,將所有鍵值對重新rehash到新表(nextTable)中後,用nextTable將table替換。這就避免了HashMap中get和擴容併發時,可能get到null的問題。
- 在整個過程中,共享變數的儲存和讀取全部通過volatile或CAS的方式,保證了執行緒安全。
6 總結
多執行緒環境下,對共享變數的操作一定要小心。要充分從Java記憶體模型的角度考慮問題。
ConcurrentHashMap中大量的用到了Unsafe類的方法,我們自己雖然也能拿到Unsafe的例項,但在生產中不建議這麼做。 多數情況下,我們可以通過併發包中提供的工具來實現,例如Atomic包下面的可以用來實現CAS操作,lock包下可以用來實現鎖相關的操作。
善用執行緒安全的容器工具,例如ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue等,因為我們在工作中無法像ConcurrentHashMap這樣通過Unsafe的getObjectVolatile和setObjectVolatile原子性的更新陣列中的元素,所以這些併發工具是很重要的。
