1. 程式人生 > >聊聊高並發(三十二)實現一個基於鏈表的無鎖Set集合

聊聊高並發(三十二)實現一個基於鏈表的無鎖Set集合

target 方向 刪除 元素 min 集合 date 變量 find

Set表示一種沒有反復元素的集合類,在JDK裏面有HashSet的實現,底層是基於HashMap來實現的。這裏實現一個簡化版本號的Set,有下面約束:

1. 基於鏈表實現。鏈表節點依照對象的hashCode()順序由小到大從Head到Tail排列。

2. 如果對象的hashCode()是唯一的。這個如果實際上是不成立的,這裏為了簡化實現做這個如果。實際情況是HashCode是基於對象地址進行的一次Hash操作。目的是把對象依據Hash散開。所以可能有多個對象地址相應到一個HashCode。也就是哈希沖突,要做沖突的處理。

常見的處理就是使用外部拉鏈法,在相應的Hash的節點再創建1個鏈表,同樣HashCode的節點在這個鏈表上排列,再依據equals()方法來識別是否是同一個節點。這個是HashMap的基本原理。


有了以上如果,這篇從最簡單的加鎖的方式到終於的無鎖展示一下怎樣一步一步演進的過程,以及終於無鎖實現要考慮的要點。

1. 粗粒度鎖

2. 細粒度鎖

3. 樂觀鎖

4. 懶惰鎖

5. 無鎖


先看一下Set接口,1個加入方法。1個刪除方法,1個檢查方法

package com.lock.test;

public interface Set<T> {
	public boolean add(T item);
	
	public boolean remove(T item);
	
	public boolean contains(T item);
}

第一個實現的是使用粗粒度的鎖,就是對整個鏈表加鎖。這樣肯定是線程安全的。可是效率肯定是最差的

鏈表節點類的定義,三個元素

1. 增加節點的元素

2. next指針指向下一個節點,明顯這是個單鏈表結構

3. key表示item的HashCode

class Node<T>{
		T item;
		Node<T> next;
		int key;
		
		public Node(T item){
			this.item = item;
			this.key = item.hashCode();
		}
		
		public Node(){}
	}
粗粒度鏈表的實現

1. 鏈表維護了一個頭節點,頭節點始終指向最小的HashCode,頭節點初始的next指針指向最大的HashCode,表示尾節點,整個鏈表是依照HashCode從小往大排列

2. 鏈表維護了一個總體的鎖。add, remove, contains都加鎖,保證線程安全,簡單粗暴。可是效率低下

class CoarseSet<T> implements Set<T>{
		private final Node<T> head;
		private java.util.concurrent.locks.Lock lock = new ReentrantLock();
		
		public CoarseSet(){
			head = new Node<T>();
			head.key = Integer.MIN_VALUE;
			Node<T> MAX = new Node<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next = MAX;
		}
		
		public boolean add(T item){
			Node<T> pred, curr;
			int key = item.hashCode();
			lock.lock();
			try{
				pred = head;
				curr = head.next;
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				if(curr.key == key){
					return false;
				}
				Node<T> node = new Node<T>(item);
				node.next = curr;
				pred.next = node;
				return true;
			}finally{
				lock.unlock();
			}
		}
		
		public boolean remove(T item){
			Node<T> pred, curr;
			int key = item.hashCode();
			lock.lock();
			try{
				pred = head;
				curr = head.next;
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				if(curr.key == key){
					pred.next = curr.next;
					curr.next = null;
					return true;
				}
				return false;
			}finally{
				lock.unlock();
			}
		}
		
		public boolean contains(T item){
			Node<T> pred, curr;
			int key = item.hashCode();
			lock.lock();
			try{
				pred = head;
				curr = head.next;
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				return curr.key == key;
			}finally{
				lock.unlock();
			}
		}
	}

對粗粒度鏈表的優化能夠細化鎖的粒度。粗粒度鎖的問題在於使用了一把鎖鎖住了整個鏈表,那麽能夠使用多把鎖,每一個節點維護一把鎖,這樣單個節點上鎖理論上不會影響其它節點。

單鏈表最簡單add操作須要做兩步

1. 把新增加節點的next指向當前節點

2. 把前驅節點的next指向新增加的節點

node.next = curr;
pred.next = node

單鏈表的刪除操作僅僅須要做一步

pred.next = curr.next

假設使用細粒度鎖的話。加入和刪除操作要同一時候鎖住兩個節點才幹保證加入和刪除的正確性。不然有可能加入進來的節點指向了一個已經被刪除的節點。

所以須要同一時候控制兩把鎖,這也叫鎖耦合,須要先獲取前驅節點的鎖,再獲取當前節點的鎖。

因為這樣的鎖的獲取是依照從前往後的順序獲取的,是一個方向的,所以不會引起死鎖的問題。

死鎖有兩種:

1. 由獲取鎖的順序引起的,順序形成了環

2. 由資源問題引起的

來看看細粒度鎖的實現,首先是帶鎖的Node

class NodeWithLock<T>{
		T item;
		NodeWithLock<T> next;
		int key;
		java.util.concurrent.locks.Lock lock = new ReentrantLock();
		
		public NodeWithLock(T item){
			this.item = item;
			this.key = item.hashCode();
		}
		
		public NodeWithLock(){}
		
		public void lock(){
			lock.lock();
		}
		
		public void unlock(){
			lock.unlock();
		}
	}

細粒度鏈表實現的Set

1. 獲取鎖時。先獲取pred鎖。然後獲取curr的鎖

2. 當pred和curr的指針往後繼節點移動時,要先釋放pred鎖,然後讓pred指向curr,然後curr再指向next節點,然後curr再上鎖,這樣保證了同一時候前後兩把鎖存在

class FineList<T> implements Set<T>{
		private final NodeWithLock<T> head;
		
		public FineList(){
			head = new NodeWithLock<T>();
			head.key = Integer.MIN_VALUE;
			NodeWithLock<T> MAX = new NodeWithLock<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next = MAX;
		}
		
		public boolean add(T item){
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			head.lock();
			pred = head;
			try{
				curr = pred.next;
				curr.lock();
				try{
					while(curr.key < key){
						pred.unlock();
						pred = curr;
						curr = curr.next;
						curr.lock();
					}
					if(curr.key == key){
						return false;
					}
					NodeWithLock<T> node = new NodeWithLock<T>(item);
					node.next = curr;
					pred.next = node;
					return true;
				}finally{
					curr.unlock();
				}
			}finally{
				pred.unlock();
			}
		}
		
		public boolean remove(T item){
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			head.lock();
			pred = head;
			try{
				curr = pred.next;
				curr.lock();
				try{
					while(curr.key < key){
						pred.unlock();
						pred = curr;
						curr = curr.next;
						curr.lock();
					}
					if(curr.key == key){
						pred.next = curr.next;
						curr.next = null;
						return true;
					}
					return false;
				}finally{
					curr.unlock();
				}
			}finally{
				pred.unlock();
			}
		}
		
		public boolean contains(T item){
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			head.lock();
			pred = head;
			try{
				curr = pred.next;
				curr.lock();
				try{
					while(curr.key < key){
						pred.unlock();
						pred = curr;
						curr = curr.next;
						curr.lock();
					}
					return curr.key == key;
				}finally{
					curr.unlock();
				}
			}finally{
				pred.unlock();
			}
		}
	}

細粒度的鎖對粗粒度的鎖有一定的優化,可是還存在問題:

當前面的節點被鎖住時,後面的節點無法操作。必須等待前面的鎖釋放


所以一種自然而然的想法就是“僅僅在須要加鎖的時候再加鎖”,也就是樂觀鎖

1. 僅僅有在尋找到要加鎖位置的時候才加鎖。之前不加鎖

2. 須要加鎖時,先加鎖,再進行驗證是否現場已經被改動

3. 假設被改動就須要從頭開始再次查找。是一個輪詢的過程

所以樂觀鎖也是一個輪詢檢查的過程。

來看看檢查的方法,檢查的目的有兩個:

1. 確認要操作的現場沒有被改動。也就是說pred.next == curr

2. 確認pred和curr都是能夠到達的。沒有被物理刪除,也就是node = node.next能夠到達

private boolean validate(NodeWithLock<T> pred, NodeWithLock<T> curr){
			NodeWithLock<T> node = head;
			while(node.key <= pred.key){
				if(node.key == pred.key){
					return pred.next == curr;
				}
				node = node.next;
			}
			return false;
		}

再來看看基於樂觀鎖的Set實現

1. 先不加鎖。直到找到要處理的現場,也就是while(curr.key < key)退出的地方。退出之後有兩種情況。curr.key == key和curr.key > key。

2. 找到現場後,要對pred和curr都加鎖,加鎖順序也是從前往後的順序

3. 驗證現場未被改動,然後進行操作,假設被改動了,就釋放鎖。再次從頭節點開始輪詢操作

class OptimisticSet<T> implements Set<T>{

		private final NodeWithLock<T> head;
		
		public OptimisticSet(){
			head = new NodeWithLock<T>();
			head.key = Integer.MIN_VALUE;
			NodeWithLock<T> MAX = new NodeWithLock<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next = MAX;
		}
		
		@Override
		public boolean add(T item) {
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						if(curr.key == key){
							return false;
						}
						NodeWithLock<T> node = new NodeWithLock<T>(item);
						node.next = curr;
						pred.next = node;
						return true;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}

		@Override
		public boolean remove(T item) {
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						if(curr.key == key){
							pred.next = curr.next;
							curr.next = null;
							return true;
						}
						return false;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}

		@Override
		public boolean contains(T item) {
			NodeWithLock<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						return curr.key == key;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}
		
		private boolean validate(NodeWithLock<T> pred, NodeWithLock<T> curr){
			NodeWithLock<T> node = head;
			while(node.key <= pred.key){
				if(node.key == pred.key){
					return pred.next == curr;
				}
				node = node.next;
			}
			return false;
		}
	}


樂觀鎖降低了非常大的一部分鎖的爭用問題,可是它還是存在一定的鎖的沖突。尤其是contains操作時也須要加鎖。實際上假設contains一個節點要被刪除的話,能夠先標記成邏輯刪除。再進行物理刪除。由於要刪除時是須要加鎖的。所以最後的物理刪除肯定會成功。假設先標記成邏輯刪除。那麽contains操作的時候事實上是能夠不須要鎖的,假設它看到了節點被標記邏輯刪除了。那麽肯定就contains失敗了,假設沒有看到邏輯刪除。那麽表示在contains操作的時候是還沒有被刪除的,即使它可能已經被加鎖準備刪除了。可是contains看到的是一個狀態的快照。當contains操作的時候確實是存在的。所以返回true也是正確的。

快照這個思想在並發編程中是非常重要的思想。由於並發的存在,假設不使用同步手段,比方加鎖,CAS操作,那麽看到的都非常可能是快照。是一瞬間的狀態。不能全然依據這一瞬間的狀態來決定興許操作,可是看到快照的那一瞬間的操作確實是成功的


所以能夠通過把刪除操作分為邏輯刪除和物理刪除兩個節點來把contains操作的鎖去掉,由於contains操作也是一個非常頻繁的操作。

能夠通過給節點加入一個狀態來表示是否被刪除,最簡單的做法就是加入一個volatile字段保證狀態的可見性

static class NodeWithLockAndMark<T>{
		T item;
		NodeWithLockAndMark<T> next;
		int key;
		java.util.concurrent.locks.Lock lock = new ReentrantLock();
                // 標記邏輯刪除的狀態   
                volatile boolean marked;
		
		public NodeWithLockAndMark(T item){
			this.item = item;
			this.key = item.hashCode();
		}
		
		public NodeWithLockAndMark(){}
		
		public void lock(){
			lock.lock();
		}
		
		public void unlock(){
			lock.unlock();
		}
	}

懶惰的Set實現是在樂觀鎖的基礎上實現的。有了邏輯狀態,validate方法就簡化了。僅僅須要推斷現場的節點是否被標記刪除了,而且現場未被改動過

private boolean validate(NodeWithLockAndMark<T> pred, NodeWithLockAndMark<T> curr){
			return !pred.marked && !curr.marked && pred.next == curr;
		}
看看懶惰Set的實現

1. add和remove操作和樂觀鎖的過程基本一致,僅僅是在remove時,先標記節點的邏輯刪除狀態,再物理刪除

2. contains方法能夠去掉鎖了。註意的是它也是保證快照的正確性

class LazySet<T> implements Set<T>{

		private final NodeWithLockAndMark<T> head;
		
		public LazySet(){
			head = new NodeWithLockAndMark<T>();
			head.key = Integer.MIN_VALUE;
			NodeWithLockAndMark<T> MAX = new NodeWithLockAndMark<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next = MAX;
		}
		
		@Override
		public boolean add(T item) {
			NodeWithLockAndMark<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						if(curr.key == key){
							return false;
						}
						NodeWithLockAndMark<T> node = new NodeWithLockAndMark<T>(item);
						node.next = curr;
						pred.next = node;
						return true;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}

		@Override
		public boolean remove(T item) {
			NodeWithLockAndMark<T> pred, curr;
			int key = item.hashCode();
			while(true){
				pred = head;
				curr = pred.next;
				
				while(curr.key < key){
					pred = curr;
					curr = curr.next;
				}
				pred.lock();
				curr.lock();
				try{
					if(validate(pred, curr)){
						if(curr.key == key){
							// logical remove Node, use volatile to make sure visibility
							curr.marked = true;
							pred.next = curr.next;
							curr.next = null;
							return true;
						}
						return false;
					}
				}finally{
					pred.unlock();
					curr.unlock();
				}
			}
		}

		@Override
		public boolean contains(T item) {
			NodeWithLockAndMark<T> curr = head;
			int key = item.hashCode();
			while(curr.key < key){
				curr = curr.next;
			}
			return curr.key == key && !curr.marked;
		}
		
		private boolean validate(NodeWithLockAndMark<T> pred, NodeWithLockAndMark<T> curr){
			return !pred.marked && !curr.marked && pred.next == curr;
		}
	}

最後來看看真正無鎖的Set的實現,在懶惰Set實現中已經把contains方法的鎖去了,無鎖實現須要把add和remove中的鎖也去掉。無鎖的Set須要保證的是假設一個節點被標記邏輯刪除了,那麽它的next字段就不能被使用了 ,也就是不能被改動了。否則的話可能出現一個節點被標記邏輯刪除了。可是其它現場沒看到,還在繼續使用它的next字段,這樣就出現了無效的add或remove。

AtomicMarkableReference能夠保證這個場景的原子性。它維護了marked狀態和next引用的一個二元狀態。這個二元狀態的改動是原子的。AtomicMarkableReference的很多其它內容請看這篇聊聊高並發(二十)解析java.util.concurrent各個組件(二) 12個原子變量相關類

所以我們對Node再次進行改進。把next字段改成了AtomicMarkableReference

這裏隱含了三個理解無鎖實現的關鍵點

1. next的標記狀態表示的是當前節點是否被邏輯刪除,通過CAS推斷這個狀態能夠知道當前節點是否被邏輯刪除了

2. 通過CAS比較next的Reference就能夠知道當前節點和next節點的物理關系是否被改動了,也是就是能夠知道下一個節點是否被物理刪除了

3. 由於next標記的狀態是當前節點的狀態。所以當前節點是無法知道下一個節點是否被邏輯刪除了的。這個點非常重要,由於無鎖的加入可能會出現加入的節點的興許節點是一個已經被邏輯刪除,可是還沒有物理刪除的節點

static class NodeWithAtomicMark<T>{
		T item;
		AtomicMarkableReference<NodeWithAtomicMark<T>> next = new AtomicMarkableReference<NodeWithAtomicMark<T>>(null, false);
		int key;
		
		public NodeWithAtomicMark(T item){
			this.item = item;
			this.key = item.hashCode();
		}
		
		public NodeWithAtomicMark(){}
		
	}

理解上面的3點是理解無鎖實現的關鍵。前兩點能保證節點知道自己是否被標記了,而且知道後繼節點是否被刪除了。

第三點的存在要求無鎖實現必須有一個單獨的物理刪除節點的過程。

所以抽取出了一個單獨的類來尋找要操作的現場,而且尋找過程中物理地刪除節點。

Position類表示要操作的現場,它返回pred和curr節點的指針。

findPosition方法從頭節點往後尋找位置,邊尋找邊物理刪除被標記邏輯刪除的節點。

最後返回的時候的Postion的curr.key >= 要操作節點的HashCode

class Position<T>{
		NodeWithAtomicMark<T> pred, curr;
		public Position(NodeWithAtomicMark<T> pred, NodeWithAtomicMark<T> curr){
			this.pred = pred;
			this.curr = curr;
		}
		
		public Position(){}
		
		public Position<T> findPosition(NodeWithAtomicMark<T> head, int key){
			boolean[] markHolder = new boolean[1];
			NodeWithAtomicMark<T> pred, curr, succ;
			boolean success;
			retry: while(true){
				pred = head;
				curr = pred.next.getReference();
				// 清除被邏輯刪除的節點。也就是被標記的節點
				while(true){
					succ = curr.next.get(markHolder);
					if(markHolder[0]){
						success = pred.next.compareAndSet(curr, succ, false, false);
						if(!success){
							continue retry;
						}
						curr = succ;
						succ = curr.next.get(markHolder);
					}
				
					if(curr.key >= key){
						return new Position<T>(pred, curr);
					}
					pred = curr;
					curr = succ;
				}
			}
		}
	}

來看看無鎖Set的實現

1. 無鎖加入的時候,先用Position來尋找現場。隱含了一點是尋找的時候同一時候物理刪除了節點

2. 推斷是否存在的時候也是推斷的快照狀態。僅僅保證弱一致性

3. 假設找到了要插入的現場。新建一個節點。並把next指向curr。這裏須要理解curr可能被標記了邏輯刪除,後面pred的next字段的CAS操作會保證curr沒有被物理刪除。可是如上面第三點說的,無法知道curr是否被邏輯刪除了。所以新增加的節點的next可能指向了一個被邏輯刪除的節點。可是不影響邏輯。由於下一個Postion.findPosition操作會正確地刪除被標記邏輯刪除的節點。

contains操作也會推斷節點是否被邏輯刪除

4. pred.next通過AtomicMarkableReference的CAS操作保證了curr節點沒有被物理刪除。假設CAS成功,說明加入成功了

5. 刪除也一樣,先尋找現場,同一時候物理刪除被標記的節點

6. 找到現場後,先CAS改動當前節點的狀態為被標記邏輯刪除,

7. 嘗試物理刪除一次,假設這裏物理刪除成功了,那麽假設正好有一個add在相同的現場操作。那麽add的Pred的CAS操作會失敗。假設這裏物理刪除失敗,那麽就把邏輯刪除的節點保留在鏈表中,等下一次findPosition操作來真正的物理刪除

8. contains操作從頭節點開始遍歷,推斷節點內容,而且沒有被標記邏輯刪除

9. 全部的CAS操作都配合了輪詢,這樣保證終於CAS操作的成功

class LockFreeSet<T> implements Set<T>{

		private final NodeWithAtomicMark<T> head;
		
		public LockFreeSet(){
			head = new NodeWithAtomicMark<T>();
			head.key = Integer.MIN_VALUE;
			NodeWithAtomicMark<T> MAX = new NodeWithAtomicMark<T>();
			MAX.key = Integer.MAX_VALUE;
			head.next.set(MAX, false);
		}
		@Override
		public boolean add(T item) {
			NodeWithAtomicMark<T> pred, curr;
			int key = item.hashCode();
			Position p = new Position();
			while(true){
				Position position = p.findPosition(head, key);
				pred = position.pred;
				curr = position.curr;
				// 假設已經存在,僅僅能保證弱一致性,這裏僅僅是一個當時狀態的快照。有可能相等的節點在這之後被其它線程已經被刪除了。可是這裏不能看到
				if(curr.key == key){
					return false;
				}
				NodeWithAtomicMark<T> node = new NodeWithAtomicMark<T>(item);
				node.next.set(curr, false);
				// 二元狀態保證:
				// 1.pred是未被標記的;
				// 2.curr未被物理刪除。還是pred的興許節點,這時候即使curr已經被邏輯刪除了。也不影響加入成功。

curr會在下一次find被物理刪除 // 二元狀態無法知道curr是否被邏輯刪除,由於mark表示的是自己節點的狀態。

可是curr是否被邏輯刪除不影響加入成功,僅僅要不被物理刪除即可 if(pred.next.compareAndSet(curr, node, false, false)){ return true; } } } @Override public boolean remove(T item) { NodeWithAtomicMark<T> pred, curr; int key = item.hashCode(); Position p = new Position(); boolean success = false; while(true){ Position position = p.findPosition(head, key); pred = position.pred; curr = position.curr; // 假設已經存在,僅僅能保證弱一致性。這裏僅僅是一個當時狀態的快照。有可能相等的節點在這之後被其它線程已經被刪除了,可是這裏不能看到 if(curr.key == key){ NodeWithAtomicMark<T> succ = curr.next.getReference(); success = curr.next.compareAndSet(succ, succ, false, true); if(!success){ continue; } pred.next.compareAndSet(curr, succ, false, false); return true; }else{ return false; } } } @Override public boolean contains(T item) { NodeWithAtomicMark<T> curr = head; int key = item.hashCode(); boolean[] markHolder = new boolean[1]; while(curr.key < key){ curr = curr.next.getReference(); // 檢查是否被刪除 curr.next.get(markHolder); } return curr.key == key && !markHolder[0]; } }



聊聊高並發(三十二)實現一個基於鏈表的無鎖Set集合