1. 程式人生 > >29、Java並發性和多線程-非阻塞算法

29、Java並發性和多線程-非阻塞算法

新元素 modify gte boolean algo 信號 調度 alt 有著

以下內容轉自http://ifeve.com/non-blocking-algorithms/:

在並發上下文中,非阻塞算法是一種允許線程在阻塞其他線程的情況下訪問共享狀態的算法。在絕大多數項目中,在算法中如果一個線程的掛起沒有導致其它的線程掛起,我們就說這個算法是非阻塞的。

為了更好的理解阻塞算法和非阻塞算法之間的區別,我會先講解阻塞算法然後再講解非阻塞算法。

阻塞並發算法

一個阻塞並發算法一般分下面兩步:

  • 執行線程請求的操作
  • 阻塞線程直到可以安全地執行操作

很多算法和並發數據結構都是阻塞的。例如,java.util.concurrent.BlockingQueue的不同實現都是阻塞數據結構。如果一個線程要往一個阻塞隊列中插入一個元素,隊列中沒有足夠的空間,執行插入操作的線程就會阻塞直到隊列中有了可以存放插入元素的空間。

下圖演示了一個阻塞算法保證一個共享數據結構的行為:

技術分享

非阻塞並發算法

一個非阻塞並發算法一般包含下面兩步:

  • 執行線程請求的操作
  • 通知請求線程操作不能被執行

Java也包含幾個非阻塞數據結構。AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference都是非阻塞數據結構的例子。

下圖演示了一個非阻塞算法保證一個共享數據結構的行為:

技術分享

非阻塞算法vs阻塞算法

阻塞算法和非阻塞算法的主要不同在於上面兩部分描述的它們的行為的第二步。換句話說,它們之間的不同在於當請求操作不能夠執行時阻塞算法和非阻塞算法會怎麽做。

阻塞算法會阻塞線程知道請求操作可以被執行。非阻塞算法會通知請求線程操作不能夠被執行,並返回。

一個使用了阻塞算法的線程可能會阻塞直到有可能去處理請求。通常,其它線程的動作使第一個線程執行請求的動作成為了可能。 如果,由於某些原因線程被阻塞在程序某處,因此不能讓第一個線程的請求動作被執行,第一個線程會阻塞——可能一直阻塞或者直到其他線程執行完必要的動作。

例如,如果一個線程產生往一個已經滿了的阻塞隊列裏插入一個元素,這個線程就會阻塞,直到其他線程從這個阻塞隊列中取走了一些元素。如果由於某些原因,從阻塞隊列中取元素的線程假定被阻塞在了程序的某處,那麽,嘗試往阻塞隊列中添加新元素的線程就會阻塞,要麽一直阻塞下去,要麽知道從阻塞隊列中取元素的線程最終從阻塞隊列中取走了一個元素。

非阻塞並發數據結構

在一個多線程系統中,線程間通常通過一些數據結構”交流“。例如可以是任何的數據結構,從變量到更高級的俄數據結構(隊列,棧等)。為了確保正確,並發線程在訪問這些數據結構的時候,這些數據結構必須由一些並發算法來保證。這些並發算法讓這些數據結構成為並發數據結構。

如果某個算法確保一個並發數據結構是阻塞的,它就被稱為是一個阻塞算法。這個數據結構也被稱為是一個阻塞,並發數據結構。

如果某個算法確保一個並發數據結構是非阻塞的,它就被稱為是一個非阻塞算法。這個數據結構也被稱為是一個非阻塞,並發數據結構。

每個並發數據結構被設計用來支持一個特定的通信方法。使用哪種並發數據結構取決於你的通信需要。在接下裏的部分,我會引入一些非阻塞並發數據結構,並講解它們各自的適用場景。通過這些並發數據結構工作原理的講解應該能在非阻塞數據結構的設計和實現上一些啟發。

Volatile變量

Java中的volatile變量是直接從主存中讀取值的變量。當一個新的值賦給一個volatile變量時,這個值總是會被立即寫回到主存中去。這樣就確保了,一個volatile變量最新的值總是對跑在其他CPU上的線程可見。其他線程每次會從主存中讀取變量的值,而不是比如線程所運行CPU的CPU緩存中。

colatile變量是非阻塞的。修改一個volatile變量的值是一耳光原子操作。它不能夠被中斷。不過,在一個volatile變量上的一個 read-update-write 順序的操作不是原子的。因此,下面的代碼如果由多個線程執行可能導致競態條件。

volatile myVar = 0;

...
int temp = myVar;
temp++;
myVar = temp;

首先,myVar這個volatile變量的值被從主存中讀出來賦給了temp變量。然後,temp變量自增1。然後,temp變量的值又賦給了myVar這個volatile變量這意味著它會被寫回到主存中。

如果兩個線程執行這段代碼,然後它們都讀取myVar的值,加1後,把它的值寫回到主存。這樣就存在myVar僅被加1,而沒有被加2的風險。

你可能認為你不會寫像上面這樣的代碼,但是在實踐中上面的代碼等同於如下的代碼:

myVar++;

執行上面的代碼時,myVar的值讀到一個CPU寄存器或者一個本地CPU緩存中,myVar加1,然後這個CPU寄存器或者CPU緩存中的值被寫回到主存中。

單個寫線程的情景

在一些場景下,你僅有一個線程在向一個共享變量寫,多個線程在讀這個變量。當僅有一個線程在更新一個變量,不管有多少個線程在讀這個變量,都不會發生競態條件。因此,無論時候當僅有一個線程在寫一個共享變量時,你可以把這個變量聲明為volatile

當多個線程在一個共享變量上執行一個read-update-write的順序操作時才會發生競態條件。如果你只有一個線程在執行一個raed-update-write的順序操作,其他線程都在執行讀操作,將不會發生競態條件。

下面是一個單個寫線程的例子,它沒有采取同步手段但任然是並發的。

public class SingleWriterCounter {

    private volatile long count = 0;

    /**
     * Only one thread may ever call this method,
     * or it will lead to race conditions.
     */
    public void inc() {
        this.count++;
    }


    /**
     * Many reading threads may call this method
     * @return
     */
    public long count() {
        return this.count;
    }
}

多個線程訪問同一個Counter實例,只要僅有一個線程調用inc()方法,這裏,我不是說在某一時刻一個線程,我的意思是,僅有相同的,單個的線程被允許去調用inc()方法。多個線程可以調用count()方法。這樣的場景將不會發生任何競態條件。

下圖,說明了線程是如何訪問count這個volatile變量的。

技術分享

基於volatile變量更高級的數據結構

使用多個volatile變量去創建數據結構是可以的,構建出的數據結構中每一個volatile變量僅被一個單個的線程寫,被多個線程讀。每個volatile變量可能被一個不同的線程寫(但僅有一個)。使用像這樣的數據結構多個線程可以使用這些volatile變量以一個非阻塞的方法彼此發送信息。

下面是一個簡單的例子:

public class DoubleWriterCounter {

    private volatile long countA = 0;
    private volatile long countB = 0;

    /**
     * Only one (and the same from thereon) thread may ever call this method,
     * or it will lead to race conditions.
     */
    public void incA() { this.countA++;  }


    /**
     * Only one (and the same from thereon) thread may ever call this method,
     * or it will lead to race conditions.
     */
    public void incB() { this.countB++;  }


    /**
     * Many reading threads may call this method
     */
    public long countA() { return this.countA; }


    /**
     * Many reading threads may call this method
     */
    public long countB() { return this.countB; }
}

如你所見,DoubleWriterCoounter現在包含兩個volatile變量以及兩對自增和讀方法。在某一時刻,僅有一個單個的線程可以調用inc(),僅有一個單個的線程可以訪問incB()。不過不同的線程可以同時調用incA()incB()countA()countB()可以被多個線程調用。這將不會引發競態條件。

DoubleWriterCoounter可以被用來比如線程間通信。countA和countB可以分別用來存儲生產的任務數和消費的任務數。下圖,展示了兩個線程通過類似於上面的一個數據結構進行通信的。

技術分享

聰明的讀者應該已經意識到使用兩個SingleWriterCounter可以達到使用DoubleWriterCoounter的效果。如果需要,你甚至可以使用多個線程和SingleWriterCounter實例。

使用CAS的樂觀鎖

如果你確實需要多個線程區寫同一個共享變量,volatile變量是不合適的。你將會需要一些類型的排它鎖(悲觀鎖)訪問這個變量。下面代碼演示了使用Java中的同步塊進行排他訪問的。

public class SynchronizedCounter {
    long count = 0;

    public void inc() {
        synchronized(this) {
            count++;
        }
    }

    public long count() {
        synchronized(this) {
            return this.count;
        }
    }
}

註意,,inc()count()方法都包含一個同步塊。這也是我們像避免的東西——同步塊和wait()-notify調用等。

我們可以使用一種Java的原子變量來代替這兩個同步塊。在這個例子是AtomicLong。下面是SynchronizedCounter類的AtomicLong實現版本。

import java.util.concurrent.atomic.AtomicLong;

public class AtomicCounter {
    private AtomicLong count = new AtomicLong(0);

    public void inc() {
        boolean updated = false;
        while(!updated){
            long prevCount = this.count.get();
            updated = this.count.compareAndSet(prevCount, prevCount + 1);
        }
    }

    public long count() {
        return this.count.get();
    }
}

這個版本僅僅是上一個版本的線程安全版本。這一版我們感興趣的是inc()方法的實現。inc()方法中不再含有一個同步塊。而是被下面這些代碼替代:

boolean updated = false;
while(!updated){
    long prevCount = this.count.get();
    updated = this.count.compareAndSet(prevCount, prevCount + 1);
}

上面這些代碼並不是一個原子操作。也就是說,對於兩個不同的線程去調用inc()方法,然後執行long prevCount = this.count.get()語句,因此獲得了這個計數器的上一個count。但是,上面的代碼並沒有包含任何的競態條件。

秘密就在於while循環裏的第二行代碼。compareAndSet()方法調用是一個原子操作。它用一個期望值和AtomicLong內部的值去比較,如果這兩個值相等,就把AtomicLong內部值替換為一個新值。compareAndSet()通常被CPU中的compare-and-swap指令直接支持。因此,不需要去同步,也不需要去掛起線程。

假設,這個AtomicLong的內部值是20,。然後,兩個線程去讀這個值,都嘗試調用compareAndSet(20, 20 + 1)。盡管compareAndSet()是一個原子操作,這個方法也會被這兩個線程相繼執行(某一個時刻只有一個)。

第一個線程會使用期望值20(這個計數器的上一個值)與AtomicLong的內部值進行比較。由於兩個值是相等的,AtomicLong會更新它的內部值至21(20 + 1 )。變量updated被修改為true,while循環結束。

現在,第二個線程調用compareAndSet(20, 20 + 1)。由於AtomicLong的內部值不再是20,這個調用將不會成功。AtomicLong的值不會再被修改為21。變量,updated被修改為false,線程將會再次在while循環外自旋。這段時間,它會讀到值21並企圖把值更新為22。如果在此期間沒有其它線程調用inc()。第二次叠代將會成功更新AtomicLong的內部值到22。

為什麽稱它為樂觀鎖

上一部分展現的代碼被稱為樂觀鎖(optimistic locking)。樂觀鎖區別於傳統的鎖,有時也被稱為悲觀鎖。傳統的鎖會使用同步塊或其他類型的鎖阻塞對臨界區域的訪問。一個同步塊或鎖可能會導致線程掛起。

樂觀鎖允許所有的線程在不發生阻塞的情況下創建一份共享內存的拷貝。這些線程接下來可能會對它們的拷貝進行修改,並企圖把它們修改後的版本寫回到共享內存中。如果沒有其它線程對共享內存做任何修改, CAS操作就允許線程將它的變化寫回到共享內存中去。如果,另一個線程已經修改了共享內存,這個線程將不得不再次獲得一個新的拷貝,在新的拷貝上做出修改,並嘗試再次把它們寫回到共享內存中去。

稱之為“樂觀鎖”的原因就是,線程獲得它們想修改的數據的拷貝並做出修改,在樂觀的假在此期間沒有線程對共享內存做出修改的情況下。當這個樂觀假設成立時,這個線程僅僅在無鎖的情況下完成共享內存的更新。當這個假設不成立時,線程所做的工作就會被丟棄,但任然不使用鎖。

樂觀鎖使用於共享內存競用不是非常高的情況。如果共享內存上的內容非常多,僅僅因為更新共享內存失敗,就用浪費大量的CPU周期用在拷貝和修改上。但是,如果砸共享內存上有大量的內容,無論如何,你都要把你的代碼設計的產生的爭用更低。

樂觀鎖是非阻塞的

我們這裏提到的樂觀鎖機制是非阻塞的。如果一個線程獲得了一份共享內存的拷貝,當嘗試修改時,發生了阻塞,其它線程去訪問這塊內存區域不會發生阻塞。

對於一個傳統的加鎖/解鎖模式,當一個線程持有一個鎖時,其它所有的線程都會一直阻塞直到持有鎖的線程再次釋放掉這個鎖。如果持有鎖的這個線程被阻塞在某處,這個鎖將很長一段時間不能被釋放,甚至可能一直不能被釋放。

不可替換的數據結構

簡單的CAS樂觀鎖可以用於共享數據結果,這樣一來,整個數據結構都可以通過一個單個的CAS操作被替換成為一個新的數據結構。盡管,使用一個修改後的拷貝來替換真個數據結構並不總是可行的。

假設,這個共享數據結構是隊列。每當線程嘗試從向隊列中插入或從隊列中取出元素時,都必須拷貝這個隊列然後在拷貝上做出期望的修改。我們可以通過使用一個AtomicReference來達到同樣的目的。拷貝引用,拷貝和修改隊列,嘗試替換在AtomicReference中的引用讓它指向新創建的隊列。

然而,一個大的數據結構可能會需要大量的內存和CPU周期來復制。這會使你的程序占用大量的內存和浪費大量的時間再拷貝操作上。這將會降低你的程序的性能,特別是這個數據結構的競用非常高情況下。更進一步說,一個線程花費在拷貝和修改這個數據結構上的時間越長,其它線程在此期間修改這個數據結構的可能性就越大。如你所知,如果另一個線程修改了這個數據結構在它被拷貝後,其它所有的線程都不等不再次執行 拷貝-修改 操作。這將會增大性能影響和內存浪費,甚至更多。

接下來的部分將會講解一種實現非阻塞數據結構的方法,這種數據結構可以被並發修改,而不僅僅是拷貝和修改。

共享預期的修改

用來替換拷貝和修改整個數據結構,一個線程可以共享它們對共享數據結構預期的修改。一個線程向對修改某個數據結構的過程變成了下面這樣:

  • 檢查是否另一個線程已經提交了對這個數據結構提交了修改
  • 如果沒有其他線程提交了一個預期的修改,創建一個預期的修改,然後向這個數據結構提交預期的修改
  • 執行對共享數據結構的修改
  • 移除對這個預期的修改的引用,向其它線程發送信號,告訴它們這個預期的修改已經被執行

如你所見,第二步可以阻塞其他線程提交一個預期的修改。因此,第二步實際的工作是作為這個數據結構的一個鎖。如果一個線程已經成功提交了一個預期的修改,其他線程就不可以再提交一個預期的修改直到第一個預期的修改執行完畢。

如果一個線程提交了一個預期的修改,然後做一些其它的工作時發生阻塞,這時候,這個共享數據結構實際上是被鎖住的。其它線程可以檢測到它們不能夠提交一個預期的修改,然後回去做一些其它的事情。很明顯,我們需要解決這個問題。

可完成的預期修改

為了避免一個已經提交的預期修改可以鎖住共享數據結構,一個已經提交的預期修改必須包含足夠的信息讓其他線程來完成這次修改。因此,如果一個提交了預期修改的線程從未完成這次修改,其他線程可以在它的支持下完成這次修改,保證這個共享數據結構對其他線程可用。

下圖說明了上面描述的非阻塞算法的藍圖:

技術分享

修改必須被當做一個或多個CAS操作來執行。因此,如果兩個線程嘗試去完成同一個預期修改,僅有一個線程可以所有的CAS操作。一旦一條CAS操作完成後,再次企圖完成這個CAS操作都不會“得逞”。

A-B-A問題

上面演示的算法可以稱之為A-B-A問題。A-B-A問題指的是一個變量被從A修改到了B,然後又被修改回A的一種情景。其他線程對於這種情況卻一無所知。

如果線程A檢查正在進行的數據更新,拷貝,被線程調度器掛起,一個線程B在此期可能可以訪問這個共享數據結構。如果線程對這個數據結構執行了全部的更新,移除了它的預期修改,這樣看起來,好像線程A自從拷貝了這個數據結構以來沒有對它做任何的修改。然而,一個修改確實已經發生了。當線程A繼續基於現在已經過期的數據拷貝執行它的更新時,這個數據修改已經被線程B的修改破壞。

下圖說明了上面提到的A-B-A問題:

技術分享

A-B-A問題的解決方案

A-B-A通常的解決方法就是不再僅僅替換指向一個預期修改對象的指針,而是指針結合一個計數器,然後使用一個單個的CAS操作來替換指針 + 計數器。這在支持指針的語言像C和C++中是可行的。因此,盡管當前修改指針被設置回指向 “不是正在進行的修改”(no ongoing modification),指針+計數器的計數器部分將會被自增,使修改對其它線程是可見的。

在Java中,你不能將一個引用和一個計數器歸並在一起形成一個單個的變量。不過Java提供了AtomicStampedReference類,利用這個類可以使用一個CAS操作自動的替換一個引用和一個標記(stamp)。

一個非阻塞算法模板

下面的代碼意在在如何實現非阻塞算法上一些啟發。這個模板基於這篇教程所講的東西。

註意:在非阻塞算法方面,我並不是一位專家,所以,下面的模板可能錯誤。不要基於我提供的模板實現自己的非阻塞算法。這個模板意在給你一個關於非阻塞算法大致是什麽樣子的一個idea。如果,你想實現自己的非阻塞算法,首先學習一些實際的工業水平的非阻塞算法的時間,在實踐中學習更多關於非阻塞算法實現的知識。

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicStampedReference;

public class NonblockingTemplate {

    public static class IntendedModification {
        public AtomicBoolean completed = new AtomicBoolean(false);
    }

    private AtomicStampedReference<IntendedModification>
        ongoingMod = new AtomicStampedReference<IntendedModification>(null, 0);

    //declare the state of the data structure here.


    public void modify() {
        while(!attemptModifyASR());
    }

    public boolean attemptModifyASR(){

        boolean modified = false;
    
        IntendedModification currentlyOngoingMod = ongoingMod.getReference();
        int stamp = ongoingMod.getStamp();
    
        if(currentlyOngoingMod == null){
            //copy data structure state - for use
            //in intended modification
        
            //prepare intended modification
            IntendedModification newMod = new IntendedModification();
        
            boolean modSubmitted = ongoingMod.compareAndSet(null, newMod, stamp, stamp + 1);
        
            if(modSubmitted){
            
                //complete modification via a series of compare-and-swap operations.
                //note: other threads may assist in completing the compare-and-swap
                // operations, so some CAS may fail
            
                modified = true;
            }
    
        } else {
            //attempt to complete ongoing modification, so the data structure is freed up
            //to allow access from this thread.
        
            modified = false;
        }
    
        return modified;
    }
}

非阻塞算法是不容易實現的

正確的設計和實現非阻塞算法是不容易的。在嘗試設計你的非阻塞算法之前,看一看是否已經有人設計了一種非阻塞算法正滿足你的需求。

Java已經提供了一些非阻塞實現(比如 ConcurrentLinkedQueue),相信在Java未來的版本中會帶來更多的非阻塞算法的實現。

除了Java內置非阻塞數據結構還有很多開源的非阻塞數據結構可以使用。例如,LAMX Disrupter和Cliff Click實現的非阻塞 HashMap。查看我的Java concurrency references page查看更多的資源。

使用非阻塞算法的好處

非阻塞算法和阻塞算法相比有幾個好處。下面讓我們分別看一下:

選擇

非阻塞算法的第一個好處是,給了線程一個選擇當它們請求的動作不能夠被執行時做些什麽。不再是被阻塞在那,請求線程關於做什麽有了一個選擇。有時候,一個線程什麽也不能做。在這種情況下,它可以選擇阻塞或自我等待,像這樣把CPU的使用權讓給其它的任務。不過至少給了請求線程一個選擇的機會。

在一個單個的CPU系統可能會掛起一個不能執行請求動作的線程,這樣可以讓其它線程獲得CPU的使用權。不過即使在一個單個的CPU系統阻塞可能導致死鎖,線程饑餓等並發問題。

沒有死鎖

非阻塞算法的第二個好處是,一個線程的掛起不能導致其它線程掛起。這也意味著不會發生死鎖。兩個線程不能互相彼此等待來獲得被對方持有的鎖。因為線程不會阻塞當它們不能執行它們的請求動作時,它們不能阻塞互相等待。非阻塞算法任然可能產生活鎖(live lock),兩個線程一直請求一些動作,但一直被告知不能夠被執行(因為其他線程的動作)。

沒有線程掛起

掛起和恢復一個線程的代價是昂貴的。沒錯,隨著時間的推移,操作系統和線程庫已經越來越高效,線程掛起和恢復的成本也不斷降低。不過,線程的掛起和戶對任然需要付出很高的代價。

無論什麽時候,一個線程阻塞,就會被掛起。因此,引起了線程掛起和恢復過載。由於使用非阻塞算法線程不會被掛起,這種過載就不會發生。這就意味著CPU有可能花更多時間在執行實際的業務邏輯上而不是上下文切換。

在一個多個CPU的系統上,阻塞算法會對阻塞算法產生重要的影響。運行在CPUA上的一個線程阻塞等待運行在CPU B上的一個線程。這就降低了程序天生就具備的並行水平。當然,CPU A可以調度其他線程去運行,但是掛起和激活線程(上下文切換)的代價是昂貴的。需要掛起的線程越少越好。

降低線程延遲

在這裏我們提到的延遲指的是一個請求產生到線程實際的執行它之間的時間。因為在非阻塞算法中線程不會被掛起,它們就不需要付昂貴的,緩慢的線程激活成本。這就意味著當一個請求執行時可以得到更快的響應,減少它們的響應延遲。

非阻塞算法通常忙等待直到請求動作可以被執行來降低延遲。當然,在一個非阻塞數據數據結構有著很高的線程爭用的系統中,CPU可能在它們忙等待期間停止消耗大量的CPU周期。這一點需要牢牢記住。非阻塞算法可能不是最好的選擇如果你的數據結構哦有著很高的線程爭用。不過,也常常存在通過重構你的程序來達到更低的線程爭用。

29、Java並發性和多線程-非阻塞算法