1. 程式人生 > >java多執行緒——併發測試

java多執行緒——併發測試

這是java多執行緒第6篇:

--------------------------------------------------------------------

編寫併發程式時候,可以採取和序列程式相同的程式設計方式。

唯一的難點在於,併發程式存在不確定性,這種不確定性會令程式出錯的地方遠比序列程式多,出現的方式也沒有固定規則。那麼如何在測試中,儘可能的暴露出這些問題,並且瞭解其效能瓶頸,本篇針對這些問題來做個簡要總結。

本篇主要包含以下內容:

1. 併發測試分類

2. 正確性測試

3. 安全性測試

4. 效能測試

1併發測試分類測試流程

併發測試和序列測試有相同的部分,比如都需要線測試其在序列情況下的正確性,這個是保證後續測試的基礎。

當然了,正確性測試和我們的序列測試方沒有什麼不同,都是在保證其程式在單執行緒情況下執行和序列執行有相同的結果,這個我們不再陳述。 

一般的併發測試,我們按照以下流程來進行。

分類

併發測試大致可以分為兩類:安全性測試與活躍性測試。

安全性測試我們可以定義為“不發生任何錯誤的行為”,也可以理解為保持一致性。比如i++操作,但單執行緒情況下,迴圈20次,i=20,可是在多執行緒情況下,如果總共迴圈20次,結果不為20,那麼這個結果就是錯誤的,說明出現了錯誤的執行緒安全問題。

我們在測試這種問題的時候,必須要增加一個”test point”保證其原子性同時又不影響程式的正確性。以此為判斷條件執行測試程式碼,關於“test point”如何做,我們後續再討論。

活躍性測試定義為“某個良好的行為終究會發生”,也可以為理解為程式執行有必然的結果,不會出現因某個方法阻塞,而執行緩慢,或者是發生了執行緒死鎖,導致一直等待的狀態等。

與活躍性測試相關的是效能測試。主要有以下幾個方面進行衡量:吞吐量,響應性,可伸縮性。

吞吐量:一組併發任務中已完成任務所佔的比例。或者說是一定時間內完成任務的數量。

響應性:請求從發出到完成之間的時間。

可伸縮性:在增加更多資源(CPU,IO,記憶體),吞吐量的提升情況。

2安全性測試

安全性測試,如前面所說是“不發生任何錯誤的行為”,也是要對其資料競爭可能引發的錯誤進行測試。這也是我們需要找到一個功能中併發的的“test point”,並對其額外的構造一些測試。而且這些測試最好不需要任何同步機制。

我們通過一個例子來進行說明。

採用ArrayBlockingQueue,我們知道這個列表是採用一個有界的阻塞佇列來實現的生產-消費模式的。如果對其測試併發問題的,重要的就是對put和take方法進行測試,一種有效的方法就是檢查被放入佇列中和出佇列中的各個元素是否相等。

如果出現數據安全性的問題,那麼必然入佇列的值和出佇列的值沒有發生對應,結果也不盡相同。比如多執行緒情況下,我們把所有入列元素和出列元素的校檢和進行比較,如果二者相等,那麼表明測試成功。

為了保證其能夠測試到所有要點,需要對入隊的值進行隨機生成,令每次測試得到的結果不盡相同。另外為了保證其公平性,要保證所有的執行緒一起開始運算,防止先進行的程式進行序列運算。

public class PutTakeTest {

    protected static final ExecutorService pool = Executors.newCachedThreadPool();

    //柵欄,通過它可以實現讓一組執行緒等待至某個狀態之後再全部同時執行

    protected CyclicBarrier barrier;

    protected final ArrayBlockingQueue<Integer> bb ;

    protected final int nTrials , nPairs;

    //入列總和

    protected final AtomicInteger putSum = new AtomicInteger(0) ;

    //出列總和

    protected final AtomicInteger takeSum = new AtomicInteger(0) ;

    public static void main(String[] args) throws Exception {

        new PutTakeTest(10 , 10, 100000).test() ; // 10個承載因子,10個執行緒,執行100000

        pool.shutdown() ;

    }

    public PutTakeTest(int capacity, int npairs, int ntrials) {

        this.bb = new ArrayBlockingQueue<Integer>(capacity);

        this.nTrials = ntrials;

        this.nPairs = npairs;

        this.barrier = new CyclicBarrier(npairs * 2 + 1) ;

    }

    void test() {

        try {

            for (int i = 0; i < nPairs ; i++) {

                pool.execute( new Producer());

                pool.execute(new Consumer()) ;

            }

            barrier.await() ; // 等待所有的執行緒就緒

            barrier.await() ; // 等待所有的執行緒執行完成

            System. out.println("result,put==take :"+(putSum.get()== takeSum.get()));

        } catch (Exception e) {

            throw new RuntimeException(e) ;

        }

    }

    static int xorShift( int y) {

        y ^= (y << 6);

        y ^= (y >>> 21) ;

        y ^= (y << 7) ;

        return y;

    }

    //生產者

    class Producer implements Runnable {

        public void run() {

            try {

                int seed = (this.hashCode() ^ (int) System.nanoTime()) ;

                int sum = 0;

                barrier.await();

                for (int i = nTrials; i > 0; --i) {

                    bb.put(seed) ;

                    sum += seed;

                    seed = xorShift(seed);

                }

                putSum.getAndAdd(sum) ;

                barrier.await();

            } catch (Exception e) {

                throw new RuntimeException(e);

            }

        }

    }

    //消費者

    class Consumer implements Runnable {

        public void run() {

            try {

                barrier.await() ;

                int sum = 0;

                for (int i = nTrials; i > 0; --i) {

                    sum += bb.take() ;

                }

                takeSum.getAndAdd(sum) ;

                barrier.await();

            } catch (Exception e) {

                throw new RuntimeException(e);

            }

        }

    }

}

以上程式中,我們增加putSum和takeSum變數,用來統計put和take資料的校檢和。同時採用 CyclicBarrier(迴環柵欄)令所有的執行緒同一時間從相同的位置開始執行。每個執行緒的入列資料,為了保證其唯一性,都生成一個唯一的seed,在下列程式碼執行出,必然是多執行緒競爭的地方。

    for (int i = nTrials; i > 0; --i) {

         bb.put(seed);

                    sum += seed;

                    seed = xorShift(seed);

       }

如果此處出現執行緒安全問題,那麼最終take出來的資料和put的資料必然是不相同的,最終putSum和takeSum的值必然不同,相反則相同。

由於併發程式碼中大多數錯誤都是一些低概率的事件,因此在測試的時候,還是需要反覆測試多次,以提高發現錯誤的概率。

3效能測試

效能測試通常是功能測試的延伸。雖然效能測試與功能測試之間會有重疊之處,但它們的目標是不同的。

首先效能測試需要反映出被測試物件在應用程式中的實際用法以及它的吞吐量。另外需要根據經驗值來調整各種不同的限值,比如執行緒數,併發數等,從而令程式更好的在系統上執行。

我們對上述的PutTakeTest進行擴充套件,增加以下功能:

1、為了保證時間精確性,增加一個執行一組操作的執行時間。

採用BarrierTimer來維護單組執行時間,它implements Runnable,在計數達到柵欄(CyclicBarrier)一組的數量之後,會呼叫一次該回調,設定結束時間。

我們用它來記錄,一組的總時間。有總時間了,單次操作的時間就可以計算出來了。如此我們就可以計算出單個測試的吞吐量。

吞吐量=1ms/單次操作的時間=每秒可以執行的次數。

以下是基於柵欄的計時器。

public class BarrierTimer implements Runnable{

    private boolean started ;

    private long startTime , endTime;

    public synchronized void run() {

        long t = System.nanoTime() ;

        if (!started) {

            started = true;

            startTime = t;

        } else

            endTime = t;

    }

    public synchronized void clear() {

        started = false;

    }

    public synchronized long getTime() {

        return endTime - startTime;

    }

}

2、效能測試需要針對不同引數組合進行測試。

通過不同引數來進行組合測試,以此來獲得在不同引數下的吞吐率,以及不同執行緒數量下的可伸縮性。在putTakeTest裡面,我們只只針對安全性測試。

我們看加強版本的TimedPutTakeTest,這裡我們把ArrayBlockingQueue的容量分別設定為1、10、100、1000,令其線上程數量分別為1、2、4、8、16、32、64、128的情況下,看其連結串列的吞吐率。

    public static void main(String[] args) throws Exception {

        int tpt = 100000 ; // trials per thread

        for (int cap = 1; cap <= 1000; cap *= 10) {

            System.out.println( "Capacity: " + cap);

            for (int pairs = 1; pairs <= 128 ; pairs *= 2) {

                TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt);

                System.out.print("Pairs: " + pairs + "\t") ;

                t.test();

                System.out.print("\t ");

                Thread.sleep(1000);

                t.test();

                System.out.println();

                Thread.sleep(1000);

            }

        }

        PutTakeTest.pool.shutdown() ;

    }

以下是我們針對ArrayBlockingQueue的效能測試結果,我的電腦硬體環境是:

CPU: i7 4核8執行緒

memory:  16G

硬碟: SSD110G

jdk 環境

java version “1.8.0_45"

Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

從上面可以看到以下幾點情況

1、在ArrayBlockingQueue的快取容量在1的情況下,無論線性併發數為多少,都不能顯著的提升其吞吐率。這是因為每個執行緒在阻塞等待另外執行緒執行任務。

2、當嘗試把快取容量提升至10、100、1000的時候,吞吐率都得到了極大的提高,特別是在1000的時候,最高可達到900w次/s。

3、當執行緒增加到16個的時候,吞吐率會達到頂峰,然後再增加執行緒吞吐率不生反而下降,當然沒有很大的下降,這是因為,當執行緒增多的時候,大部分時間耗費在阻塞和解除阻塞上面了。

其他阻塞佇列的比較

以下是針對ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeQue、PriorityBlockingQueue幾種阻塞佇列進行的橫向測評。硬體環境還是和上述相同。jdk還是採用1.8的API。

每個佇列的快取容量是1000。然後分別在1、2、4、8、16、32、64、128的執行緒併發下,檢視其吞吐率。

從上述資料中,我們可以看到:

1、ArrayBlockingQueue在jdk1.8的優化下效能高於LinkedBlockingQueue,雖然兩者差別不是太大,這個是1.6之前,LinkedBlockingQueue是要優於ArrayBlockingQueue的。

2、PriorityBlockingQueue在達到290w的吞吐高峰之後,效能開始持續的下降,這是因為優先佇列需要不斷的優化優先列表,而需要一定的排序時間。

以上測試的主要目的是,測試生產者和消費者在通過有界put和take傳送資料時,那些約束條件將對整個吞吐量產生影響。所以會忽略了許多實際的因素。另外由於jit的動態編譯,會直接將編譯後的程式碼直接編譯為機器程式碼。

所以以上測試需要經過預熱處理,執行更多的次數,以保證所有的程式碼都是編譯完成之後,才統計測試的執行時間。

4最後

測試併發程式的正確性可能會特別困難的,因為併發程式的許多故障都是一些低概率的事情,並且它們對執行時序、負載情況以及其他難以重現的條件比較敏感。

要想盡可能的發現這些錯誤,就需要我們做更多的工作來進行分析測試,期待今天的介紹能夠幫助大家開闊一些思路。