1. 程式人生 > >並發工具類(五) Phaser類

並發工具類(五) Phaser類

dex 如果 例子 ride 模擬 問題 個數 在線 運行時

前言

??JDK中為了處理線程之間的同步問題,除了提供鎖機制之外,還提供了幾個非常有用的並發工具類:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
??CountDownLatch、CyclicBarrier、Semphore、Phaser 這四個工具類提供一種並發流程的控制手段;而Exchanger工具類則提供了在線程之間交換數據的一種手段。

簡介

??Phaser 是JDK1.7版本中新增的,是一個可重用的同步barrier,它的功能與 CountDownLatch、CyclicBarrier 相似,但是使用起來更加靈活。可以用來解決控制多個線程分階段共同完成任務的情景問題。


??Phaser中有兩個重要的計數:

  • phase:當前的周期索引(或者 階段索引),初始值為0,當所有線程執行完本階段的任務後,phase就會加一,進入下一階段;可以結合onAdvance()方法,在不同的階段,執行不同的屏障方法。
  • parties:註冊的線程數,即Phaser要監控的線程數量,或者說是 建立的屏障的數量。屏障的數量不是固定的,每個階段的屏障的數量都可以是不一樣。

下面詳細介紹Phaser一些機制

1、Registration(註冊機制):與其他barrier不同的是,Phaser中的“註冊的同步者(parties)”會隨時間而變化,Phaser可以通過構造器初始化parties個數,也可以在Phaser運行期間隨時加入(方法 register( ), bulkRegister(int)

)新的parties,以及在運行期間註銷(方法 arriveAndDeregister( ) )parties。運行時可以隨時加入、註銷parties,只會影響Phaser內部的計數器,它建立任何內部的bookkeeping(賬本),因此task不能查詢自己是否已經註冊了,當然你可以通過實現子類來達成這一設計要求。

2、Synchronization(同步機制):類似於CyclicBarrier,Phaser也可以awaited多次,它的arrivedAndAwaitAdvance()方法的效果類似於CyclicBarrier的await()。Phaser的每個周期(generation)都有一個phase數字,phase 從0開始,當所有的已註冊的parties都到達後(arrive)將會導致此phase數字自增(advance),當達到Integer.MAX_VALUE後繼續從0開始。這個phase數字用於表示當前parties所處於的“階段周期”,它既可以標記和控制parties的wait行為、喚醒等待的時機。

  • Arrival:Phaser中的arrive()、arriveAndDeregister()方法,這兩個方法不會阻塞(block),但是會返回相應的phase數字,當此phase中最後一個party也arrive以後,phase數字將會增加,即phase進入下一個周期,同時觸發(onAdvance)那些阻塞在上一phase的線程。這一點類似於CyclicBarrier的barrier到達機制;更靈活的是,我們可以通過重寫onAdvance方法來實現更多的觸發行為。
  • Waiting:Phaser中的awaitAdvance()方法,需要指定一個phase數字,表示此Thread阻塞直到phase推進到此周期,arriveAndAwaitAdvance()方法阻塞到下一周期開始(或者當前phase結束)。不像CyclicBarrier,即使等待Thread已經interrupted,awaitAdvance方法會繼續等待。Phaser提供了Interruptible和Timout的阻塞機制,不過當線程Interrupted或者timout之後將會拋出異常,而不會修改Phaser的內部狀態。如果必要的話,你可以在遇到此類異常時,進行相應的恢復操作,通常是在調用forceTermination()方法之後。
    Phaser通常在ForJoinPool中執行tasks,它可以在有task阻塞等待advance時,確保其他tasks的充分並行能力。

3、Termination(終止):Phaser可以進入Termination狀態,可以通過isTermination()方法判斷;當Phaser被終止後,所有的同步方法將會立即返回(解除阻塞),不需要等到advance(即advance也會解除阻塞),且這些阻塞方法將會返回一個負值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。當然,向一個termination狀態的Phaser註冊party將不會有效;此時onAdvance()方法也將會返回true(默認實現),即所有的parties都會被deregister,即register個數為0。

4、Tiering(分層):Phaser可以“分層”,以tree的方式構建Phaser來降低“競爭”。如果一個Phaser中有大量parties,這會導致嚴重的同步競爭,所以我們可以將它們分組並共享一個parent Phaser,這樣可以提高吞吐能力;Phaser中註冊和註銷parties都會有Child 和parent Phaser自動管理。當Child Phaser中中註冊的parties變為非0時(在構造函數Phaser(Phaser parent,int parties),或者register()方法),Child Phaser將會註冊到其Parent上;當Child Phaser中的parties變為0時(比如由arrivedAndDegister()方法),那麽此時Child Phaser也將從其parent中註銷出去。

5、Monitoring.(監控):同步的方法只會被register操作調用,對於當前state的監控方法可以在任何時候調用,比如getRegisteredParties()獲取已經註冊的parties個數,getPhase()獲取當前phase周期數等;因為這些方法並非同步,所以只能反映當時的瞬間狀態。

Phaser的API介紹

構造方法

方法名 描述
Phaser() 構建一個Phaser
Phaser(int parties) 創建一個指定屏障數量的Phaser
Phaser(Phaser parent) 相當於 Phaser(parent, 0)
Phaser(Phaser parent, int parties) 創建一個指定屏障數量的Phaser,此phaser是註冊在另一個Phaser parent下

方法摘要

方法名 描述
public int arrive() 到達此phaser的屏障點,使phaser的到達的線程數加一,但不會阻塞等待其他線程。
返回:phase值,即當前階段(周期)的索引,或者是負值(當Phaser 停止時)
public int arriveAndDeregister() 到達此phaser的屏障點,使phaser的到達的線程數加一,並且會取消一個屏障點的註冊。也不會阻塞等待其他線程。
返回:phase值,即當前階段(周期)的索引,或者是負值(當Phaser 停止時)
public int arriveAndAwaitAdvance() 到達此phaser的屏障點,並且阻塞等待其他線程到達此屏障點。註意:這是非中斷的阻塞,此方法與awaitAdvance(arrive())等同。如果你希望阻塞機制支持timeout、interrupted響應,可以使用類似的其他方法(參見下文)。如果你希望到達後且註銷,而且阻塞等到當前phase下其他的parties到達,可以使用awaitAdvance(arriveAndDeregister())方法組合。
返回:phase值,即當前階段(周期)的索引;如果Phaser 停止,則返回負值
public int awaitAdvance(int phase) 在指定的階段(周期)phase下等待其他線程到達屏障點,註意:這是非中斷的阻塞。如果指定的phase與Phaser當前的phase不一致,或者Phaser 停止了,則立即返回。
參數 phase:通常就是arrive()、arriveAndDeregister()的返回值;
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException
此方法是可中斷的,其他與awaitAdvance()一致
public int awaitAdvanceInterruptibly(
int phase, long timeout,TimeUnit unit)
throws InterruptedException, TimeoutException
超時等待方法,其他與awaitAdvance()一致
public int register() 新註冊一個party,導致Phaser內部registerPaties數量加1;如果此時onAdvance方法正在執行,此方法將會等待它執行完畢後才會返回。此方法返回當前的phase周期數,如果Phaser已經中斷,將會返回負數。
public int bulkRegister(int parties) 批量註冊多個party,與register()相似
protected boolean onAdvance(int phase, int registeredParties) barrier action(屏障方法)。如果需要,則必須繼承Phaser類,重寫此方法。如果返回true表示此Phaser應該終止(此後將會把Phaser的狀態為termination,即isTermination()將返回true。),否則可以繼續進行。phase參數表示當前周期數,registeredParties表示當前已經註冊的parties個數。
默認實現為:return registeredParties == 0;在很多情況下,開發者可以通過重寫此方法,來實現自定義的
public void forceTermination() 強制終止,此後Phaser對象將不可用,即register等將不再有效。此方法將會導致Queue中所有的waiter線程被喚醒。這個方法對於在一個或多個任務遇到意外異常之後協調恢復是很有用的。
public int getArrivedParties() 獲取已經到達的parties個數。
public int getUnarrivedParties() 獲取沒有到達的parties個數。
public Phaser getParent() 獲取其父親類Phaser,沒有則返回null
public Phaser getRoot() 返回該phaser的根祖先,如果沒有父類,返回此phaser。
public boolean isTerminated() 如果該phaser被終止,則返回true。

@ Example1 多階段(周期)、帶屏障事件示例

??例子很簡單,模擬跑步比賽的過程,分為三個階段:1、參賽者到達起跑點,並在起跑點等待其他參賽者;2、參賽者齊人後,開始準備,並等待槍聲。3、參賽這到達終點,並結束比賽,不再等待任何情況。

public class PhaserTest{

public static MyPhaser myPhaser = new MyPhaser();

    public static void main(String[] args) {
        MyPhaser myPhaser = new MyPhaser();
        // 一次性註冊5個party,即建立5個屏障點
        myPhaser.bulkRegister(5);
        for (int i = 0; i < 5; i++) {
            Thread runner = new Thread(new Runnable() {

                @Override
                public void run() {
                    // 第一階段(周期),phaser的周期數初始值為0
                    System.out.println(Thread.currentThread().getName() + "到達了起跑點!");
                    // 到達了屏障點(起跑點),阻塞等待其他線程
                    myPhaser.arriveAndAwaitAdvance();

                    // 繼續運行,將進入第二階段,phaser的周期數加一
                    System.out.println(Thread.currentThread().getName() + "準備起跑!");
                    // 到達了屏障點(準備起跑),阻塞等待其他線程
                    myPhaser.arriveAndAwaitAdvance();

                    // 進入第三階段
                    System.out.println(Thread.currentThread().getName() + "到達了終點!");
                    // 參數者到達了終點,結束比賽,不再等待其他參賽者
                    myPhaser.arriveAndDeregister();// 取消註冊一個party
                }
            }, "參賽者" + i + "號");
            runner.start();
        }
    }
    }

MyPhaser類,定制 barrier action(屏障事件)

public class MyPhaser extends Phaser {

    @Override
    //改寫onAdvance方法
    public boolean onAdvance(int phase, int registeredParties) {
        //判斷當前的Phaser是否終止
        if (!isTerminated()) {
            // 分成三個階段,在不同的階段(周期),執行不同的屏障事件
            if (phase == 0) {
                // ....
                System.out.println("第一階段:所有參賽者都到達了起跑點!");
            } else if (phase == 1) {
                // ....
                System.out.println("第二階段:所有參賽者都已經就位,並準備好!比賽正式開始");
            } else if (phase == 2) {
                // ....
                System.out.println("第三階段:所有參賽者都到達終點,比賽結束!!");
            }
        }
        return super.onAdvance(phase, registeredParties);
    }
    }

運行結果:

參賽者0號到達了起跑點!
參賽者3號到達了起跑點!
參賽者4號到達了起跑點!
參賽者2號到達了起跑點!
參賽者1號到達了起跑點!
第一階段:所有參賽者都到達了起跑點!
參賽者0號準備起跑!
參賽者1號準備起跑!
參賽者2號準備起跑!
參賽者3號準備起跑!
參賽者4號準備起跑!
第二階段:所有參賽者都已經就位,並準備好!比賽正式開始
參賽者4號到達了終點!
參賽者1號到達了終點!
參賽者0號到達了終點!
參賽者2號到達了終點!
參賽者3號到達了終點!
第三階段:所有參賽者都到達終點,比賽結束!


@ Example2?分層示例

下面的例子:每一個Phaser周期類註冊的線程數目不能超過TASKS_PER_PHASER(例子中是4個),否則就要增加一層子phaser層。

public class PhaserTest6 {  
    //  
    private static final int  = 4;  
  
    public static void main(String args[]) throws Exception {  
        //  
        final int phaseToTerminate = 3;  
        //創建一個Phaser父類對象
        final Phaser phaser = new Phaser() {  
            @Override  
            protected boolean onAdvance(int phase, int registeredParties) { //屏障方法 
                System.out.println("====== " + phase + " ======");  
                return phase == phaseToTerminate || registeredParties == 0;  
            }  
        };  
          
        //創建10個任務  
        final Task tasks[] = new Task[10];  
        build(tasks, 0, tasks.length, phaser);  
        for (int i = 0; i < tasks.length; i++) {  
            System.out.println("starting thread, id: " + i);  
            final Thread thread = new Thread(tasks[i]);  
            thread.start();  
        }  
    }  
  
  //遞歸分層,
    public static void build(Task[] tasks, int lo, int hi, Phaser ph) {  
        
        //如果任務的數量超過每一層的phaser的閾值TASKS_PER_PHASER,則要繼續分層
        if (hi - lo > TASKS_PER_PHASER) {  
            for (int i = lo; i < hi; i += TASKS_PER_PHASER) {  
                int j = Math.min(i + TASKS_PER_PHASER, hi);  
                //當前的phaser(ph)作為父周期,來創建一個子phaser
                build(tasks, i, j, new Phaser(ph));  
            }  
        } else { 
            //線程的數量在閾值內,無需分成,可以直接註冊線程到當前的Phaser
            for (int i = lo; i < hi; ++i)  
                tasks[i] = new Task(i, ph);  
        }  
    }  
  
    public static class Task implements Runnable {  
        //  
        private final int id;  
        private final Phaser phaser;  
  
        public Task(int id, Phaser phaser) {  
            this.id = id;  
            this.phaser = phaser;  
            this.phaser.register();  
        }  
  
        @Override  
        public void run() {  
            while (!phaser.isTerminated()) {  
                try {  
                    Thread.sleep(200);  
                } catch (InterruptedException e) {  
                    // NOP  
                }  
                System.out.println("in Task.run(), phase: " + phaser.getPhase()    + ", id: " + this.id);  
                phaser.arriveAndAwaitAdvance();  
            }  
        }  
    }  
}  

需要註意的是,TASKS_PER_PHASER的值取決於具體的Task實現。對於Task執行時間很短的場景(也就是競爭相對激烈),可以考慮使用較小的TASKS_PER_PHASER值,例如4。反之可以適當增大

運行結果:

in Task.run(), phase: 0, id: 2
in Task.run(), phase: 0, id: 1
in Task.run(), phase: 0, id: 3
in Task.run(), phase: 0, id: 0
in Task.run(), phase: 0, id: 8
in Task.run(), phase: 0, id: 5
in Task.run(), phase: 0, id: 9
in Task.run(), phase: 0, id: 7
in Task.run(), phase: 0, id: 4
in Task.run(), phase: 0, id: 6
====== 0 ======
in Task.run(), phase: 1, id: 9
in Task.run(), phase: 1, id: 6
in Task.run(), phase: 1, id: 1
in Task.run(), phase: 1, id: 7
in Task.run(), phase: 1, id: 8
in Task.run(), phase: 1, id: 5
in Task.run(), phase: 1, id: 0
in Task.run(), phase: 1, id: 4
in Task.run(), phase: 1, id: 3
in Task.run(), phase: 1, id: 2
====== 1 ======
in Task.run(), phase: 2, id: 6
in Task.run(), phase: 2, id: 0
in Task.run(), phase: 2, id: 2
in Task.run(), phase: 2, id: 3
in Task.run(), phase: 2, id: 7
in Task.run(), phase: 2, id: 5
in Task.run(), phase: 2, id: 8
in Task.run(), phase: 2, id: 9
in Task.run(), phase: 2, id: 1
in Task.run(), phase: 2, id: 4
====== 2 ======
in Task.run(), phase: 3, id: 3
in Task.run(), phase: 3, id: 4
in Task.run(), phase: 3, id: 9
in Task.run(), phase: 3, id: 5
in Task.run(), phase: 3, id: 8
in Task.run(), phase: 3, id: 1
in Task.run(), phase: 3, id: 7
in Task.run(), phase: 3, id: 0
in Task.run(), phase: 3, id: 2
in Task.run(), phase: 3, id: 6
====== 3 ======




參考文獻:

  • http://shift-alt-ctrl.iteye.com/blog/2302923
  • http://whitesock.iteye.com/blog/1135457

並發工具類(五) Phaser類