1. 程式人生 > >【Java併發程式設計】深入分析ConcurrentHashMap(九)

【Java併發程式設計】深入分析ConcurrentHashMap(九)

 本章是提高教程可能對於剛入門同學來說會有些難度,讀懂本章你需要了解以下知識點:

一、Concurrent原始碼分析

ConcurrentHashMap是由Segment(桶)、HashEntry(節點)2大資料結構組成。如下圖所示:

   1.1 Segment類和屬性

//Segment內部維護了一個連結串列陣列
static final class Segment<K,V> extends ReentrantLock implements Serializable {

	//連結串列陣列,陣列中的每一個元素代表了一個連結串列的頭部
	transient volatile HashEntry<K,V>[] table;

	//Segment中元素的數量
	transient int count;

	//對table的大小造成影響的操作的數量(比如put或者remove操作)
	transient int modCount;

	//閾值,Segment裡面元素的數量超過這個值會對Segment進行擴容,擴容後大小=old*2*負載因子
	transient int threshold;

	//負載因子,用於確定threshold
	final float loadFactor;
}

  Segment繼承了ReentrantLock,這意味著每個segment都可以當做一個鎖,每一把鎖只鎖住整個容器中的部分資料,這樣不影響執行緒訪問其它資料,當然如果是對全域性改變時會鎖定所有的segment段。比如:size()和containsValue(),注意的是要按順序鎖定所有段,操作完畢後,再按順序釋放所有段的鎖。如果不按順序的話,有可能會出現死鎖。

   1.2 HashEntry類和屬性

//HashEntry是一個單向連結串列  
static final class HashEntry<K,V> {
    //雜湊值  
    final int hash;
    //儲存的key和值value  
    final K key;
    volatile V value;
    //指向的下一個HashEntry,即連結串列的下一個節點  
    volatile HashEntry<K,V> next;
}

  類似與HashMap節點Entry,HashEntry也是一個單向連結串列,它包含了key、hash、value和下一個節點資訊。HashEntry和Entry的不同點:
 不同點一:使用了多個final關鍵字(final class 、final hash) ,這意味著不能從連結串列的中間或尾部新增或刪除節點,後面刪除操作時會講到。
 不同點二:使用volatile,是為了更新值後能立即對其它執行緒可見。這裡沒有使用鎖,效率更高。

   1.3 類的初始化

/**
 * 
 * @param initialCapacity  初始容量
 * @param loadFactor	負載因子
 * @param concurrencyLevel 代表ConcurrentHashMap內部的Segment的數量,
 * ConcurrentLevel 併發級別
 */
public ConcurrentHashMap(int initialCapacity,
		float loadFactor, int concurrencyLevel) {
	if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
		throw new IllegalArgumentException();
	if (concurrencyLevel > MAX_SEGMENTS)
		concurrencyLevel = MAX_SEGMENTS;
	// Find power-of-two sizes best matching arguments
	int sshift = 0;
	int ssize = 1;
	while (ssize < concurrencyLevel) {
		++sshift;
		ssize <<= 1;//ssize左移一位也就是每次ssize=2*ssize。
	}
	//主要使用於put()和segmentForHash()方法,結合hash計算出元素在哪一個Segment中。
        //假如concurrencyLevel是16,那麼sshift=4、segmentShift=28、segmentMask=15;
	this.segmentShift = 32 - sshift;
	this.segmentMask = ssize - 1;
	if (initialCapacity > MAXIMUM_CAPACITY)
		initialCapacity = MAXIMUM_CAPACITY;
	int c = initialCapacity / ssize;
	if (c * ssize < initialCapacity)
		++c;
	int cap = MIN_SEGMENT_TABLE_CAPACITY;
	while (cap < c)
		cap <<= 1;
	// create segments and segments[0]
	Segment<K,V> s0 =
			new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
					(HashEntry<K,V>[])new HashEntry[cap]);
	Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
	UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
	this.segments = ss;
}

  ConcurrencyLevel預設情況下內部按併發級別為16來建立。對於每個segment的容量,預設情況也是16。其中concurrentLevel和segment的初始容量都是可以通過建構函式設定的。要注意的是ConcurrencyLevel一經指定,不可改變,後續如果ConcurrentHashMap的元素數量增加導致ConrruentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數量,而只會增加Segment中連結串列陣列的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment裡面的元素做一次rehash就可以了。

   1.4 ensureSegment()方法

 該方法返回給定索引位置的Segment,如果Segment不存在,則參考Segment表中的第一個Segment的引數建立一個Segment並通過CAS操作將它記錄到Segment表中去。
  private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

   1.5 entryAt()方法

 entryAt()方法是從連結串列中查詢節點。在方法引數裡注意到有傳入tab連結串列陣列和index索引,那為什麼還要呼叫entryAt()方法獲取陣列項的值而不是通過tab[index]方式直接獲取?那我們從源頭(put)開始分析,見1.6put()操作。
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
   }

   1.6 put()操作

   1.6.1 鎖分離技術

  大家知道HashTable是使用了synchronized來保證執行緒安全,但是其效率非常差。它效率非常差的原因是多個執行緒訪問HashTable時需要競爭同一把鎖,如果我們有多把鎖,每一把鎖只鎖住一部分資料,那麼多執行緒在訪問不同的資料時也就不會存在競爭,能提高訪問效率。這種做法我們稱為鎖分離技術。在《Java併發程式設計實戰》一書中作者提到過分拆鎖和分離鎖技術:
 分拆鎖(lock spliting)就是若原先的程式中多處邏輯都採用同一個鎖,但各個邏輯之間又相互獨立,就可以拆(Spliting)為使用多個鎖,每個鎖守護不同的邏輯。
 分拆鎖有時候可以被擴充套件,分成可大可小加鎖塊的集合,並且它們歸屬於相互獨立的物件,這樣的情況就是分離鎖(lock striping)。
而ConcurrentHashMap就是使用了分離鎖技術,對每個Segment配置一把鎖,如下圖所示:

   1.6.2 原始碼分析

  Segment的put操作原理如下圖所示,圖中展示的不是很詳細,其中關於加鎖的步驟沒有加上去,原因是加了幾次覺得加鎖後看著很複雜。用圖片展示是為了更加簡單和明瞭,如果看著複雜也就沒有意義了,我儘量用文字說清楚它的步驟。

步驟一:進入Segment的put操作時先進行加鎖保護。如果加鎖沒有成功,呼叫scanAndLockForPut方法(詳細步驟見下面scanAndLockForPut()原始碼分析)進入自旋狀態,該方法持續查詢key對應的節點鏈中是已存在該機節點,如果沒有找到,則預建立一個新節點,並且嘗試n次,直到嘗試次數操作限制,才真正進入加鎖等待狀態,自旋結束並返回節點(如果返回了一個非空節點,則表示在連結串列中沒有找到相應的節點)。對最大嘗試次數,目前的實現單核次數為1,多核為64。

步驟二:使用(tab.length - 1) & hash計算第一個節點位置,再通過entryAt()方法去查詢第一個節點。如果節點存在,遍歷連結串列找到key值所在的節點,如果找到了這個節點則直接更新舊value,結束迴圈。其中value使用了volatile,它更新後的值立馬對其它執行緒可見。如果節點不存在,將步驟一預建立的新節點(如果沒有則重新建立)新增到連結串列中,新增前先檢查新增後節點數量是否超過容器大小,如果超過了,則rehash操作。沒有的話呼叫
setNext或setEntryAt方法新增新節點;
要注意的是在更新連結串列時使用了Unsafe.putOrderedObject()方法,這個方法能夠實現非堵塞的寫入,這些寫入不會被Java的JIT重新排序指令(instruction reordering),使得它能更加快速的儲存。

解決1.5問題:為什麼還要呼叫entryAt()方法獲取陣列項的值而不是通過tab[index]方式直接獲取?
  雖然在開始時volatile table將引用賦值給了變數tab,但是多執行緒下table裡的值可能發生改變,使用tab[index]並不能獲得最新的值。。為了保證接下來的put操作能夠讀取到上一次的更新結果,需要使用volatile的語法去讀取節點鏈的鏈頭.

public V put(K key, V value) {
	Segment<K,V> s;
	if (value == null)
		throw new NullPointerException();
	int hash = hash(key);
	//計算Segment的位置,在初始化的時候對segmentShift和segmentMask做了解釋
	int j = (hash >>> segmentShift) & segmentMask;
	//從Segment陣列中獲取segment元素的位置
	if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
			(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
		s = ensureSegment(j);
	//
	return s.put(key, hash, value, false);
}

//往Segment的HashEntry中新增元素,使用了分鎖機制
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
	//tryLock 僅在呼叫時鎖為空閒狀態才獲取該鎖。如果鎖可用,則獲取鎖,並立即返回值 true。否則是false
	//scanAndLockForPut 下面單獨說scanAndLockForPut
	HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
	V oldValue;
	try {
		HashEntry<K,V>[] tab = table;
		int index = (tab.length - 1) & hash;
		HashEntry<K,V> first = entryAt(tab, index);
		for (HashEntry<K,V> e = first;;) {
			if (e != null) {
				K k;
				if ((k = e.key) == key ||
						(e.hash == hash && key.equals(k))) {
					oldValue = e.value;
					if (!onlyIfAbsent) {
						e.value = value;
						++modCount;
					}
					break;
				}
				e = e.next;
			}
			else {
				if (node != null)
					node.setNext(first);
				else
					node = new HashEntry<K,V>(hash, key, value, first);
				int c = count + 1;
				if (c > threshold && tab.length < MAXIMUM_CAPACITY)
					rehash(node);
				else
					setEntryAt(tab, index, node);
				++modCount;
				count = c;
				oldValue = null;
				break;
			}
		}
	} finally {
		unlock();
	}
	return oldValue;
}

   1.5 segmentForHash()方法

/**
 * 查詢Segment物件,這裡Unsafe的主要作用是提供原子操作。
 */
@SuppressWarnings("unchecked")
private Segment<K,V> segmentForHash(int h) {
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}

   1.6 scanAndLockForPut()方法

  在下面程式碼中,它先獲取key對應的頭節點,進入連結串列迴圈。如果連結串列中不存在要插入的節點,則預建立一個新節點,否則retries值遞增,直到操作最大嘗試次數而進入等待狀態。這個方法要注意的是:當在自旋過程中發現連結串列鏈頭髮生了變化,則更新節點鏈的鏈頭,並重置retries值為-1,重新為嘗試獲取鎖而自旋遍歷。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
	//第一步:先找到HashEntry連結串列中的頭節點
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        //第一次一定執行該條件內程式碼
        if (retries < 0) {
        	//第二步:分三種情景
        	//情景一:沒有找到,建立一個新的節點。
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            //情景二:找到相同key的節點
            else if (key.equals(e.key))
                retries = 0;
            else
            //情景三:沒找到key值對應的節點,指向下一個節點繼續
                e = e.next;
        }
        //嘗試次數達到限制進入加鎖等待狀態。 對最大嘗試次數,目前的實現單核次數為1,多核為64:
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        //retries是偶數並且不是頭節點。在自旋中鏈頭可能會發生變化
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

   1.7 get()操作

 public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
  從程式碼可以看出get方法並沒有呼叫鎖,它使用了volatile的可見性來實現執行緒安全的。參考資料