1. 程式人生 > >Android多執行緒-AsyncTask工作流程(原始碼)

Android多執行緒-AsyncTask工作流程(原始碼)

AsyncTask的原始碼是很簡單的,看著並不複雜。只是對Handler和ThreadPoolExecutor進行了一下封裝。

基於api25(7.1)的程式碼,

使用起來也是很簡單的,看上個就知道了。一般要繼承AsyncTask並重寫下面幾個方法,這些方法的執行順序一目瞭然:

//任務執行前呼叫
protected void onPreExecute() {}
//執行後臺任務
protected abstract Result doInBackground(Params... params);
//返回任務執行結果
protected void onPostExecute(Result result) {}
//返回任務執行進度
protected void onProgressUpdate(Progress... values) {} //任務取消時呼叫 protected void onCancelled() {}

只有doInBackground一個方法是抽象的,必須重寫,其他的可以不用重寫。

然後通常的呼叫方法是這樣的:

new MyAsyncTask().execute();

Handler和執行緒池

既然是對Handler和執行緒池的封裝,就先看看封裝的什麼用的Handler和執行緒池。都是定義在AsyncTask類中。

handler

private static InternalHandler sHandler;
private
static class InternalHandler extends Handler { public InternalHandler() { super(Looper.getMainLooper()); } @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"}) @Override public void handleMessage(Message msg) { AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj; switch
(msg.what) { case MESSAGE_POST_RESULT: // There is only one result result.mTask.finish(result.mData[0]); break; case MESSAGE_POST_PROGRESS: result.mTask.onProgressUpdate(result.mData); break; } } } private static Handler getHandler() { synchronized (AsyncTask.class) { if (sHandler == null) { sHandler = new InternalHandler(); } return sHandler; } }

首先有一個變數sHandler。

InternalHandler是一個靜態內部類

在這個構造方法中可以看出,InternalHandler使用了主執行緒也就是UI執行緒的Looper來處理訊息,所以這個Handler收到的訊息會在主執行緒中處理。使用這個Handler就達到了跟主執行緒進行互動的目的。

然後提供了一個方法getHandler,用單例來獲取唯一的一個InternalHandler。

執行緒池

public static final Executor THREAD_POOL_EXECUTOR;
static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }

public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
//預設的執行緒池
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;

private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }
/** @hide */
    public static void setDefaultExecutor(Executor exec) {
        sDefaultExecutor = exec;
    }

在這裡可以看見連個執行緒池,THREAD_POOL_EXECUTORSERIAL_EXECUTOR,以及一個預設使用的執行緒池變數sDefaultExecutor

這兩個執行緒池關係到了為什麼AsyncTask的任務是序列的。在ActivityThread中有這樣一段程式碼:

//android.os.Build.VERSION_CODES.HONEYCOMB_MR1=12
if (data.appInfo.targetSdkVersion <= android.os.Build.VERSION_CODES.HONEYCOMB_MR1) {
    AsyncTask.setDefaultExecutor(AsyncTask.THREAD_POOL_EXECUTOR);
}

看來只有當版本小於13的時候,才會將THREAD_POOL_EXECUTOR作為預設執行緒池,可以並行執行任務。

THREAD_POOL_EXECUTOR

其中THREAD_POOL_EXECUTOR是在靜態程式碼塊中定義的,在類載入的時候就執行了,而且只會執行一次。

直接使用ThreadPoolExecutor的構造方法來構造了一個執行緒池,來看一下引數:

  • private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));

    核心執行緒數,其中CPU_COUNT = Runtime.getRuntime().availableProcessors()表示CPU數量。最低兩個,最多四個。

  • private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

    最大執行緒數,是CPU核心數的2倍+1.

  • private static final int KEEP_ALIVE_SECONDS = 30;

    空閒執行緒存活時間,30 。根據第四個引數TimeUnit.SECONDS知道是30秒。

  • private static final BlockingQueue<Runnable> sPoolWorkQueue =new LinkedBlockingQueue<Runnable>(128);

    使用了LinkedBlockingQueue,超過核心執行緒數量的任務會在佇列中排隊。

  • private static final ThreadFactory sThreadFactory = new ThreadFactory() {
      private final AtomicInteger mCount = new AtomicInteger(1);
    
      public Thread newThread(Runnable r) {
          return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
      }
    };

    這個只是給每個執行緒池中建立的額執行緒起了個名字,叫 AsyncTask # 數字,數字自增長。

然後設定語序核心執行緒空閒超時threadPoolExecutor.allowCoreThreadTimeOut(true)

這是一箇中規中矩的執行緒池,然而預設使用的執行緒池並不是這個。而是下面的SERIAL_EXECUTOR

SERIAL_EXECUTOR

SERIAL_EXECUTOR中維護了一個雙端陣列佇列mTasks,裡面存放的Runnable。

當呼叫他的execute方法執行Runnable時,他會把這個Runnablerun方法和scheduleNext()方法重新加工包裝成一個新的Runnable放在佇列中。

然後下面會判斷mActive是不是空的,第一次肯定是空的,所以會執行scheduleNext()方法。

在這個方法中,會呼叫佇列的poll方法取出一個Runnable,然後呼叫上面的執行緒池THREAD_POOL_EXECUTOR來執行任務。

因為每個任務經過加工都加上了scheduleNext()方法,所以佇列中的任務都會按順序執行完。

  • 由此可見,這個佇列僅僅起到一個排序功能,是各個任務依次執行,真正的執行還是交給了執行緒池SERIAL_EXECUTOR.

AsyncTask的其他內部類

狀態Status

AsyncTask有一個列舉類定義了三個狀態:

public enum Status {
    /**
     * Indicates that the task has not been executed yet.
     * 表明任務尚未執行
     */
    PENDING,
    /**
     * Indicates that the task is running.
     * 表明任務正在執行
     */
    RUNNING,
    /**
     * Indicates that {@link AsyncTask#onPostExecute} has finished.
     * 表明onPostExecute已經結束
     */
    FINISHED,
}

當然就有一個表示狀態的變數,mStatus預設是Status.PENDING:

private volatile Status mStatus = Status.PENDING;

WorkerRunnable

private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
    Params[] mParams;
}

這個類就是個CallAble介面,裡面增加了一個引數陣列。

AsyncTaskResult

這個類其實就是個儲存類,儲存了一個AsyncTaskData[].

private static class AsyncTaskResult<Data> {
    final AsyncTask mTask;
    final Data[] mData;

    AsyncTaskResult(AsyncTask task, Data... data) {
        mTask = task;
        mData = data;
    }
}

執行過程

上面的都準備好了,下面就能運行了。

看著呼叫方法:

new MyAsyncTask().execute();

先實現一個AsyncTask,這時要定義三個引數型別

AsyncTask<Params, Progress, Result> 

先new一個,然後呼叫execute方法

new的時候肯定會呼叫構造方法

構造方法

AsyncTask的構造方法中初始化了兩個變數:

private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
public AsyncTask() {
    mWorker = new WorkerRunnable<Params, Result>() {
        public Result call() throws Exception {
            mTaskInvoked.set(true);
            Result result = null;
            try {
                Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
                //noinspection unchecked
                result = doInBackground(mParams);
                Binder.flushPendingCommands();
            } catch (Throwable tr) {
                mCancelled.set(true);
                throw tr;
            } finally {
                postResult(result);
            }
            return result;
        }
    };

    mFuture = new FutureTask<Result>(mWorker) {
        @Override
        protected void done() {
            try {
                postResultIfNotInvoked(get());
            } catch (InterruptedException e) {
                android.util.Log.w(LOG_TAG, e);
            } catch (ExecutionException e) {
                throw new RuntimeException("An error occurred while executing doInBackground()",
                        e.getCause());
            } catch (CancellationException e) {
                postResultIfNotInvoked(null);
            }
        }
    };
}

mWorker前面看到了,是一個WorkerRunnable,所以要重寫call方法。

這個任務其實才是後臺任務,所以這個任務就是真正的AsyncTask的任務了。

然後就呼叫了doInBackground(mParams),把引數穿了進去。

mFuture是個FutureTask,他把上面的mWorker又進行了一次包裝,會先執行mWorkercall方法中的內容,再執行done()

execute

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}

這個方法是真正開始執行任務的方法,一般都會傳入個引數Params。

裡面直接呼叫了executeOnExecutor(sDefaultExecutor, params)方法,使用的執行緒池是預設的 ,也就是上面的SERIAL_EXECUTOR。序列執行任務。

onPreExecute

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }

    mStatus = Status.RUNNING;

    onPreExecute();

    mWorker.mParams = params;
    exec.execute(mFuture);

    return this;
}

這裡先判斷任務的狀態mStatus,如果是正在執行或者說執行結束了,都會拋異常。所以一個任務只能執行一次executef方法,一個任務只能執行一次,不能重複執行。

mStatusPENDING的時候,先mStatus = Status.RUNNING表示任務正在執行了。

然後就呼叫到了onPreExecute()方法。

mWorker.mParams = params前面看見WorkerRunnable中有一個變數Params[] mParams,儲存了傳入的引數。

一直到這時候都還是在原來的執行緒中執行,並沒有開啟多執行緒。所以這個方法也是在原來的執行緒中執行的。

然後呼叫傳入的執行緒池的execute方法,來執行構造方法中新建的mFuture。這個時候就使用執行緒池開新執行緒了。

doInBackground

mFuture裡有個mWorker,會執行他的call方法,這個方法中的內容都會在子執行緒中執行,其中包括了doInBackground,他的引數mParamsWorkerRunnable裡的變數。:

mWorker = new WorkerRunnable<Params, Result>() {
    public Result call() throws Exception {
        mTaskInvoked.set(true);
        Result result = null;
        try {
            ...
            result = doInBackground(mParams);
            ...
        } catch (Throwable tr) {
            mCancelled.set(true);
            throw tr;
        } finally {
            postResult(result);
        }
        return result;
    }
};

首先把mTaskInvoked設定為true表示這個任務已經開始了。

然後呼叫重寫過的doInBackground方法,執行咱們需要後臺執行的任務。

doInBackground有一個返回值,也是個泛型。

出現異常時,將mCancelled設定為true,表示任務取消了。

最終都會呼叫postResult(result)方法。

這個方法也很簡單:

private Result postResult(Result result) {
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}

getHandler()前面知道是通過單例獲取到一個使用主執行緒Looper建立的Handler,就是InnerHandler,所以他對訊息的處理會回到主執行緒中。

然後通過這個Handler傳送訊息,訊息內容是new AsyncTaskResult<Result>(this, result)。前面也看了這是個載體,將這個AsyncTask本身和後臺任務doInBackground的返回結果傳了進去。

然後整個工作就完成了。

onProgressUpdate

這個方法用來更新進度,不能直接呼叫,要通過publishProgress方法來呼叫。

doInBackground,通常會手動呼叫publishProgress方法來更新進度

protected final void publishProgress(Progress... values) {
    if (!isCancelled()) {
        getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
                new AsyncTaskResult<Progress>(this, values)).sendToTarget();
    }
}

這個方法賢惠判斷任務是否取消,如果取消了就什麼都不做。

沒取消就也用InnerHandler傳送訊息,內容是new AsyncTaskResult<Progress>(this, values)。傳入當前的AsyncTask和進度資訊values.

這些方法最終都是以通過Handler傳送個訊息結束,所以後面的就是Handler的事了

之前已經看過了這個類:

private static class InternalHandler extends Handler {
    public InternalHandler() {
        super(Looper.getMainLooper());
    }

    @SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
    @Override
    public void handleMessage(Message msg) {
        AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
        switch (msg.what) {
            case MESSAGE_POST_RESULT:
                // There is only one result
                result.mTask.finish(result.mData[0]);
                break;
            case MESSAGE_POST_PROGRESS:
                result.mTask.onProgressUpdate(result.mData);
                break;
        }
    }
}

handleMessage中能看到,result是個AsyncTaskResult型別,在前面也知道這個裡面儲存了AsyncTask和要傳遞的資料。前面兩個訊息攜帶的資訊也是這個型別。

msg.what==MESSAGE_POST_PROGRESS的時候,表示要更新進度,就先從result中拿到裡面的AsyncTask->result.mTask,然後呼叫他的onProgressUpdate方法,引數是result中的mData->result.mData

如果重寫過這個方法,就可以根據這個值更新進度。

onPostExecute

msg.what==MESSAGE_POST_RESULT的時候,表示有結果了,這是後臺任務已經執行結束了。

呼叫裡面的AsyncTaskfinish方法,引數是訊息裡的mData

private void finish(Result result) {
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}

裡面也是先判斷是否取消,取消了就呼叫取消的回撥onCancelled,沒取消就呼叫正常的回撥onPostExecute,並最後把任務狀態改為Status.FINISHED

整個任務結束。