1. 程式人生 > >從Java future 到 Guava ListenableFuture實現異步調用

從Java future 到 Guava ListenableFuture實現異步調用

boolean .com 裏的 任務 mem 包含 緩存 extend tags

從Java future 到 Guava ListenableFuture實現異步調用

置頂 2016年04月24日 09:11:14 皮斯特勞沃 閱讀數:17570 標簽: java異步調用線程非阻塞 個人分類: 多線程異步調用總結 版權聲明:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/pistolove/article/details/51232004

本文是在學習中的總結,歡迎轉載但請註明出處:http://blog.csdn.net/pistolove/article/details/51232004


前言

隨著移動互聯網的蓬勃發展,手機App層出不窮,其業務也隨之變得錯綜復雜。針對於開發人員來說,可能之前的一個業務只需要調取一次第三方接口以獲取數據,而如今隨著需求的增加,該業務需調取多個不同的第三方接口。通常,我們處理方法是讓代碼同步順序的去調取這些接口。顯然,調取接口數量的增加必然會造成響應時間的增加,勢必會對系統性能造成一定影響。

為了保證系統響應迅速,需要尋找一種方法能夠使調取接口能夠異步執行,而java正好提供了類似的方法,在java.util.concurrent中包含了Future相關的類,運用其中的一些類可以進行異步計算,以減少主線程的等待時間。比如啟動一個main方法,main中又包含了若幹個其它任務,在不使用java future的情況下,main方法中的任務會同步阻塞執行,一個執行完成後,才能去執行另一個;如果使用java future,則main方法中的任務會異步執行,main方法不用等待一個任務的執行完成,只需往下執行就行。一個任務的執行結果又該怎麽獲取呢?這裏就需要用到Future接口中的isDone()方法來判斷任務是否執行完,如果完成完成則可獲取結果,如果沒有完成則需要等待,可見雖然主線程中的多個任務是異步執行,但是無法確定任務什麽時候執行完成,只能通過不斷去監聽以獲取結果,所以這裏是阻塞的。這樣,可能某一個任務執行時間很長會拖累整個主任務的執行。

針對這樣的情況,google對java.util.concurrent中的許多類進行封裝,最終產生了google guava框架,其中com.google.common.util中的ListenableFuture就是本文要敘述的重點。查看com.google.common.util,發現其中的很多類都是對java.util.concurrent的封裝,以增加特有的方法。ListenableFuture擴展了future方法,增加了addListener方法,該方法可以監聽線程,並通過回調函數來獲取結果,達到線程之間異步非阻塞執行。

首先,了解下同步、異步、阻塞、非阻塞相關概念;其次,簡單介紹java future和guava future相關技術,並通過示例代碼進一步對其進行理解;最後,對java future和guava future進行比較。


同步、異步、阻塞、非阻塞

同步:所謂同步,就是在發出一個功能調用時,在沒有得到結果之前,該調用就不返回。也就是必須一件一件事做,等前一件做完了才能做下一件事。

異步:異步的概念和同步相對。當一個異步過程調用發出後,調用者不能立刻得到結果。實際處理這個調用的部件在完成後,通過狀態、通知和回調來通知調用者。

阻塞:阻塞調用是指調用結果返回之前,當前線程會被掛起(線程進入非可執行狀態,在這個狀態下,cpu不會給線程分配時間片,即線程暫停運行)。函數只有在得到結果之後才會返回。

非阻塞:非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前,該函數不會阻塞當前線程,而會立刻返回。


Java future

減少主函數的等待時間,使得原本需要等待的時間段可以處理其它事情

1、Executors創建線程池的幾種常見方式

通過Executors可以創建不同類似的線程池,常見的大概有下表幾種類型,還有些可能未被列出。在實際應用中,個人感覺主要使用newCachedThreadPool和newFixedThreadPool來創建線程池。

類名說明
newCachedThreadPool 緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse;如果沒有,就建一個新的線程加入池中。緩存型池子通常用於執行一些生存期很短的異步型任務。因此在一些面向連接的daemon型SERVER中用得不多。能reuse的線程,必須是timeout IDLE內的池中線程,缺省timeout為60s,超過這個IDLE時長,線程實例將被終止並移出池子。註意:放入CachedThreadPool的線程超過TIMEOUT不活動,其會自動被終止。
newFixedThreadPool 和cacheThreadPool類似,有可用的線程就使用,但不能隨時建新的線程。其獨特之處:任意時間點,最多只能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子。cache池和fixed池調用的是同一個底層池,只不過參數不同:fixed池線程數固定,並且是0秒IDLE(無IDLE)。所以FixedThreadPool多數針對一些很穩定很固定的正規並發線程,多用於服務器。cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE。
ScheduledThreadPool 調度型線程池。這個池子裏的線程可以按schedule依次delay執行,或周期執行。
SingleThreadExecutor 單例線程,任意時間池中只能有一個線程。用的是和cache池和fixed池相同的底層池,但線程數目是1-1,0秒IDLE(無IDLE)。

2、Executors創建線程池源碼

//調用newCachedThreadPool方法,可以創建一個緩沖型線程池,而在改方法中通過傳參創建一個ThreadPoolExecutor,我麽你會很奇怪明明返回的是一個ExecutorService,怎麽會創建了一個ThreadPoolExecutor呢?
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, 
                   TimeUnit.SECONDS, new SynchronousQueue<Runnable());
}

// ThreadPoolExecutor繼承了抽象的service類AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService {}

//AbstractExecutorService實現了ExecutorService接口
public abstract class AbstractExecutorService implements ExecutorService {}

//所以ExecutorService其實是ThreadPoolExecutor的基類,這也就解釋清楚了

3、ExecutorService(線程池)

ExecutorService是一個接口,它繼承了Executor,在原有execute方法的基礎上新增了submit方法,傳入一個任務,該方法能夠返回一個Future對象,可以獲取異步計算結果。

//ExecutorService繼承了Executor,並擴展了新方法。
public interface ExecutorService extends Executor { }

//Executor中的方法
void execute(Runnable command);

//增加了submit方法,該方法傳任務來獲取Future對象,而Future對象中可以獲取任務的執行結果
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);


4、Future(獲取異步計算結果)

Future接口中有下表所示方法,可以獲取當前正在執行的任務相關信息。

方法說明
boolean cancel(boolean interruptIf) 取消任務的執行
boolean isCancelled() 任務是否已取消,任務正常完成前將其取消,返回 true
boolean isDone() 任務是否已完成,任務正常終止、異常或取消,返回true
V get() 等待任務結束,然後獲取V類型的結果
V get(long timeout, TimeUnit unit) 獲取結果,設置超時時間

5、FutureTask

Executor框架利用FutureTask來完成異步任務,並可以用來進行任何潛在的耗時的計算。一般FutureTask多用於耗時的計算,主線程可以在完成自己的任務後,再去獲取結果。

FutureTask包裝了Callable和Runnable接口對象,提供對Future接口的基本實現,開始、取消計算、查詢計算是否完成、獲取計算結果。僅當計算完成時才能檢索結果,當計算沒有完成時,該方法會一直阻塞直到任務轉入完成狀態。一旦完成計算,不能夠重新開始或取消計算。通過Excutor(線程池)來執行,也可傳遞給Thread對象執行。如果在主線程中需要執行比較耗時的操作時,但又不想阻塞主線程時,可以把這些作業交給Future對象在後臺完成,當主線程將來需要時,就可以通過Future對象獲得後臺作業的計算結果或者執行狀態。

//通過傳入任務來構造FutureTask
public FutureTask(Callable<V> callable) {}
public FutureTask(Runnable runnable, V result) {}

//FutureTask中同樣有獲取當前任務狀態的方法
public boolean isCancelled(){}
public boolean isDone() {}
public boolean cancel(boolean mayInterruptIfRunning) {}

//FutureTask實現RunnableFuture
public class FutureTask<V> implements RunnableFuture<V> {}

//RunnableFuture繼承Runnable和Future
public interface RunnableFuture<V> extends Runnable, Future<V> 

6、示例代碼

package future.java;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestFuture {
    // 創建線程池
    final static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Long t1 = System.currentTimeMillis();

        // 任務1
        Future<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        while (true) {
            if (booleanTask.isDone() && !booleanTask.isCancelled()) {
                //模擬耗時
                Thread.sleep(500);
                Boolean result = booleanTask.get();
                System.err.println("BooleanTask: " + result);
                break;
            }
        }

        // 任務2
        Future<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        while (true) {
            if (stringTask.isDone() && !stringTask.isCancelled()) {
                String result = stringTask.get();
                System.err.println("StringTask: " + result);
                break;
            }
        }



        // 任務3
        Future<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        while (true) {
            if (integerTask.isDone() && !integerTask.isCancelled()) {
                Integer result = integerTask.get();
                System.err.println("IntegerTask: " + result);
                break;
            }
        }

        // 執行時間
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }

}

Guava future

(減少主函數的等待時間,使得多任務能夠異步非阻塞執行)


ListenableFuture是可以監聽的Future,它是對java原生Future的擴展增強。Future表示一個異步計算任務,當任務完成時可以得到計算結果。如果希望計算完成時馬上就拿到結果展示給用戶或者做另外的計算,就必須使用另一個線程不斷的查詢計算狀態。這樣做會使得代碼復雜,且效率低下。如果使用ListenableFuture,Guava會幫助檢測Future是否完成了,如果完成就自動調用回調函數,這樣可以減少並發程序的復雜度。

1、MoreExecutors

該類是final類型的工具類,提供了很多靜態方法。例如listeningDecorator方法初始化ListeningExecutorService方法,使用此實例submit方法即可初始化ListenableFuture對象。


2、ListeningExecutorService

該類是對ExecutorService的擴展,重寫ExecutorService類中的submit方法,返回ListenableFuture對象。


3、ListenableFuture

該接口擴展了Future接口,增加了addListener方法,該方法在給定的excutor上註冊一個監聽器,當計算完成時會馬上調用該監聽器。不能夠確保監聽器執行的順序,但可以在計算完成時確保馬上被調用。


4、FutureCallback

該接口提供了OnSuccess和OnFailuren方法。獲取異步計算的結果並回調。


5、Futures

該類提供和很多實用的靜態方法以供使用。


6、ListenableFutureTask

該類擴展了FutureTask類並實現ListenableFuture接口,增加了addListener方法。


7、實例代碼

package future.guava;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class TestListenableFuture2 {
    // 創建線程池
    final static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    public static void main(String[] args) throws Exception {
        Long t1 = System.currentTimeMillis();
        // 任務1
        ListenableFuture<Boolean> booleanTask = service.submit(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return true;
            }
        });

        Futures.addCallback(booleanTask, new FutureCallback<Boolean>() {
            @Override
            public void onSuccess(Boolean result) {
                System.err.println("BooleanTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 任務2
        ListenableFuture<String> stringTask = service.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello World";
            }
        });

        Futures.addCallback(stringTask, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                System.err.println("StringTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 任務3
        ListenableFuture<Integer> integerTask = service.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(100);
            }
        });

        Futures.addCallback(integerTask, new FutureCallback<Integer>() {
            @Override
            public void onSuccess(Integer result) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.err.println("IntegerTask: " + result);
            }

            @Override
            public void onFailure(Throwable t) {
            }
        });

        // 執行時間
        System.err.println("time: " + (System.currentTimeMillis() - t1));
    }

}

Java future 和 Guava future的對比

Future 具有局限性。在實際應用中,當需要下載大量圖片或視頻時,可以使用多線程去下載,提交任務下載後,可以從多個Future中獲取下載結果,由於Future獲取任務結果是阻塞的,所以將會依次調用Future.get()方法,這樣的效率會很低。很可能第一個下載速度很慢,則會拖累整個下載速度。

Future主要功能在於獲取任務執行結果和對異步任務的控制。但如果要獲取批量任務的執行結果,從上面的例子我們已經可以看到,單使用 Future 是很不方便的。其主要原因在於:一方面是沒有好的方法去判斷第一個完成的任務;另一方面是 Future的get方法 是阻塞的,使用不當會造成線程的浪費。第一個問題可以用 CompletionService 解決,CompletionService 提供了一個 take() 阻塞方法,用以依次獲取所有已完成的任務。第二個問題可以用 Google Guava 庫所提供的 ListeningExecutorService 和 ListenableFuture 來解決。除了獲取批量任務執行結果時不便,Future另外一個不能做的事便是防止任務的重復提交。要做到這件事就需要 Future 最常見的一個實現類 FutureTask 了。Future只實現了異步,而沒有實現回調,主線程get時會阻塞,可以輪詢以便獲取異步調用是否完成。

在實際的使用中建議使用Guava ListenableFuture來實現異步非阻塞,目的就是多任務異步執行,通過回調的方方式來獲取執行結果而不需輪詢任務狀態。

希望本文對你有所幫助。下一篇文章將講解RestTemplate和AsyncRestTemplate相關技術。

從Java future 到 Guava ListenableFuture實現異步調用