1. 程式人生 > >執行緒學習二:執行緒池執行Runnable與Callable

執行緒學習二:執行緒池執行Runnable與Callable

1、瞭解執行緒池幾個類之間的關係 (結合圖看後面的分析)

     

FutureTask  繼承 RunnableFuture , RunnableFuture 實現介面 Runnable

2、分析常用呼叫執行緒池程式碼(以下稱為程式碼A

ExecutorService es = Executors.newFixedThreadPool(15);
		es.submit(new Callable<String>(){

			@Override
			public String call() throws Exception {
				return null;
			}});
		es.submit(new myThread());

1)首先我們看看Executors.newFixedThreadPool()這個方法的原始碼

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

  可以看到,它實際返回的是ThreadPoolExecutor物件es

2)再看看es.submit()這個方法是如何實現的

      在原始碼中,我們會發現ThreadPoolExecutor並沒有方法submit,它是呼叫了父類abstractExecutorService.submit()方法,原始碼如下

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }


    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

可以看到,abstractExecutorService是把Callable<T> 物件和 Runnable物件轉換為了RunnbaleFuture<T>的實現類 FutureTask<T>,然後呼叫execute()方法。

而ThreadPoolExecutor類中是重寫了execute()方法的,所以程式碼A中的es.submit實際上執行的是ThreadPoolExecutor.executor().

3)瞭解Callable 與 Runnable轉換為FutureTask<T>   (這裡不截原始碼了) 與 FutureTask的run方法。

FutureTask<T>中有個Callable<T>屬性,如果是Callable轉,那麼直接賦值就好了,如果是Runnable轉,那麼Eexcutors中有個類RunnableAdapter,將Runnable物件賦值給一個RunnableAdapter物件ra,然後把ra賦值給FutureTask中的Callable屬性就好了。

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }
FutureTask<T>通過父類RunnableFuture也是實現了Runnable介面的 ,  所以執行也是start方法 。 run原始碼
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
結果還是把Callable的call()方法丟到了Runnable的run()方法中 ,

看到裡面的set(result)沒有,就是通過一個全域性變數,變相的返回執行緒執行結果,獲取是FutureTask.get()方法。

4)再看看ThreadPoolExecutor.executor()是如何實現的 ,具體後續分析

  <span style="color:#000000;"> public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }</span>

上面的addWorker(COMMAND,FLAG)方法中,滿足條件會構建new Thread(FutureTask) t,呼叫t.start();

邏輯圖


5)終於到最後了,可以說我最想說的話了 。

    之前沒看程式碼,狗日的以為Callable是個很神奇的東西,就是Thread的兄弟,使用起來也挺神祕,只見過再ExecutorService.submit()見到過 。

現在才知道,Callable僅僅只是為執行緒執行的時候能夠返回物件,且僅僅只能在ExecutorService.submit()中使用,其他無卵用。

繞來繞去還是Thread與Runnable的start方法與run方法啊。