1. 程式人生 > >Java多執行緒 -- JUC包原始碼分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/無鎖鏈表

Java多執行緒 -- JUC包原始碼分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/無鎖鏈表

-為什麼是SkipList,不是TreeMap的紅黑樹?
-無鎖鏈表的精髓
-ConcurrentSkipListMap
-ConcurrentSkipListSet

為什麼是SkipList?

大家都知道,在Java集合類中,有一個TreeMap,相對於HashMap,它的一個最大特點是:key是有序的。TreeMap的內部是用紅黑樹實現的。同HashMap一樣,它也是非執行緒安全的。

然後在JUC包中,提供了一個執行緒安全的、有序的HashMap,也就是今天所要講的ConcurrentSkipListMap。(關於什麼是SkipList,或者“跳躍表“,此處不再詳述,大家自行google之)

那為什麼實現這個執行緒安全的、有序的HashMap,不用紅黑樹,而用SkipList呢?

借用Doug Lea的原話:

The reason is that there are no known efficient lock-free insertion and deletion algorithms for search trees.

也就是目前計算機領域沒找到一種高效的、作用在樹上的、無鎖的、增加和刪除結點的辦法。

那為什麼SkipList就可以無鎖的實現結點的增加、刪除呢?那就要從以下的故事說起:

無鎖鏈表

問題的提出

在Doug Lea的註釋中,他引用了下面這篇無鎖鏈表的Paper:
“A pragmatic implementation of non-blocking linked lists” (大家自行google之)

咋一看,無鎖鏈表不是很簡單嘛,還值得寫一篇paper專門論述?我們在前面講AQS的時候,已反覆用到無鎖佇列,其實現也是連結串列。那究竟區別在哪呢?

我們前面所講的無鎖佇列、棧,都是隻在頭、尾做cas操作,這個問題不大。而玄機就在,如果你在連結串列的中間做insert/delete操作,照通常的cas做法,就會出問題!

關於這個問題,那篇論文中有清晰的論述,此處引用如下:
操作1: 在結點10後面插入結點20,沒什麼問題
這裡寫圖片描述

操作2:刪除結點10,也沒什麼問題
這裡寫圖片描述

但如果2個執行緒同時操作,一個刪除結點10,一個要在10後面插入結點20。這2個操作都各自是CAS的,這個時候就會出問題:新插入的結點20會丟失!這個問題不是簡單的用CAS就可以解決的。
這裡寫圖片描述

那為什麼會出現這個問題呢?
究其原因:我們在刪除結點10的時候,實際操作的是10的前驅H,10本身並沒有任何變化。這樣,在往10後插入20的這個執行緒,不知道10已經刪除了!!

要解決這個問題,論文中提出瞭如下的解決辦法:
這裡寫圖片描述

把10的刪除分2步:第1步,把10的next指標,mark成刪除,也就是軟刪;第2步,找機會,物理刪除。

這個解決辦法有一個關鍵點:就是“把10的next指標指向20(insert操作)“ 和 “判斷10本身是否刪除“這2個,必須是原子的,必須在1個cas裡面完成!!!

解決辦法

辦法1: AtomicMarkableReference
要實現上面所說的,一個辦法是用前面所提到的AtomicMarkableReference,也就是說,每個next要是AtomicMarkableReference型別。但這個辦法,不夠高效。Doug Lea在ConcurrentSkipListMap的實現中,用了另外一種辦法

辦法2:Mark結點
* +——+ +——+ +——+ +——+
* … | b |——>| n |—–>|marker|——>| f | …
* +——+ +——+ +——+ +——+
*
* 3. CAS b’s next pointer over both n and its marker.
* From this point on, no new traversals will encounter n,
* and it can eventually be GCed.
* +——+ +——+
* … | b |———————————–>| f | …
* +——+ +——+

我們的目的,不就是想標記結點n,也就是標記它的next欄位嘛?那就新造一個marker結點,讓n.next = marker。

這樣也達到了,刪除n的時候,標記n的next指標的目的。

ConcurrentSkipListMap

SkipList結構

解決了無鎖鏈表的insert/delete問題,也就解決了SkipList的一個關鍵問題。因為SkipList,通俗點講,就是多層連結串列疊起來的:一個大的單向連結串列,儲存所有的

//最底層Node結點。所有的<k,v>對,都是這個單向連結串列串起來的。
static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile Node<K,V> next;      
        。。。
}

//上面的Index層的結點
    static class Index<K,V> {
        final Node<K,V> node;  //不儲存實際資料,指向Node
        final Index<K,V> down;  //關鍵點:每個Index結點,必須有個指標,指向其下一個Level對應的結點
        volatile Index<K,V> right; //自己也組成單向連結串列
        。。。
 }

//整個ConcurrentSkipListMap就只用記錄最頂層的head結點
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentNavigableMap<K,V>,
               Cloneable,
               java.io.Serializable {
    。。。
    private transient volatile HeadIndex<K,V> head;
}

    final void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
        head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                                  null, null, 1);
    }

put操作

    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }

    private V doPut(K kkey, V value, boolean onlyIfAbsent) {
        Comparable<? super K> key = comparable(kkey);
        for (;;) {
            Node<K,V> b = findPredecessor(key);  //先找到結點的前驅(一定在base level上)
            Node<K,V> n = b.next;
            for (;;) {
                if (n != null) {
                    Node<K,V> f = n.next;
                    if (n != b.next)               // inconsistent read
                        break;
                    Object v = n.value;
                    if (v == null) {               // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    if (v == n || b.value == null) // b is deleted
                        break;
                    int c = key.compareTo(n.key);
                    if (c > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (c == 0) {     //結點存在,直接改值
                        if (onlyIfAbsent || n.casValue(v, value))
                            return (V)v;
                        else
                            break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }

                Node<K,V> z = new Node<K,V>(kkey, value, n);
                if (!b.casNext(n, z))  //結點不存在,新建一個,插入進入
                    break;         

                int level = randomLevel();  //關鍵點:判斷是否要給此結點加上Index層。randomLevel有50%概率會返回0,也就是說,50%的結點,不會有Index層,只會在最底層的連結串列上面

                if (level > 0)
                    insertIndex(z, level);   //level > 0,為其建索引
                return null;
            }
        }
    }

//關鍵函式:查詢一個結點的前驅,header開始,從左往右、從上往下遍歷
    private Node<K,V> findPredecessor(Comparable<? super K> key) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            Index<K,V> q = head;
            Index<K,V> r = q.right;
            for (;;) {
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
                    if (key.compareTo(k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                Index<K,V> d = q.down;   //跳到下一層
                if (d != null) {
                    q = d;
                    r = d.right;
                } else
                    return q.node;
            }
        }
    }

get操作

    public V get(Object key) {
        return doGet(key);
    }

    private V doGet(Object okey) {
        Comparable<? super K> key = comparable(okey);

        for (;;) {
            Node<K,V> n = findNode(key);
            if (n == null)
                return null;
            Object v = n.value;
            if (v != null)
                return (V)v;
        }
    }

    private Node<K,V> findNode(Comparable<? super K> key) {
        for (;;) {
            Node<K,V> b = findPredecessor(key); //再次用的此函式
            Node<K,V> n = b.next;
            for (;;) {
                if (n == null)
                    return null;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                Object v = n.value;
                if (v == null) {                // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (v == n || b.value == null)  // b is deleted
                    break;
                int c = key.compareTo(n.key);
                if (c == 0)
                    return n;
                if (c < 0)
                    return null;
                b = n;
                n = f;
            }
        }
    }

ConcurrentSkipListMap中,關於連結串列的操作,細節非常多,上面只是分析了其整體實現思路。關於insert/delete, insertIndex的具體實現細節,本文不在詳述,留待大家對照程式碼仔細分析。

ConcurrentSkipListSet

我們知道,TreeSet是基於TreeMap實現的,就是對TreeMap的一個簡單封裝。

同樣, ConcurrentSkipListSet也就是對ConcurrentSkipListMap的一個簡單封裝,具體不再詳述。

    public ConcurrentSkipListSet() {
        m = new ConcurrentSkipListMap<E,Object>();
    }