1. 程式人生 > >執行緒和執行緒池

執行緒和執行緒池

Android中,幾乎完全採用了Java中的執行緒機制。執行緒是最小的排程單位,在很多情況下為了使APP更加流程地執行,我們不可能將很多事情都放在主執行緒上執行,這樣會造成嚴重卡頓(ANR),那麼這些事情應該交給子執行緒去做,但對於一個系統而言,建立、銷燬、排程執行緒的過程是需要開銷的,所以我們並不能無限量地開啟執行緒,那麼對執行緒的瞭解就變得尤為重要了。

Thread/Runnable/Callable

一般實現執行緒的方法有兩種,一種是類繼承Thread,一種是實現介面Runnable。這兩種方式的優缺點如何呢?我們知道Java是單繼承但可以呼叫多個介面,所以看起來Runnable更加好一些。

繼承Thread

class MyThread extend Thread(){
    @Override
    public void run() {
        super.run();
        Log.i(Thread.currentThread().getId());
    }
}
new MyThread().start();

實現Runnable介面

class MyThread implements Runnable{
    @Override
    public void run() {
        Log.i("MyThread", Thread.currentThread().getName());
    }
}
MyThreaed myThread = new MyThread();
new Thread(myThread).start();

當我們呼叫Thread時,會有兩種方式:

Thread myThread = new Thread();
myThread.run();
myThread.start();

我們應該知道,run()方法只是呼叫了Thread例項的run()方法而已,它仍然執行在主執行緒上,而start()方法會開闢一個新的執行緒,在新的執行緒上呼叫run()方法,此時它執行在新的執行緒上。

Runnable只是一個介面,所以單看這個介面它和執行緒毫無瓜葛,可能一部分人會以為Runnable實現了執行緒,這種理解是不對的。Thread呼叫了Runnable介面中的方法用來線上程中執行任務。

public interface Runnable {
    public void run();
}

public interface Callable<V> {
    V call() throws Exception;
}

Runnable 和 Callable 都代表那些要在不同的執行緒中執行的任務。Runnable 從 JDK1.0 開始就有了,Callable 是在 JDK1.5 增加的。它們的主要區別是 Callable 的 call() 方法可以返回值和丟擲異常,而 Runnable 的 run() 方法沒有這些功能。Callable 可以返回裝載有計算結果的 Future 物件。

我們通過對比兩個介面得到這樣的結論:

  • Callable 介面下的方法是 call(),Runnable 介面的方法是 run();
  • Callable 的任務執行後可返回值,而 Runnable 的任務是不能返回值的;
  • call() 方法可以丟擲異常,run()方法不可以的;
  • 執行 Callable 任務可以拿到一個 Future 物件,表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並檢索計算的結果。通過 Future 物件可以瞭解任務執行情況,可取消任務的執行,還可獲取執行結果;

然而…Thread類只支援Runnable介面,由此引入FutureTask的概念。

FutureTask

FutureTask 實現了 Runnable 和 Future,所以兼顧兩者優點,既可以在 Thread 中使用,又可以在 ExecutorService 中使用。

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

使用 FutureTask 的好處是 FutureTask 是為了彌補 Thread 的不足而設計的,它可以讓程式設計師準確地知道執行緒什麼時候執行完成並獲得到執行緒執行完成後返回的結果。FutureTask 是一種可以取消的非同步的計算任務,它的計算是通過 Callable 實現的,它等價於可以攜帶結果的 Runnable,並且有三個狀態:等待、執行和完成。完成包括所有計算以任意的方式結束,包括正常結束、取消和異常。

除了以上這些,在Android中充當執行緒的角色還有AsyncTask、HandlerThread、IntentService。它們本質上都是由Handler+Thread來構成的,不過不同的設計讓它們可以在不同的場合發揮更好的作用。我們來簡單地說一下它們各自的特點:

AsyncTask,它封裝了執行緒池和Handler,主要為我們在子執行緒中更新UI提供便利。 
HandlerThread,它是個具有訊息佇列的執行緒,可以方便我們在子執行緒中處理不同的事務。 
IntentService,我們可以將它看做為HandlerThread的升級版,它是服務,優先順序更高。

下面我來通過原始碼,用法等來認清這些執行緒。

AsyncTask

AsyncTask是一個輕量級的非同步任務類,它可以線上程池中執行後臺任務,然後把執行的進度和結果傳遞給主執行緒並且在主執行緒中更新UI。

1. 用法

AsyncTask是一個抽象泛型類,宣告:public abstract class AsyncTask<Params, Progress, Result>; 
並且提供了4個核心方法。

  • 引數1,Params,非同步任務的入參;
  • 引數2,Progress,執行任務的進度;
  • 引數3,Result,後臺任務執行的結果;
  • 方法1, onPreExecute(),在主執行緒中執行,任務開啟前的準備工作;
  • 方法2,doInbackground(Params…params),開啟子執行緒執行後臺任務;
  • 方法3,onProgressUpdate(Progress values),在主執行緒中執行,更新UI進度;
  • 方法4,onPostExecute(Result result),在主執行緒中執行,非同步任務執行完成後執行,它的引數是doInbackground()的返回值。

從上面我們可以清晰地瞭解到AsyncTask的具體用法,但是為什麼它是如此表現呢?或者說,為什麼它是這樣而非其它樣,那麼,我們就來看看原始碼吧。

2. 原始碼分析

我們知道開啟AsyncTask非同步任務是通過new MyAsyncTask().execute()開啟的,我們就以此為入口開始分析。

2.1 execute()方法

public final AsyncTask<Params, Progress, Result> execute(Params... params) {
    return executeOnExecutor(sDefaultExecutor, params);
}

分析:從原始碼中,可以看到execute()方法呼叫了executeOnExecutor()方法。

2.2 executeOnExecutor()方法

public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
        Params... params) {
    if (mStatus != Status.PENDING) {
        switch (mStatus) {
            case RUNNING:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task is already running.");
            case FINISHED:
                throw new IllegalStateException("Cannot execute task:"
                        + " the task has already been executed "
                        + "(a task can be executed only once)");
        }
    }
    mStatus = Status.RUNNING;
    onPreExecute();
    mWorker.mParams = params;
    exec.execute(mFuture);
    return this;
}

分析:mStatus代表了當前非同步任務的執行狀態,我們可以看出AsyncTask是一次性的,即不能重複呼叫execute()來開啟非同步任務。當該任務第一次啟動時,狀態設定為RUNNING,並且呼叫onPreExecute()級上文中提到了核心方法1。毫無疑問,此方法是protected void onPreExecute() {} 是需要我們來實現的。

然後我們將引數1Params給了mWorker,

我們從2.1可以看到2.2中方法的引數exec就是sDefaultExecutor。

private static class SerialExecutor implements Executor {
    final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
    Runnable mActive;

    public synchronized void execute(final Runnable r) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if (mActive == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((mActive = mTasks.poll()) != null) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

我們可以看到它用來實現AsyncTask的排隊執行,即AsyncTask是序列而非併發執行的。原始碼的大致執行過程我們會在下面給出流程圖。

2.3 mWork的call()方法

mWorker = new WorkerRunnable<Params, Result>() {
    public Result call() throws Exception {
        mTaskInvoked.set(true);     Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);            
        //noinspection unchecked
        Result result = doInBackground(mParams);
        Binder.flushPendingCommands();
        return postResult(result);
    }
};

我們可以看到此時開始執行AsyncTask的核心方法2,我當然也是需要我們自己實現的。

2.4 sHandler

private Result postResult(Result result) {
    @SuppressWarnings("unchecked")
    Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
            new AsyncTaskResult<Result>(this, result));
    message.sendToTarget();
    return result;
}

我們在2.3中可以看到最後執行到了postResult()方法,此方法就是利用一個靜態sHandler變數將訊息傳送出去並且交由主執行緒處理,這樣一來就實現了子執行緒和主執行緒的切換問題。不僅僅是此處由sHandler處理,當在子執行緒中執行doInbackground()時,如果我們需要更新進度即呼叫核心方法3也需要利用sHandler傳送訊息給主執行緒處理。

2.5 finish

private void finish(Result result) {
    if (isCancelled()) {
        onCancelled(result);
    } else {
        onPostExecute(result);
    }
    mStatus = Status.FINISHED;
}

從2.4中看到,傳送sHandler的MESSAGE_POST_RESULT訊息,執行了finish()方法。該方法很簡單,如果正常結束呼叫了核心方法4結束整個非同步任務。如果在非同步任務執行的過程中被取消了那麼呼叫onCancelled()方法。

流程圖分析如下:

這裡寫圖片描述

3. 基本使用

我們從原始碼分析中,可以看到AsyncTask非同步任務在內部是序列執行的,我們為了提高讓非同步任務的執行效率,在Android3.0之後提供了,executeOnExecutor()方法。

來個一般的寫法吧:

private class MyAsyncTask extends AsyncTask{

    private String mName;

    public MyAsyncTask(String name){
        mName = name;
    }

    @TargetApi(Build.VERSION_CODES.N)
    @Override
    protected Object doInBackground(Object[] params) {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Log.i(mName, dateFormat.format(new Date(System.currentTimeMillis())));
        return null;
    }

}

我們可以通過這樣的呼叫來驗證其是序列還是併發的。

(new MyAsyncTask("MyAsyncTask01")).execute();
(new MyAsyncTask("MyAsyncTask02")).execute();
(new MyAsyncTask("MyAsyncTask03")).execute();

可以看到這三個非同步任務的執行時間是

I/MyAsyncTask01: 2017-05-26 20:35:29
I/MyAsyncTask02: 2017-05-26 20:35:32
I/MyAsyncTask03: 2017-05-26 20:35:35

由此可知它的執行是序列的,如果需要併發執行,呼叫executeOnExecutor()即可。

(new MyAsyncTask("MyAsyncTask01")).executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, "");
(new MyAsyncTask("MyAsyncTask02")).executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, "");
(new MyAsyncTask("MyAsyncTask03")).executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, "");

看時間是這樣的:

I/MyAsyncTask02: 2017-05-26 20:38:52
I/MyAsyncTask03: 2017-05-26 20:38:52
I/MyAsyncTask01: 2017-05-26 20:38:52

HandlerThread

HandlerThread繼承了Thread,我們都知道如果需要線上程中建立一個可接收訊息的Handler,可參考我的另一篇文章Android訊息機制-Handler。所以HandlerThread實際上是一個允許Handler的特殊執行緒。

@Override
public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
        mLooper = Looper.myLooper();
        notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
    Looper.loop();
    mTid = -1;
}

普通執行緒在run()方法中執行耗時操作,而HandlerThread在run()方法建立了一個訊息佇列不停地輪詢訊息,我們可以通過Handler傳送訊息來告訴執行緒該執行什麼操作。

它在Android中是個很有用的類,它常見的使用場景實在IntentService中。當我們不再需要HandlerThread時,我們通過呼叫quit/Safely方法來結束執行緒的輪詢並結束該執行緒。

IntentService

1. 概述

IntentService是一個繼承Service的抽象類,所以我們必須實現它的子類再去使用。

在說到HandlerThread時我們提到,HandlerThread的使用場景是在IntentService上,我們可以這樣來理解IntentService,它是一個實現了HandlerThread的Service。

那麼為什麼要這樣設計呢?這樣設計的好處是Service的優先順序比較高,我們可以利用這個特性來保證後臺服務的優先正常執行,甚至我們還可以為Service開闢一個新的程序。

2. 原始碼分析

我們先來看看onCreate()函式:

@Override
public void onCreate() {
    super.onCreate();
    HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
    thread.start();
    mServiceLooper = thread.getLooper();
    mServiceHandler = new ServiceHandler(mServiceLooper);
}

我們可以看到建立Service時,實現了一個HandlerThread的例項開啟了一個執行緒,並在執行緒內部進行訊息輪詢,又建立了一個Handler來收發Looper的訊息。

我們每啟動一次服務時,不會開啟新的服務,只是會呼叫onStartCommand()函式,我們又看到該函式呼叫了onStart()方法。

@Override
public void onStart(@Nullable Intent intent, int startId) {
    Message msg = mServiceHandler.obtainMessage();
    msg.arg1 = startId;
    msg.obj = intent;
    mServiceHandler.sendMessage(msg);
}

在該方法中我們看到,這裡用來接收Context傳遞的引數,通過Handler傳送出去,然後再HandlerThread的執行緒上接收訊息並且處理。

private final class ServiceHandler extends Handler {
    public ServiceHandler(Looper looper) {
        super(looper);
    }

    @Override
    public void handleMessage(Message msg) {
        onHandleIntent((Intent)msg.obj);
        stopSelf(msg.arg1);
    }
}

我們可以看到onHandleIntent()方法是我們需要接收訊息處理的。

流程如下

這裡寫圖片描述

初識執行緒池

我們在上面的篇幅中,講到了執行緒的概念以及一些擴充套件執行緒。那麼我們考慮一個問題,我如果需要同時做很多事情,是不是給每一個事件都開啟一個執行緒呢?那如果我的事件無限多呢?頻繁地建立/銷燬執行緒,CPU該吃不消了吧。所以,這時候執行緒池的概念就來了。我們舉個例子來闡述一下執行緒池大致工作原理。

比如,有個老闆戚總開了個飯店,每到中午就有很多人點外賣,一開始戚總招了10個人送外賣,然而由於午飯高峰期可能同時需要派送50份外賣,那如何保證高效地執行呢?

戚總想著那再招40個員工送?我去,那我這店豈不是要賠死,人員工資這麼高,並且大部分時候也只需要同時派送幾份外賣而已,招這麼多人乾瞪眼啊,是啊。但我還得保證高峰期送餐效率,咋辦呢?

經過一番思想鬥爭,戚總想通了,我也不可能做到完美,儘量高效就行了,那正常時間一般只需要同時送四五家外賣,那我就招5個員工作為正式員工(核心執行緒),再招若干兼職(非核心執行緒)在用餐高峰時緩解一下送餐壓力即可。

那麼,人員分配方案出來了,當正式員工(核心執行緒)空閒時有單進來理所應當讓他們派送,如果正式員工忙不過了,就讓兼職人員(非核心執行緒)送,按單提成唄。

好吧,囉嗦這麼多,這就是執行緒池的概念原理吧。

我們來總結一下優點吧。

  • 重用執行緒池中的執行緒,避免頻繁地建立和銷燬執行緒帶來的效能消耗;
  • 有效控制執行緒的最大併發數量,防止執行緒過大導致搶佔資源造成系統阻塞;
  • 可以對執行緒進行一定地管理。

ThreadPoolExecutor

ExecutorService是最初的執行緒池介面,ThreadPoolExecutor類是對執行緒池的具體實現,它通過構造方法來配置執行緒池的引數,我們來分析一下它常用的建構函式吧。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

引數解釋:

  • corePoolSize,執行緒池中核心執行緒的數量,預設情況下,即使核心執行緒沒有任務在執行它也存在的,我們固定一定數量的核心執行緒且它一直存活這樣就避免了一般情況下CPU建立和銷燬執行緒帶來的開銷。我們如果將ThreadPoolExecutor的allowCoreThreadTimeOut屬性設定為true,那麼閒置的核心執行緒就會有超時策略,這個時間由keepAliveTime來設定,即keepAliveTime時間內如果核心執行緒沒有迴應則該執行緒就會被終止。allowCoreThreadTimeOut預設為false,核心執行緒沒有超時時間。
  • maximumPoolSize,執行緒池中的最大執行緒數,當任務數量超過最大執行緒數時其它任務可能就會被阻塞。最大執行緒數=核心執行緒+非核心執行緒。非核心執行緒只有當核心執行緒不夠用且執行緒池有空餘時才會被建立,執行完任務後非核心執行緒會被銷燬。
  • keepAliveTime,非核心執行緒的超時時長,當執行時間超過這個時間時,非核心執行緒就會被回收。當allowCoreThreadTimeOut設定為true時,此屬性也作用在核心執行緒上。
  • unit,列舉時間單位,TimeUnit。
  • workQueue,執行緒池中的任務佇列,我們提交給執行緒池的runnable會被儲存在這個物件上。

執行緒池的分配遵循這樣的規則:

  • 當執行緒池中的核心執行緒數量未達到最大執行緒數時,啟動一個核心執行緒去執行任務;
  • 如果執行緒池中的核心執行緒數量達到最大執行緒數時,那麼任務會被插入到任務佇列中排隊等待執行;
  • 如果在上一步驟中任務佇列已滿但是執行緒池中執行緒數量未達到限定執行緒總數,那麼啟動一個非核心執行緒來處理任務;
  • 如果上一步驟中執行緒數量達到了限定執行緒總量,那麼執行緒池則拒絕執行該任務,且ThreadPoolExecutor會呼叫RejectedtionHandler的rejectedExecution方法來通知呼叫者。

執行緒池的分類

我們來介紹一下不同特性的執行緒池,它們都直接或者間接通過ThreadPoolExecutor來實現自己的功能。它們分別是:

  • FixedThreadPool
  • CachedThreadPool
  • ScheduledThreadPool
  • SingleThreadExecutor

1. FixedThreadPool

通過Executors的newFixedThreadPool()方法建立,它是個執行緒數量固定的執行緒池,該執行緒池的執行緒全部為核心執行緒,它們沒有超時機制且排隊任務佇列無限制,因為全都是核心執行緒,所以響應較快,且不用擔心執行緒會被回收。

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(
        nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>() 
        );
}
ExecutorService mExecutor = Executors.newFixedThreadPool(5);

引數nThreads,就是我們固定的核心執行緒數量。

2. CachedThreadPool

通過Executors的newCachedThreadPool()方法來建立,它是一個數量無限多的執行緒池,它所有的執行緒都是非核心執行緒,當有新任務來時如果沒有空閒的執行緒則直接建立新的執行緒不會去排隊而直接執行,並且超時時間都是60s,所以此執行緒池適合執行大量耗時小的任務。由於設定了超時時間為60s,所以當執行緒空閒一定時間時就會被系統回收,所以理論上該執行緒池不會有佔用系統資源的無用執行緒。

public static ExecutorService new CachedThreadPool(){
    return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>()
    );
}

3. ScheduledThreadPool

通過Executors的newScheduledThreadPool()方法來建立,ScheduledThreadPool執行緒池像是上兩種的合體,它有數量固定的核心執行緒,且有數量無限多的非核心執行緒,但是它的非核心執行緒超時時間是0s,所以非核心執行緒一旦空閒立馬就會被回收。這類執行緒池適合用於執行定時任務和固定週期的重複任務。

public static ScheduledThreadPool newScheduledThreadPool(int corePoolSize){
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize){
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

引數corePoolSize是核心執行緒數量。

4. SingleThreadExecutor

通過Executors的newSingleThreadExecutor()方法來建立,它內部只有一個核心執行緒,它確保所有任務進來都要排隊按順序執行。它的意義在於,統一所有的外界任務到同一執行緒中,讓呼叫者可以忽略執行緒同步問題。

public static ExecutorService newSingleThreadExecutor(){
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, 
        new LinkedBlockingQueue<Runnable>()));
}

執行緒池一般用法

  • shutDown(),關閉執行緒池,需要執行完已提交的任務;
  • shutDownNow(),關閉執行緒池,並嘗試結束已提交的任務;
  • allowCoreThreadTimeOut(boolen),允許核心執行緒閒置超時回收;
  • execute(),提交任務無返回值;
  • submit(),提交任務有返回值;

自定義執行緒池

ExecutorService mExecutor = Executors.newFixedThreadPool(5);

execute()方法

接收一個Runnable物件作為引數,非同步執行。

Runnable myRunnable = new Runnable() {
    @Override
    public void run() {
        Log.i("myRunnable", "run");
    }
};
mExecutor.execute(myRunnable);