1. 程式人生 > >多線程設計模式:第五篇 - Future模式和兩階段終止模式

多線程設計模式:第五篇 - Future模式和兩階段終止模式

捕獲 數量 如果 data counter 分享 就是 main 多次

一,Future模式

????????Future 的意思是未來,假設有一個方法需要花費很長的時間才能獲取運行結果,那麽與其一直等待,不如先拿到一份最終數據的模板,即 Future 角色,等過一陣子再通過 Future 角色去獲取數據,如果數據已經好了則直接返回,否則就一直等待到有數據返回。

????????這種模式可以用在不是馬上需要一個操作的返回值時,這樣可以提高程序的響應性,使得一個方法的多個步驟可以並行執行。

????????下面的代碼示例用來演示實現 Future 模式:

/**
 * @author koma <[email protected]>
 * @date 2018-11-04
 */
public class Main {
    public static void main(String[] args) {
        Host host = new Host();
        Data data1 = host.request(10, ‘A‘);
        Data data2 = host.request(20, ‘B‘);
        Data data3 = host.request(30, ‘C‘);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("data1 = "+data1.getContent());
        System.out.println("data2 = "+data2.getContent());
        System.out.println("data3 = "+data3.getContent());
    }
}

public interface Data {
    public abstract String getContent();
}

public class Host {
    //這裏使用 ThreadFactory 工廠類來創建線程
    private static final ThreadFactory threadFactory = Executors.defaultThreadFactory();

    public Data request(final int count, final char c) {
        System.out.println("Request BEGIN");

        FutureData futureData = new FutureData();
        threadFactory.newThread(new MakeRealDataTask(count, c, futureData)).start();

        System.out.println("Request END");
        return futureData;
    }
}

public class MakeRealDataTask implements Runnable {
    private final int count;
    private final char c;
    private final FutureData futureData;

    public MakeRealDataTask(int count, char c, FutureData futureData) {
        this.count = count;
        this.c = c;
        this.futureData = futureData;
    }

    @Override
    public void run() {
        RealData realData = new RealData(count, c);
        futureData.setRealData(realData);
    }
}

public class FutureData implements Data {
    private RealData realData = null;
    private boolean ready = false;

    /**
     * 這裏的 synchronized 主要是為了實現 wait/notify 語義
     * 本身該類並不需要同步
     *
     */
    public synchronized void setRealData(RealData realData) { //產生實際數據
        if (ready) {
            return;
        }
        this.realData = realData;
        this.ready = true;
        notifyAll();
    }

    @Override
    public synchronized String getContent() { //獲取實際數據,當數據還未產生時,則需要持續等待
        while (!ready) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realData.getContent();
    }
}

public class RealData implements Data {
    private final String content;

    public RealData(int count, char c) {
        System.out.println("RealData BEGIN");
        char[] buffer = new char[count];
        for (int i = 0; i < count; i++) {
            buffer[i] = c;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("RealData END");
        this.content = new String(buffer);
    }

    @Override
    public String getContent() {
        return content;
    }
}

????????Future 模式和 Thread-Per-Message 模式的區別在於 Future 模式可以從異步執行的線程中獲取返回值。

????????Future 模式也可以有變種使用方式,如:當前示例中我們對 Future 返回值的賦值只有一次,而實際上可以賦值多次,這樣通過一個返回對象在不同階段可以獲取到不同的返回值。

????????Java JUC 包中提供了用於支持 Future 模式的接口和類。Callable 接口聲明了 call() 方法,該方法和 Runnable 的 run() 方法類似,不同之處在於,call() 方法有返回值。Future 接口充當了 Future 角色,聲明了 get() 方法用於獲取值,設置值的方法則需要對應的實現類去實現,同時還聲明了 cancel() 方法,用於中斷程序。FutureTask 類實現了 Future 和 Runnable 接口。

????????下面的代碼示例,我們利用 juc 包中已有的類來改造我們的示例程序:

public class FutureData extends FutureTask<RealData> implements Data {
    public FutureData(Callable<RealData> callable) {
        super(callable);
    }

    @Override
    public String getContent() {
        String content = null;
        try {
            content = get().getContent();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return content;
    }
}

public class Host {
    private static final ThreadFactory threadFactory = Executors.defaultThreadFactory();

    public Data request(final int count, final char c) {
        System.out.println("Request BEGIN");

        FutureData futureData = new FutureData(new Callable<RealData>() {
            @Override
            public RealData call() throws Exception {
                return new RealData(count, c);
            }
        });
        threadFactory.newThread(futureData).start();

        System.out.println("Request END");
        return futureData;
    }
}

????????在創建 FutureTask 類實例時,Callable 對象作為構造函數參數傳遞進去,之後當調用 FutureTask 的 run() 方法時,那麽構造參數中接收的 Callable 對象的 call() 方法會被執行,call() 方法會同步的獲取 call() 方法的返回值,然後通過 FutureTask 的 set() 方法來設置該返回值,如果 call() 方法發生了異常,則調用 FutureTask 的 setException() 方法設置的異常處理函數。之後當我們需要時調用 FutureTask 的 get() 方法就能夠獲取到 call() 方法的返回值。

????????Executors 框架中通過 submit() 方法實現了 Future 模式,通過框架我們可以更加簡單方便的使用 Future 模式,改造後的代碼如下:

public class Main {
    public static void main(String[] args) {
        Host host = new Host();
        Future<RealData> data1 = host.request(10, ‘A‘);
        Future<RealData> data2 = host.request(20, ‘B‘);
        Future<RealData> data3 = host.request(30, ‘C‘);
        try {
            Thread.sleep(2000);
            System.out.println("data1 = "+data1.get().getContent());
            System.out.println("data2 = "+data2.get().getContent());
            System.out.println("data3 = "+data3.get().getContent());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace(); //當執行 call() 方法報錯時
        }
    }
}

public class Host {
    public Future<RealData> request(final int count, final char c) {
        System.out.println("Request BEGIN");

        //使用 Executors 框架的 Future 模式
        Future<RealData> future = Executors.newFixedThreadPool(1).submit(new Callable<RealData>() {
            @Override
            public RealData call() throws Exception {
                return new RealData(count, c);
            }
        });

        System.out.println("Request END");
        return future;
    }
}

//這裏只需要 RealData 類就夠了,其它類不再需要

二,兩階段終止模式

????????該模式通常用於優雅的終止線程,它的意思是先執行完終止處理程序再終止線程。

技術分享圖片
????????我們稱線程在進行正常處理時的狀態為操作中。在要求停止該線程時,我們發出終止請求,這樣線程不會突然終止,而是轉為終止處理中狀態,然後執行清理工作,完成之後就會真正的終止線程。

????????從操作中變為終止處理中這是終止的第一階段,在該階段下,線程不會再進行正常操作,而是只執行清理程序,清理完成之後,就會真正的終止線程,終止處理中狀態結束是線程終止的第二階段。

????????該模式的要點如下:

  • 安全的終止線程
  • 必定會終止線程
  • 發起終止請求後會盡快進行終止處理

????????下面是一個兩階段終止的演示程序:

/**
 * @author koma <[email protected]>
 * @date 2018-11-04
 */
public class Main {
    public static void main(String[] args) {
        try {
            CounterThread counterThread = new CounterThread();
            counterThread.start();

            Thread.sleep(10000);

            counterThread.shutdownRequest();

            counterThread.join(); //主線程等待counterThread線程終止
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class CounterThread extends Thread {
    private long counter = 0;
    private volatile boolean shutdownRequested = false;

    /**
     * 該方法是線程安全的
     *
     * 因為 shutdownRequested 只會被設置成 true 沒有別的方法會再把它設置成 false
     * 不存在數據競爭,因此也就允許多線程調用
     *
     */
    public void shutdownRequest() {
        shutdownRequested = true;
        //給線程自己發出中斷信號,確保線程在 sleep 或 wait 中也能正常響應終止請求
        interrupt();
    }

    public boolean isShutdownRequested() {
        return shutdownRequested;
    }

    @Override
    public void run() {
        try {
            while (!isShutdownRequested()) {//判斷終止請求
                doWork();
            }
        } finally {
            doShutdown();
        }
    }

    private void doWork() {
        counter++;
        System.out.println("doWorker: counter = "+counter);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
        }
    }

    private void doShutdown() {
        System.out.println("doShutdown: counter = "+counter);
    }
}

????????Executor 框架中的 ExecutorService 類提供了 shutdown() 方法來優雅的終止線程。該類還提供了 isShutdown() 和 isTerminated() 來判斷線程是否終止和是否實際停止。其區別在於當線程處於終止處理中狀態時,isShutdown() 返回 true,而 isTerminated() 返回 false。

1,中斷狀態和 InterruptedException 異常

????????當 interrupt() 方法被調用之後,線程就可以被中斷了。中斷線程這個行為會帶來以下結果之一:

????????(1) 線程變為中斷狀態,反映為"狀態"
????????(2) 拋出InterruptedException異常,反映為"控制"

????????通常情況下會是結果(1),但是當線程正則 sleep,wait,join 時則會是結果(2)(這時線程不變為中斷狀態)。

2,中斷狀態 轉換為 InterruptedException 異常

if (Thread.interrupted()) {
    throw new InterruptedException();
}

????????這段代碼可以把中斷狀態轉換為異常,其中 if 條件檢查的是當前線程的中斷狀態,同時會清除當前線程的中斷狀態,若不想清除線程的中斷狀態,則可以調用 Thread.currentThread().isInterrupted() 方法。

3,InterruptedException異常 轉換為 中斷狀態

try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); //將捕獲到的中斷異常轉換為中斷狀態
}

????????這段代碼可以把中斷異常轉換為中斷狀態,如果不想當前線程的已經被中斷這個狀態信息丟失的話可以使用這種方式,即線程再次中斷一次自己。

4,juc 包與線程同步

????????當我們想讓某個線程等待指定的線程終止時,可以調用欲等待線程的 join() 方法,但是由於 join() 方法可以等待的只是線程終止這個一次性操作,當我們想要實現"等待指定次數的某種操作發生"這類需求時,則需要借助 juc 包中的 CountDownLatch 類。

????????CountDownLatch 類只能進行倒數,當想多次重復進行線程同步時,則使用 CyclicBarrier 會更加的方便。CyclicBarrier 可以周期性的創建屏障,在屏障解除之前,碰到屏障的線程無法繼續前進,屏障解除的條件是到達屏障處的線程數,達到了構造函數指定的個數。也就是說,當指定個數的線程達到屏障處後,屏障就會被解除。

????????下面的代碼實現了"讓三個線程處理一項分為0~4共5個階段的工作"的功能,我們要求,除非三個線程都完成了第N個階段,否則哪個線程都不允許進入到第N+1個階段:

/**
 * @author koma <[email protected]>
 * @date 2018-11-04 
 */
public class Main {
    private static final int THREADS = 3; //工作線程數

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(THREADS);

        //屏障解除時的操作
        Runnable barrierAction = new Runnable() {
            @Override
            public void run() {
                System.out.println("barrier action");
            }
        };

        //用來設定在屏障處等待的線程數量
        CyclicBarrier cyclicBarrier = new CyclicBarrier(THREADS, barrierAction);

        //用來設定等待結束線程的數量
        CountDownLatch countDownLatch = new CountDownLatch(THREADS);

        try {
            for (int i = 0; i < THREADS; i++) {
                executorService.execute(new MyTask(cyclicBarrier, countDownLatch, i));
            }
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

public class MyTask implements Runnable {
    private static final int PAHSE = 5;
    private final CyclicBarrier pahseBarrier;
    private final CountDownLatch downLatch;
    private final int context;
    private static final Random random = new Random(315246);

    public MyTask(CyclicBarrier pahseBarrier, CountDownLatch downLatch, int context) {
        this.pahseBarrier = pahseBarrier;
        this.downLatch = downLatch;
        this.context = context;
    }

    @Override
    public void run() {
        try {
            //任務分 PAHSE 個階段執行,每個線程在進入到某一階段之後就等待其它進入該階段的線程
            //當每個階段的線程數達到設定的數量時,該階段屏障解除,所有線程進入到下一階段
            //pahseBarrier.await() 可以用來循環的創建屏障
            for (int phase = 0; phase < PAHSE; phase++) {
                doPhase(phase);
                pahseBarrier.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } finally {
            downLatch.countDown();
        }
    }

    protected void doPhase(int phase) {
        String name = Thread.currentThread().getName();
        System.out.println(name+"-MyTask BEGIN, context = "+context+", phase = "+phase);
        try {
            Thread.sleep(random.nextInt(3000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println(name+"-MyTask END, context = "+context+", phase = "+phase);
        }
    }
}

多線程設計模式:第五篇 - Future模式和兩階段終止模式