1. 程式人生 > >非阻塞同步演算法實戰(一)

非阻塞同步演算法實戰(一)

前言

本文寫給對ConcurrentLinkedQueue的實現和非阻塞同步演算法的實現原理有一定了解,但缺少實踐經驗的朋友,文中包括了實戰中的嘗試、所走的彎路,經驗和教訓。

背景介紹

上個月,我被安排獨自負責一個聊天系統的服務端,因為一些原因,我沒使用現成的開源框架,網路那塊直接使用AIO,收資料時,因為只會從channel裡過來,所以不需要考慮同步問題;但是傳送資料時,因為有聊天訊息的轉發,所以必需處理這個同步問題。AIO中,是處理完一個註冊的操作後,再執行我們定義的方法,此時,如果還有資料需要寫,則繼續註冊寫操作,如果沒有則不註冊;提交資料時,如果當前沒有註冊寫操作,則註冊一個,否則僅提交(此時再註冊則會報異常)。這樣,需要同步的點就是:如何知道當前還有沒有資料需要傳送(因為其它執行緒也可以提交資料到此處),和如何知道此次提交時,有沒有註冊寫操作。總之,要完成:有資料要傳送時,必需有寫操作被註冊,並且只能註冊一次;沒有資料時,不能有寫操作被註冊。

問題分析

經過分析,上面的問題,可以抽象成:我需要知道當往佇列裡插入一條資料之前,該佇列是否為空,如果為空則應該註冊新的寫操作。當從佇列裡取出一條資料後,該佇列是否為非空,如果非空則應該繼續註冊寫操作。(本文之後以“關注的操作”來表示這種場景下的插入或取出操作)

目前的問題是,我使用的佇列是ConcurrentLinkedQueue,但是它的取出資料的方法,沒有返回值告訴我們從佇列裡取出資料之後佇列是否為空,如果是使用size或peek方法來執行判斷,那就必需加鎖了,否則在拿到佇列大小時,可能佇列大小已經變化了。所以我首先想到的是,如何對該佇列進行改造,讓它提供該資訊。

注意:這裡指的不是當次而是之後,所以如果我們使用佇列的peek()方法返回null,就知道佇列是否為空,但是不知道之後是否為空 ,並且,當關注的操作發生時,在插入或取出操作的返回值裡告知此資訊,來指導是否繼續註冊寫操作。

用鎖實現

如果使用鎖的話很容易處理,用鎖同步插入和取出方法,下面是鎖實現的參考:

    public E poll() {
        synchronized (this) {
            E re = q.poll();
            // 獲取元素後,佇列是空,表示是我關注的操作
            if (q.peek() == null) {

            }
            return re;
        }
    }

    public void offer(E e) {
        synchronized (this) {
            // 插入元素前,佇列是空,表示是我關注的操作
            if (q.peek() == null) {

            }
            q.offer(e);
        }
    }

但因為是服務端,我想用非阻塞同步演算法來實現。

嘗試方案一

我第一次想到的改造辦法是,將head佔位節點改成固定的,頭節點移動時,只將head節點的next指向新的節點,在插入資料時,如果是在head節點上成功執行的該操作,那麼該插入就是關注的的操作;在取出時,如果將head節點的next置為了null,那麼該取出就是關注的操作( 因為之前的佔位節點是變化的,所以沒法比較,必需用同步,現在是固定的了,所以可以直接與head節點進行比較 )。如此一來,問題好像被解決了。改造完之後,出於嚴謹,我仔細讀了一遍程式碼,發現引入了新的問題,我的取出操作是這樣寫的

/**
 * @author [email protected]
 */
public E poll(){
    for(;;){
        Node n = head.nextRef.get();//head指向固定的head節點,為final
        if(n == null)
            return null;
        Node m = n.nextRef.get();
        if(head.next.compareAndSet(n,m){
            if(m==null)
                ;//此時為關注的操作(為了簡化程式碼顯示,不將該資訊當作返回值返回了,僅註釋)
            return n.itemRef.get();
        }
    }
}

這裡有一個致命的問題:如果m為null,在CAS期間,插入了新節點,n的next由null變成了非null,緊接著又把head的next更新為了null,那麼鏈就斷了,該方法還存在一些其它的問題,如當佇列為空的時候,尾節點指向了錯誤的位置,本應該是head的。我認為最根本的原因在於,head不能設為固定的,否則會引發一堆問題。第一次嘗試宣告失敗。

嘗試方案二

這次我嘗試將head跟tail兩個引用包裝成一個物件,然後對這個物件進行CAS更新(這僅為一處理論上的嘗試,因為從效能上面來講,已經大打折扣了,建立了很多額外的物件),如果head跟tail指向了同一個節點,則認為佇列是空的,根據此資訊來判斷一個操作是不是關注的操作。但該嘗試僅停留在了理論分析階段,期間發現了一些小問題,沒法解決,後來我發現,我把ConcurrentLinkedQueue原本可以分成兩步執行的插入和取出操作(更新節點的next或item引用,然後再更新head或tail引用),變成了必需一步完成,ConcurrentLinkedQueue尚不能一步完成,我何德何能,可將它們一步完成?所以直接放棄了。

解決方案一

經過兩次的失敗嘗試,我幾乎絕望了,我懷疑這是不是不能判斷出是否為關注的操作。

因為是在做專案,週末已經過去了,不能再搞這些“研究”了,所以我退而求其次,想了個不太漂亮的辦法,在佇列外部維護一個變數,記錄佇列有多大,在插入或取出後,更新該變數,使用的是AtomicInteger,如果更新時,將變數從1變成0,或從0變成了1,就認為該插入或取出為關注的操作。

    private AtomicInteger size = new AtomicInteger(0);
    public E poll() {
        E re = q.poll();
        if (re == null)
            return null;
        for(int old;;){
            old = size.get();
            if(size.compareAndSet(old,old-1)){
                // 獲取元素後,佇列是空,表示是我關注的操作
                if(old == 1){

                }
                break;
            }
        }
        return re;
    }

    public void offer(E e) {
        q.offer(e);
        for(int old;;){
            old = size.get();
            if(size.compareAndSet(old,old+1)){
                // 插入元素前,佇列是空,表示是我關注的操作
                if(old == 0){

                }
                break;
            }
        }
    }

此時,也許細心的朋友會問,因為沒有使用鎖,這個變數並不能真實反映佇列的大小,也就不能確定它是不是關注的操作。沒錯,是不能真實反映,但是,我獲取關注的操作的目的是用來指導我:該不該註冊新的寫操作,該變數的值變化就能提供正確的指導,所以,同樣是正確的,只不過途徑不同而已。理論上的分析和後來的專案正確執行都印證了該方法的正確性。

解決方案二

因為上面的方法額外加了一次lock-free級別的CAS操作,我心裡總不太舒服,空餘時間總在琢磨,真的就沒有辦法,在不增加額外lock-free級別CAS開支的情況下,知曉一個操作是不是關注的操作?

後來經分析,如果要知曉是不是關注的操作,跟兩個資料有關,真實的頭節點跟尾節點(不同於head跟tail,因為它們是滯後的,之前將它們包裝成一個物件就是犯了該錯誤),ConcurrentLinkedQueue的實現中,這兩個節點是沒有競爭的,一個是更新item,一個是更新next,必需得讓他們競爭同一個東西,才能解決該問題,於是我想到了一個辦法,取出完成後,如果該節點的next為null,就將其用CAS置為一個特殊的值,若成功則認為是關注的操作;插入成功後,如果next被替換掉的值不是null而是這個特殊值,那麼該插入也為關注的操作。這僅增加了一次wait-free級別的CAS操作(取出後的那次CAS),perfect!

因為ConcurrentLinkedQueue的很多變數、內部類都是私有的,沒法通過繼承來改造,沒辦法,只得自行實現。對於佇列裡使用的Node,實現的方式有很多種,可以使用AtomicReference、AtomicReferenceFieldUpdater來實現,如果你願意的話,甚至是像ConcurrentLinkedQueue一樣,用sun.misc.Unsafe來實現(注意:一般來說,sun包下的類是不推薦使用的),各有優缺點吧,所以我就不提供該佇列的具體實現了,下面給出在ConcurrentLinkedQueue(版本:1.7.0_10)基礎上進行的改造,供參考。注意,如果需要用到size等方法,因為特殊值的引入,影響了之前的判斷邏輯,應重新編寫。

   /**
    * @author [email protected]
    */
    private static final Node MARK_NODE = new Node(null);

    public boolean offer(E e) {
        checkNotNull(e);
        final Node newNode = new Node(e);

        for (Node t = tail, p = t;;) {
            Node q = p.next;
            if (q == null || q == MARK_NODE) {//修改1:加入條件:或q == MARK_NODE
                // p is last node
                if (p.casNext(q, newNode)) {//修改2:將null改為q
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (q == MARK_NODE)//修改3:
                        ;//此時為關注的操作(為了簡化程式碼顯示,僅註釋)
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode); // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list. If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable. Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    if (p.casNext(null,MARK_NODE))//修改1:
                        ;//此時為關注的操作
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

小結

設計非阻塞演算法的關鍵在於,找出競爭點,如果獲取的某個資訊跟兩個操作有關,那麼應該讓這兩個操作競爭同一個東西,這樣才能反應出它們的關係。