1. 程式人生 > >Java Executor併發框架(十三)Executor框架執行緒池關於異常的處理

Java Executor併發框架(十三)Executor框架執行緒池關於異常的處理

關於為什麼要寫這篇文章,是因為我對Executor執行緒池的兩種提交任務的方式的不同產生的好奇,我們知道,可以通過execute和submit兩種方式往執行緒池提交我們的任務,但是這兩種任務提交的方式到底有什麼區別呢?通過execute方式提交的任務,我們不能獲取任務執行後的返回值,而通過submit提交任務的方式,我們是可以獲取任務執行結束後產生的結果。那麼另一個區別就是關於異常的問題了,請看下文。

二、問題的研究

ok, 我們寫一個類,繼承ThreadPoolExecutor,命名為MyThreadPoolExecutor,並重寫然後使用MyThreadPoolExecutor建立一個執行緒池。

MyThreadPoolExecutor程式碼:

package com.npf;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

	public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	@Override
	protected void afterExecute(Runnable r, Throwable t) {
		super.afterExecute(r, t);
		System.out.println("MyThreadPoolExecutor.afterExecute()");
	}

}

一看afterExecute這個方法,想當然的以為如果執行緒池中出了問題,異常自然回在第二個引數 t 中傳過來。也許的確是這樣的,但是這裡有一個區別。我們知道ExecutorServcie中執行一個Runnable有三個方法,分別是:
第一種,Executor介面中定義的execute方法:


第二種,AbstractExecutorService實現的submit(Runnable task)方法。


第三種,AbstractExecutorService實現的submit(Runnable task,T result)方法。


後面兩種提交runnable任務的方式,內部都呼叫了newTaskFor(Callable<T> callable)方法:


通過上面,我們知道,後面兩種提交runnable任務的方式,內部都是將runnable任務封裝成了FutureTask,然後再執行execute方法,我們知道execute方法是定義在Executor介面中的方法,它接收的引數是Runnable型別,為什麼我們的FutureTask也符合execute方法的引數定義呢?
請看FutureTask的定義:



FutureTask實現了RunnableFuture,而RunnableFuture繼承了Runnable, Future<V>介面。


綜上我們知道,到最後都是會呼叫execute方法。execute方法對進來的Runnable又包裝成了worker然後進入runWorker
runWorker
方法中有這麼幾行:
try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
       task.run();
    } catch (RuntimeException x) {
       thrown = x; throw x;
    } catch (Error x) {
       thrown = x; throw x;
    } catch (Throwable x) {
       thrown = x; throw new Error(x);
    } finally {
       afterExecute(task, thrown);
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

好了,到了最關鍵的afterExecute這個步驟,我滿心以為這裡所有的異常都會通過thrown傳遞進來,看來我還是太年輕了,之前我們分析過,這個Runnable已經被submit封裝成了FutureTask,那麼這個task.run()除了我們自己定義的run任務之外,到底還幹了啥呢?
因為如果我們通過submit的方式提交Runnable的物件被封裝成了FutureTask, 那麼這裡的task.run()這句程式碼的task物件實際上就是FutureTask物件,那麼這個run也就是FutureTask的方法,下面我們貼出FutureTask的run方法的原始碼:
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
實際我們自己定義的任務已經變成了Callable物件,那麼這裡的Callable物件的實現類到底是誰呢?
也就是Callable<V> c = callable; 這個callable的具體型別到底是什麼?請看下面的分析。

我們知道,通過submit的方式提交Runnable的物件被封裝成了FutureTask的關鍵地方就是:


那麼我們進入到FutureTask的這個構造方法裡面去看:


再進入到Executors.callable(runnable, result)方法裡面去看:


再進入到RunnableAdapter裡面去看,很簡單的發現,這就是一個典型的介面卡模式:

到這裡已經很清楚了,這個callable物件就是RunnableAdapter。

也就是說,如果我們使用submit的方式提交Runnable的物件,那麼這個callable物件就是RunnableAdapter,然後程式下面的result = c.call(); 的時候,實際就是執行RunnableAdapter的call()方法, 在這個call方面裡面,實際呼叫了我們提交的Runnable物件的run方法。我們寫的任務全部在這句程式碼裡面執行完畢了,看看外面都wrap了啥? OK,我們所有的Throwable全部已經被setException吃掉了,怎麼還會丟擲到外面那層的execute中呢?所以在submit中提交任務無論任務怎麼拋異常,在afterExecute中的第二個引數是取不到的,原因就在這。

如果我們需要獲取這個異常怎麼辦呢?通過FutureTask的get方法,能把剛剛setException中的異常給丟擲來,這樣我們就能真的拿到這些異常了。
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    Future<?> f = (Future<?>) r;
    try {
        f.get();
    } catch (InterruptedException e) {
        logger.error("執行緒池中發現異常,被中斷", e);
    } catch (ExecutionException e) {
        logger.error("執行緒池中發現異常,被中斷", e);
    }

}

如果我們直接通過execute方法提交任務,那麼上面剛才我們分析的程式碼片段中:
try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
       task.run();
    } catch (RuntimeException x) {
       thrown = x; throw x;
    } catch (Error x) {
       thrown = x; throw x;
    } catch (Throwable x) {
       thrown = x; throw new Error(x);
    } finally {
       afterExecute(task, thrown);
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}
task.run()中,這個task就是實際的Runnable物件,那麼如果這裡發生異常的話,afterExecute這個方法的第二個引數thrown就會有異常資訊了。

三、結論

如果我們關心執行緒池執行的結果,則需要使用submit來提交task,那麼在afterExecute中對異常的處理也需要通過Future介面呼叫get方法去取結果,才能拿到異常,如果我們不關心這個任務的結果,可以直接使用ExecutorService中的execute方法(實際是繼承Executor介面)來直接去執行任務,這樣的話,我們的Runnable沒有經過多餘的封裝,在runWorker中得到的異常也直接能在afterExecute中捕捉。




參考文獻:
1. Java ExecutorService執行緒池中的小坑——關於執行緒池中丟擲的異常處理