1. 程式人生 > >【並發編程】Future模式及JDK中的實現

【並發編程】Future模式及JDK中的實現

[] 代碼 inter jpg 分離 src span 最終 提高

1.1、Future模式是什麽

先簡單舉個例子介紹,當我們平時寫一個函數,函數裏的語句一行行同步執行,如果某一行執行很慢,程序就必須等待,直到執行結束才返回結果;但有時我們可能並不急著需要其中某行的執行結果,想讓被調用者立即返回。比如小明在某網站上成功創建了一個賬號,創建完賬號後會有郵件通知,如果在郵件通知時因某種原因耗時很久(此時賬號已成功創建),使用傳統同步執行的方式那就要等完這個時間才會有創建成功的結果返回到前端,但此時賬號創建成功後我們並不需要立即關心郵件發送成功了沒,此時就可以使用Future模式,讓安在後臺慢慢處理這個請求,對於調用者來說,則可以先處理一些其他任務,在真正需要數據的場合(比如某時想要知道郵件發送是否成功)再去嘗試獲取需要的數據。

使用Future模式,獲取數據的時候可能無法立即得到需要的數據。而是先拿到一個包裝,可以在需要的時候再去get獲取需要的數據。

1.2、Future模式與傳統模式的區別

先看看請求返回的時序圖,明顯傳統的模式是串行同步執行的,在遇到耗時操作的時候只能等待。反觀Future模式,發起一個耗時操作後,函數會立刻返回,並不會阻塞客戶端線程。所以在執行實際耗時操作時候客戶端無需等待,可以做其他事情,直到需要的時候再向工作線程獲取結果。

技術分享圖片

2.1、動手實現簡易Future模式

下面的DataFuture類只是一個包裝類,創建它時無需阻塞等待。在工作線程準備好數據後使用setRealData方法將數據傳入。客戶端只要在真正需要數據時調用getRealData方法即可,如果此時數據已準備好則立即返回,否則getRealData方法就會等待,直到獲取數據完成。

public class DataFuture<T> {
    private T realData;
    private boolean isOK = false;

    public synchronized T getRealData() {
        while (!isOK) {
            try {
                // 數據未準備好則等待
                wait();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
return realData; } public synchronized void setRealData(T data) { isOK = true; realData = data; notifyAll(); } }

下面實現一服務端,客戶端向服務端請求數據時,服務端並不會立刻去加載真正數據,只是創建一個DataFuture,創建子線程去加載真正數據,服務端直接返回DataFuture即可。

public class Server {
    
    public DataFuture<String> getData() {
        final DataFuture<String> data = new DataFuture<>();
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                data.setRealData("最終數據");
            }
        });
        return data;
    }
}

最終客戶端調用 代碼如下:

long start = System.currentTimeMillis();
Server server = new Server();
DataFuture<String> dataFuture = server.getData();

try {
    // 先執行其他操作
    Thread.sleep(5000);
    // 模擬耗時...
} catch (InterruptedException e) {
    e.printStackTrace();
}

System.out.print("結果數據:" + dataFuture.getRealData());
System.out.println("耗時: " + (System.currentTimeMillis() - start));

結果:

結果數據:最終數據
耗時: 5021

執行最終數據耗時都在5秒左右,如果串行執行的話就是10秒左右。

2.2、JDK中的Future與FutureTask

先來看看Future接口源碼:

public interface Future<V> {
/** * 用來取消任務,取消成功則返回true,取消失敗則返回false。 * mayInterruptIfRunning參數表示是否允許取消正在執行卻沒有執行完畢的任務,設為true,則表示可以取消正在執行過程中的任務。 * 如果任務已完成,則無論mayInterruptIfRunning為true還是false,此方法都返回false,即如果取消已經完成的任務會返回false; * 如果任務正在執行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false; * 如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回true。 */ boolean cancel(boolean mayInterruptIfRunning); /** * 表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回true */ boolean isCancelled(); /** * 表示任務是否已經完成,若任務完成,則返回true */ boolean isDone(); /** * 獲取執行結果,如果最終結果還沒得出該方法會產生阻塞,直到任務執行完畢返回結果 */ V get() throws InterruptedException, ExecutionException; /** * 獲取執行結果,如果在指定時間內,還沒獲取到結果,則拋出TimeoutException */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

從上面源碼可看出Future就是對於Runnable或Callable任務的執行進行查詢、中斷任務、獲取結果。下面就以一個計算1到1億的和為例子,看使用傳統方式和使用Future耗時差多少。先看傳統方式代碼:

public class FutureTest {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<Integer> retList = new ArrayList<>();

        // 計算1000次1至1億的和
        for (int i = 0; i < 1000; i++) {
            retList.add(Calc.cal(100000000));
        }
        System.out.println("耗時: " + (System.currentTimeMillis() - start));

        for (int i = 0; i < 1000; i++) {
            try {
                Integer result = retList.get(i);
                System.out.println("第" + i + "個結果: " + result);
            } catch (Exception e) {
            }
        }
        System.out.println("耗時: " + (System.currentTimeMillis() - start));
    }

    public static class Calc implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return cal(10000);
        }

        public static int cal (int num) {
            int sum = 0;
            for (int i = 0; i < num; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

執行結果(耗時40+秒):

耗時: 43659
第0個結果: 887459712
第1個結果: 887459712
第2個結果: 887459712
...
第999個結果: 887459712
耗時: 43688

再來看看使用Future模式下程序:

public class FutureTest {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<Integer>> futureList = new ArrayList<>();

        // 計算1000次1至1億的和
        for (int i = 0; i < 1000; i++) {
            // 調度執行
            futureList.add(executorService.submit(new Calc()));
        }
        System.out.println("耗時: " + (System.currentTimeMillis() - start));

        for (int i = 0; i < 1000; i++) {
            try {
                Integer result = futureList.get(i).get();
                System.out.println("第" + i + "個結果: " + result);
            } catch (InterruptedException | ExecutionException e) {
            }
        }
        System.out.println("耗時: " + (System.currentTimeMillis() - start));
    }

    public static class Calc implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return cal(100000000);
        }

        public static int cal (int num) {
            int sum = 0;
            for (int i = 0; i < num; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

執行結果(耗時12+秒):

耗時: 12058
第0個結果: 887459712
第1個結果: 887459712
...
第999個結果: 887459712
耗時: 12405

可以看到,計算1000次1至1億的和,使用Future模式並發執行最終的耗時比使用傳統的方式快了30秒左右,使用Future模式的效率大大提高。

2.3、FutureTask

說完Future,Future因為是接口不能直接用來創建對象,就有了下面的FutureTask。

先看看FutureTask的實現:

public class FutureTask<V> implements RunnableFuture<V>

可以看到FutureTask類實現了RunnableFuture接口,接著看RunnableFuture接口源碼:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

可以看到RunnableFuture接口繼承了Runnable接口和Future接口,也就是說其實FutureTask既可以作為Runnable被線程執行,也可以作為Future得到Callable的返回值。

看下面FutureTask的兩個構造方法,可以看出就是為這兩個操作準備的。

public FutureTask(Callable<V> var1) {
    if (var1 == null) {
        throw new NullPointerException();
    } else {
        this.callable = var1;
        this.state = 0;
    }
}

public FutureTask(Runnable var1, V var2) {
    this.callable = Executors.callable(var1, var2);
    this.state = 0;
}

FutureTask使用實例:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Calc task = new Calc();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
    }

    public static class Calc implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            return cal(100000000);
        }

        public static int cal (int num) {
            int sum = 0;
            for (int i = 0; i < num; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

2.4、Future不足之處

上面例子可以看到使用Future模式比傳統模式效率明顯提高了,使用Future一定程度上可以讓一個線程池內的任務異步執行;但同時也有個明顯的缺點:就是回調無法放到與任務不同的線程中執行,傳統回調最大的問題就是不能將控制流分離到不同的事件處理器中。比如主線程要等各個異步執行線程返回的結果來做下一步操作,就必須阻塞在future.get()方法等待結果返回,這時其實又是同步了,如果遇到某個線程執行時間太長時,那情況就更糟了。

到Java8時引入了一個新的實現類CompletableFuture,彌補了上面的缺點,在下篇會講解CompletableFuture的使用。

作者註:原文發表在公號(點擊查看),定期分享IT互聯網、金融等工作經驗心得、人生感悟,歡迎訂閱交流,目前就職阿裏-移動事業部,需要大廠內推的也可到公號砸簡歷。(公眾號ID:weknow619)

技術分享圖片

【並發編程】Future模式及JDK中的實現