1. 程式人生 > >Android 併發二三事之AsyncTask

Android 併發二三事之AsyncTask

Android 併發第四篇

前言:

本篇主要詳解AsyncTask 的原始碼,關於 AsyncTask 的原始碼其實有太多人都寫過了。這裡為什麼還要寫,
是因為博主在併發系列中寫AsyncTask的原始碼,是想通過從併發的角度去理解AsyncTask 為什麼這樣設計。
我們可以看到 AsyncTask 其中用到了 之前文章中設計到的 FutureTask, 以及Callable,執行緒池等等。
我們也可以藉此複習一下。

一 AsyncTask 的使用:
程式碼:

private void requestAsy() {
        AsyncTask<String, Integer, String> asyncTask = new
AsyncTask<String, Integer, String>() { @Override protected String doInBackground(String... params) { List<ResponInfo> responInfos = getInfos(); long price = 0; for(ResponInfo responInfo : responInfos) { if
(responInfo.getName().equals(params[0])) { price = responInfo.getPrice(); } } return String.valueOf(price); } @Override protected void onPreExecute() { super.onPreExecute(); } @Override
protected void onPostExecute(String s) { super.onPostExecute(s); Log.d(TAG,"Asy : "+s); } }; //無參 asyncTask.execute(); //或 有參 asyncTask.execute("BMW"); //或 傳入我們自定義的執行緒池 asyncTask.executeOnExecutor(Executors.newFixedThreadPool(10)); } private List<ResponInfo> getInfos() { List<ResponInfo> list = new ArrayList<>(); list.add(new ResponInfo("BMW", 2000)); list.add(new ResponInfo("BYD", 200)); try { Thread.sleep(3000); }catch (Exception e){ } return list; }

二 原始碼:

1、 execute() 方法:

    @MainThread
    public final AsyncTask<Params, Progress, Result> execute(Params... params) {
        return executeOnExecutor(sDefaultExecutor, params);
    }

execute 呼叫的也是 executeOnExecutor()方法,只不過傳入的是程式碼內的預設執行緒池 sDefaultExecutor。
sDefaultExecutor 這個引數我們稍後再說,現在只要記得它是一個執行緒池就好了,可以傳入 Runnable,或者 Callable。

2、 executeOnExecutor原始碼:

    @MainThread
    public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
            Params... params) {
        if (mStatus != Status.PENDING) {
            switch (mStatus) {
                case RUNNING:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task is already running.");
                case FINISHED:
                    throw new IllegalStateException("Cannot execute task:"
                            + " the task has already been executed "
                            + "(a task can be executed only once)");
            }
        }

        mStatus = Status.RUNNING;

        onPreExecute();

        mWorker.mParams = params;
        exec.execute(mFuture);

        return this;
    }

當我們呼叫executeOnExecutor() 方法,將不會用AsyncTask 自己的執行緒池,而改用我們所傳入的執行緒池。

在原始碼中還有這樣的方法:

/** @hide */
    public static void setDefaultExecutor(Executor exec) {
        sDefaultExecutor = exec;
    }

設定預設的執行緒池。
不過在最新的版本中被隱藏了,因為有executeOnExecutor() 方法,就用不上這個方法了。

在 executeOnExecutor() 方法中我們看到其呼叫了onPreExecute() 方法,所以當我們重寫 onPreExecute() 方法後, onPreExecute() 會執行在主
執行緒之中。

我們會將傳入的引數傳給 mWorker.mParams 。
然後呼叫執行緒池執行 mFuture。mFuture 是 FutureTask 根據第一篇文章中的介紹 mFuture 既是 Runnable, 又是 Future。
所以其可以作為一個任務(Runnable)提交給執行緒池,也可以作為一個Future,從執行緒池中獲取結果。

3 、看構造方法原始碼:

public AsyncTask() {
        mWorker = new WorkerRunnable<Params, Result>() {
            public Result call() throws Exception {
                mTaskInvoked.set(true);
        //設定執行緒的優先順序
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
        //在子執行緒中執行耗時操作
                Result result = doInBackground(mParams);
                Binder.flushPendingCommands();
        //將結果post出來
                return postResult(result);
            }
        };

        mFuture = new FutureTask<Result>(mWorker) {
            @Override
            protected void done() {
                try {
                    postResultIfNotInvoked(get());
                } catch (InterruptedException e) {
                    android.util.Log.w(LOG_TAG, e);
                } catch (ExecutionException e) {
                    throw new RuntimeException("An error occurred while executing doInBackground()",
                            e.getCause());
                } catch (CancellationException e) {
                    postResultIfNotInvoked(null);
                }
            }
        };
    }
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
        Params[] mParams;
    }

mWorker 本身是一個Callable ,mFuture 是 FutureTask 。FutureTask 在第二篇文章中,有過介紹,其實現了RunnableFuture介面,
而RunnableFuture 介面同時繼承了Runnable和Future 。所以FutureTask 可以作為一個任務被提交給執行緒池。這裡FutureTask 包裝了 mWorker。
mWorker 才是真正的任務主體。
mWorker以及 mFuture都是在建構函式中初始化的。

mWorker 作為任務主體,被提交到執行緒池中,其call() 方法將會執行在子執行緒之中。而在call() 方法中,呼叫了doInBackground()方法
所以doInBackground()方法將會執行在子執行緒之中。並且將mWorker.mParams 傳給 doInBackground()方法,所以doInBackground()方法
的引數就是我們 在呼叫 execute() 和 executeOnExecutor()傳入的引數,其型別便是我們在 new AsyncTask() 時的三個泛型中第一個。

在執行過耗時任務之後。會呼叫 postResult() 方法將結果post出來。

    private Result postResult(Result result) {
        @SuppressWarnings("unchecked")
        Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
                new AsyncTaskResult<Result>(this, result));
    //傳送訊息
        message.sendToTarget();
        return result;
    }

這裡getHandler()方法返回的是InternalHandler。

    private static class InternalHandler extends Handler {
        public InternalHandler() {
            super(Looper.getMainLooper());
        }

        @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
        @Override
        public void handleMessage(Message msg) {
            AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
            switch (msg.what) {
                case MESSAGE_POST_RESULT:
                    // There is only one result
                    result.mTask.finish(result.mData[0]);
                    break;
                case MESSAGE_POST_PROGRESS:
                    result.mTask.onProgressUpdate(result.mData);
                    break;
            }
        }
    }

傳送了 MESSAGE_POST_RESULT 的訊息後,在handleMessage() 中呼叫了result.mTask.finish(result.mData[0]);
result 是 new AsyncTaskResult(this, result) 。

而AsyncTaskResult 只是封裝了真正的資料,以及當前 AsyncTask 物件。

    private static class AsyncTaskResult<Data> {
        final AsyncTask mTask;
        final Data[] mData;

        AsyncTaskResult(AsyncTask task, Data... data) {
            mTask = task;
            mData = data;
        }
    }

result.mTask.finish(result.mData[0]);
這行程式碼實際上呼叫了 AsyncTask 中的finish() 方法。

    private void finish(Result result) {
        if (isCancelled()) {
            onCancelled(result);
        } else {
            onPostExecute(result);
        }
        mStatus = Status.FINISHED;
    }

finish() 方法中則呼叫了 onPostExecute() 方法。這個流程就是典型的,子執行緒傳送訊息通知主執行緒了,所以onPostExecute() 方法會執行在主執行緒中。

那麼說到這裡,整個流程差不多快說完了。但是我們還沒有介紹 FutureTask 也就是mFuture 到底是用來做什麼,為什麼要用他呢?
按照之前的羅邏輯完全可以不用 mFuture 來包裝 mWorker 。mWorker 就可以直接被提交給執行緒池來完成任務。

4 、下面是 FuturenTask 部分原始碼:

    //FuturenTask 包裝了一個 Callable。
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
            //在這裡執行了 Callable 的 call() 方法。獲取結果。
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
    //最後會呼叫done()方法。
        done();

        callable = null;        // to reduce footprint
    }

在 AsyncTask 的原始碼中重寫了done() 方法,其中呼叫了 postResultIfNotInvoked() 方法。

    private void postResultIfNotInvoked(Result result) {
        final boolean wasTaskInvoked = mTaskInvoked.get();
        if (!wasTaskInvoked) {
            postResult(result);
        }
    }

postResultIfNotInvoked() 也會呼叫 postResult() 方法,但是其不一定會執行。
因為在mWorker 的 call() 方法中已經設定 為 true 了。

mTaskInvoked.set(true);

所以我在這裡推測之所以這麼寫,是防止出現異常,導致出現沒有回撥結果的情況。在正常的情況下,postResultIfNotInvoked() 方法是不會
呼叫 postResult() 方法的。

5 、那麼為什麼要用 FutureTask 呢? (當然這是我認為,可能有錯)

下看 AsyncTask 中的其他幾個方法:

    public final boolean cancel(boolean mayInterruptIfRunning) {
        mCancelled.set(true);
        return mFuture.cancel(mayInterruptIfRunning);
    }

    public final Result get() throws InterruptedException, ExecutionException {
        return mFuture.get();
    }

    public final Result get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        return mFuture.get(timeout, unit);
    }

當呼叫這想通過 get() 方法直接獲取結果時,可以直接呼叫 AsyncTask.get() 等待結果的返回。
可以看到 AsyncTask 中的這幾個方法,全部都是依託於 FutureTask 實現。當前執行緒阻塞,等待結果的返回。
這就是我猜測其用 FutureTask 的原因。可以直接獲取結果。

6、 下面來看最開始放在一邊的預設的執行緒池 : SERIAL_EXECUTOR

    public static final Executor SERIAL_EXECUTOR = new SerialExecutor();

    private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }

可以看到的是 SerialExecutor 只是實現了 Executor 介面,它只是重寫了execute() 方法。將Runnable 包裝一下,放進了佇列之中。
它並不是一個真正的執行緒池。將Runnable 放入佇列中後,呼叫了 scheduleNext() 方法。

其實在 scheduleNext() 方法中才是真正的呼叫執行緒池執行任務。
THREAD_POOL_EXECUTOR:

    public static final Executor THREAD_POOL_EXECUTOR
            = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                    TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);


    private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);

THREAD_POOL_EXECUTOR 才是一個真正的執行緒池。關於 THREAD_POOL_EXECUTOR 到底是一個什麼樣的執行緒池,可以去看第一篇文章,
那裡有詳細的介紹。

那為什麼 AsyncTask 要搞出 SerialExecutor 這麼個鬼呢?

仔細研究其邏輯:

第一此呼叫時,會把Runnable 放到佇列中,然後執行 scheduleNext() 方法,在 scheduleNext() 中判斷佇列是否存在,是否有數值。
有數值則呼叫執行緒池執行。如果這個時候又添加了一個任務有是被加到佇列中,mActive 不為空,所以不會呼叫 scheduleNext() 方法執行任務。
所以這個任務會放在佇列之中。這時上一個任務執行完了,會再次呼叫 scheduleNext() 方法,從佇列中獲取任務,如果有,則執行。

所以廢了這麼大的勁,其實就是想維護一個每次只能執行一個任務的執行緒池,所有的任務會依次執行。

當然如何你不想這樣,希望每次可以執行很多工,那就呼叫 executeOnExecutor() 方法,傳入自定義的執行緒池,或者 THREAD_POOL_EXECUTOR 每次執行
多個任務。

三 總結:
OK , AsyncTask 的原始碼就分析到這裡。其實到最後我們會發現,真正實現併發的其實還是執行緒池,AsyncTask 只不過是將其封裝一下,暴露一些介面以及引數。方便於呼叫者使用。我覺得更應該值得學習應該是它封裝的思想或者說是思路吧,即如何能通過封裝,提供更好的使用,滿足更多的需求。

四 下一篇

下一篇會介紹 Android 中另一個可以實現非同步操作的類:HandlerThread。並且嘗試自定義 HandlerThread ,使其用法更加趨於簡單。