1. 程式人生 > >Java 併發程式設計——Callable+Future+FutureTask

Java 併發程式設計——Callable+Future+FutureTask

專案中經常有些任務需要非同步(提交到執行緒池中)去執行,而主執行緒往往需要知道非同步執行產生的結果,這時我們要怎麼做呢?用runnable是無法實現的,我們需要用callable實現。

複製程式碼
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public
class AddTask implements Callable<Integer> { private int a,b; public AddTask(int a, int b) { this.a = a; this.b = b; } @Override public Integer call() throws Exception { Integer result = a + b; return result; } public static void
main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newSingleThreadExecutor(); //JDK目前為止返回的都是FutureTask的例項 Future<Integer> future = executor.submit(new AddTask(1, 2)); Integer result = future.get();// 只有當future的狀態是已完成時(future.isDone() = true),get()方法才會返回
} }
複製程式碼

Callable介面                                                                                                     

Callable介面Runable介面可謂是兄弟關係,只不過Callable是帶返回值的。

複製程式碼
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
複製程式碼

Future 介面                                                                                                        

介面函式及含義 :public interface Future<V>

 boolean  cancel(boolean mayInterruptIfRunning)

取消當前執行的任務,如果已經執行完畢或者已經被取消/由於某種原因不能被取消 則取消任務失敗。

引數mayInterruptIfRunning: 當任務正在執行,如果引數為true ,則嘗試中斷任務,否則讓任務繼續執行知道結束。

boolean isCancelled()
Returns {@code true} if this task was cancelled before it completed
* normally.
boolean isDone();
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/

V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
由註釋可以看出,當沒有執行完成時,需要等待任務執行完成了才會將計算結果返回。

V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.

如果等待的時間超過設定的時間則會報 TimeoutException異常

FutureTask                                                                                                     

public class FutureTask<V> implements RunnableFuture<V>

由定義可以看出它實現了RunnableFuture介面,那麼這個介面又是什麼呢?看下面的介面定義,其實很簡單

複製程式碼
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
複製程式碼

再回到FutureTask,它其實就是實現了Runnable和Future介面,FutureTask的執行是 狀態轉換的過程,原始碼中有七種狀態如下:

複製程式碼
  * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
複製程式碼

當FutureTask剛剛被建立時,它的狀態是NEW,其它狀態檢視原始碼。

其它成員變數:

複製程式碼
 /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
複製程式碼

callable是待執行的任務,FutureTask 的 run()函式中執行callable中的任務。

outcome : 是callable的執行結果,當正常執行完成後會將結果set到outcome中

runner:是執行callable 的執行緒

WaitNode : 是的受阻塞的執行緒連結串列,當cancel一個任務後,阻塞的執行緒會被喚醒。

建構函式:

複製程式碼
public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
複製程式碼

從建構函式可以看出,不光可以通過callable構造FutureTask還可以通過Runnable介面轉化為callable來構造。關鍵函式為黃色標記部分,Executors中的實現原始碼如下:

複製程式碼
/**
     * A callable that runs given task and returns given result.
     */
    private static final class RunnableAdapter<T> implements Callable<T> {
        private final Runnable task;
        private final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
複製程式碼

這裡面不懂result到底有什麼意義,明明就是預先設定好的。

其它具體的方法說明這裡不再細說,裡面用到了很多sun.misc.Unsafe中的方法以及其他SDK底層介面,後續有時間再學習。下面貼出了整個原始碼及說明

 View Code

FutureTask簡單應用:

複製程式碼
public class FutureMain {
    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        //構造FutureTask
        FutureTask<String> futureTask = new FutureTask<String>(new CallableClass("xxx"));
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        //執行FutureTask,傳送請求
        //在這裡開啟執行緒進行RealData的call()執行
        executorService.submit(futureTask);

        System.out.println("請求完畢。。。");
        try {
            //這裡可以進行其他額外的操作,這裡用sleep代替其他業務的處理
            Thread.sleep(200);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
        //獲取call()方法的返回值
        //如果此時call()方法沒有執行完成,則依然會等待
        System.out.println("真實資料:"+futureTask.get());
    }
}
複製程式碼

 www.kzo4326.cn
www.tso8557.cn
www.nwm1536.cn
www.tae4138.cn
www.dxc3579.cn
www.ffp5727.cn
www.lhb4836.cn
www.xdi0113.cn
www.bpa2365.cn
www.fks8445.cn
www.aht8537.cn
www.cun5054.cn
www.gdk7028.cn
www.ypk8666.cn
www.wan2959.cn
www.sit9945.cn
www.zmj4226.cn
www.ccn6233.cn
www.jck8045.cn
www.ckk6213.cn
www.mak1390.cn
www.vii0197.cn
www.pwj5001.cn
www.wvh4263.cn
www.mvg0339.cn
www.yif9712.cn
www.jta0960.cn
www.omx8816.cn
www.nlc4773.cn
www.dep9137.cn
www.vlq7732.cn
www.umg2515.cn
www.kog1435.cn
www.nxf9936.cn
www.hqh7518.cn
www.hij5984.cn
www.vui9639.cn
www.fzl7156.cn
www.wue0833.cn
www.dye6768.cn
www.ryh7899.cn
www.lij0467.cn
www.epv8502.cn
www.lru8400.cn
www.mtr5072.cn
www.kbs9896.cn
www.qfk7654.cn
www.myb6827.cn
www.tcr0461.cn
www.xua4102.cn
www.tzn6024.cn
www.kme4313.cn
www.bnb6875.cn
www.yio4898.cn
www.yat8046.cn