1. 程式人生 > >聊聊高併發(八)實現幾種自旋鎖(三)

聊聊高併發(八)實現幾種自旋鎖(三)

聊聊高併發(七)實現幾種自旋鎖(二) 這篇中介紹了兩種佇列鎖,一種是有界佇列鎖,一種是無界佇列鎖。其中無界佇列鎖CLHLock採用了連結串列的方式來組織多執行緒,使用了兩個ThreadLocal做指標指向自身的node和前一個node。它的特點是在前一個node的lock狀態自旋,當前一個node的鎖釋放時,會自動通知下一個執行緒去獲得鎖。

CLHLock是無飢餓的,保證先來先服務公平性,只有少量的快取一致性流量,在SMP系統結構中,是一種比較完善的鎖。但是在沒有cache的NUMA系統架構中,由於在前一個節點的lock狀態上自旋,NUMA架構中處理器訪問本地記憶體的速度高於通過網路訪問其他節點的記憶體,所以CLHLock在NUMA架構上不是最優的自旋鎖。

這篇介紹一種適合在無cache的NUMA系統架構中比較完善的佇列鎖MCSLock。它的特點是:

1. 使用1個ThreadLocal指標來做連結串列,由QNode自身維護下一個節點的指標

2. 執行緒在自身節點自旋,而不是CLHLock那樣在前一個節點自旋

3. 在釋放鎖時需要判斷是否是唯一節點,需要做一次CAS操作,如果不是唯一節點,要稍微等待連結串列關係的建立

package com.zc.lock;

import java.util.concurrent.atomic.AtomicReference;

/**
 * 無界佇列鎖,使用一個連結串列來組織執行緒
 * 假設L把鎖,n個執行緒,那麼鎖的空間複雜度為O(L+n)
 * **/
public class MCSLock implements Lock{
	// 原子變數指向隊尾
	private AtomicReference<QNode> tail;
	// 兩個指標,一個指向自己的Node,一個指向前一個Node
	ThreadLocal<QNode> myNode;
	
	public MCSLock(){
		tail = new AtomicReference<QNode>(null);
		myNode = new ThreadLocal<QNode>(){
			protected QNode initialValue(){
				return new QNode();
			}
		};
	}
	
	@Override
	public void lock() {
		QNode node = myNode.get();
		// CAS原子操作,保證原子性
		QNode preNode = tail.getAndSet(node);
		// 如果preNode等於空,證明是第一個獲取鎖的
		if(preNode != null){
			node.lock = true;
			preNode.next = node;
			// 對執行緒自己的node進行自旋,對無cache的NUMA系統架構來說,訪問本地記憶體速度優於其他節點的記憶體
			while(node.lock){
				
			}
		}
	}

	@Override
	public void unlock() {
		QNode node = myNode.get();
		if(node.next == null){
			// CAS操作,判斷是否沒有新加入的節點
			if(tail.compareAndSet(node, null)){
				// 沒有新加入的節點,直接返回
				return;
			}
			// 有新加入的節點,等待設定鏈關係
			while(node.next == null){
				
			}
		}
		// 通知下一個節點獲取鎖
		node.next.lock = false;
		// 設定next節點為空,為下次獲取鎖清理狀態
		node.next = null;
	}
	
	public static class QNode {
		volatile boolean lock;
		volatile QNode next;
	}
	
	public String toString(){
		return "MCSLock";
	}
}


下面採用和上一篇同樣的測試用例來測試MCSLock的正確性
package com.zc.lock;

public class Main {
	//private static Lock lock = new TimeCost(new ArrayLock(150));
	
	private static Lock lock = new MCSLock();
	
	//private static TimeCost timeCost = new TimeCost(new TTASLock());
	
	private static volatile int value = 0;
	public static void method(){
		lock.lock();
		System.out.println("Value: " + ++value);
		lock.unlock();
	}
	
	public static void main(String[] args) {
		for(int i = 0; i < 50; i ++){
			Thread t = new Thread(new Runnable(){
	
				@Override
				public void run() {
					method();
				}
				
			});
			t.start();
		}
	}

}

測試結果:順序地打印出volatile變數++的結果,證明同一時刻只有一個執行緒在做volatile++操作,證明加鎖成功。
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49
Value: 50