OkHttp原始碼之執行緒排程
身為一名Android開發者,我想應該沒有人不知道OkHttp這款優秀的http客戶端,大家一般都是使用okhttp的非同步請求模式,那麼不知大家在使用時有沒有想過,okhttp的非同步執行緒是怎麼組織的呢?一個簡單的執行緒池嗎? 一切都讓原始碼說話。
通常來說,執行緒排程主要是非同步呼叫才會涉及,我們看看RealCall的enqueue方法:
@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } //初始化此次請求的異常撲捉器 captureCallStackTrace(); //okhttp的一種監聽機制,能監聽各種請求不同階段,這裡回撥 //請求開始的方法 eventListener.callStart(this); //這句就是開始執行緒排程的重點 client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
這裡我們關注的是執行緒的排程,那麼什麼異常、監聽是幹什麼的我們完全不管,這裡直接跟到enqueue方法中去,我們發現所有的執行緒排程都和一個類有關,Dispatcher.java,這個就是核心類,我們重點分析,首先看下成員變數:
//最大請求數量64 private int maxRequests = 64; //每個host最大請求數量 private int maxRequestsPerHost = 5; private @Nullable Runnable idleCallback; /** Executes calls. Created lazily. */ private @Nullable ExecutorService executorService; /** Ready async calls in the order they'll be run. */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
看到 maxRequests和maxRequestsPerHost這兩個屬性,我們自然而然有這樣的疑問:
最大請求數量是64,超過了怎麼辦?會擴容嗎?還是等待其他的結束了再執行超過部分的請求?
然後我們看到readyAsyncCalls這個屬性,我們可以大膽猜測,超過最大數量的請求應該會放在這裡等待執行。接下來就是驗證猜測的過程。
非同步執行緒呼叫
之前我們提到,RealCall的enqueue()方法中會呼叫Dispatcher的enqueue方法,我們就先從這個方法開始:
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
這裡很簡單,首先判斷當前正在執行的請求數量有沒有超過maxRequests,以及該host的請求數量有沒有超過maxRequestsPerHost,如果沒有我們可以立即執行,並加入到runningAsyncCalls佇列,這裡沒有任何疑問。如果不滿足數量要求,超出了最大數量,直接加入到等待佇列。
上面的邏輯很簡單,但是看完了大家肯定一臉懵逼,那些放在等待佇列中的請求什麼時候執行呢?
到這裡我們可以按照常理大膽的猜一猜,這些等待執行的請求肯定是等待已有的正在執行的請求結束後執行的,所以我們繼續回到RealCall裡找,果然被我們找到了:
@Override protected void execute() { boolean signalledCallback = false; try { //省略不相干程式碼 } catch (IOException e) { //省略不相干程式碼 } finally { client.dispatcher().finished(this); } }
這裡的execute就是真正發請求的地方,該請求是阻塞的。我們看到最後會執行Dispatcher.finished()方法,估計這裡就是用來執行等待請求的程式碼了,我們跟進去看看:
void finished(AsyncCall call) { finished(runningAsyncCalls, call, true); }
繼續跟進:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } //這裡的idleCallback應該是為了在所有請求執行結束後執行, //這裡只是提供一種手段監控這個全部結束時機。 if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } }
我們可以看到結束請求後,首先把自己從runningAsyncCalls(之前當做引數傳入,正在執行非同步請求的佇列)中刪除,然後由於promoteCalls為true,會去執行promoteCalls()方法。那麼毫無疑問,這個promoteCall()方法應該主要就是去執行正在等待的請求,我們跟進去看下:
private void promoteCalls() { //當前執行佇列還是滿的,這種情況出現在其他請求結束後立即用新的請求補上來了,這是什麼都不幹 if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
這裡的邏輯很簡單,就是在runningAsync佇列有空位置的情況下,從readyAsyncCall佇列中拿請求出來放進去執行,直到runningAsyncCalls滿了或者readyAsyncCalls空了為止。
同步呼叫
之前我們列舉Dispatcher中的成員變數時,不知大家有沒有注意到這個:
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
看到這裡大不知道大家有沒有感覺奇怪,這個佇列明顯是用來儲存同步請求的,但是同步請求是阻塞的,不可能會同時出現多個啊,沒有什麼執行緒排程一說啊,我們看下RealCall的execute方法:
@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); eventListener.callStart(this); try { //重點關注這裡 client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } catch (IOException e) { eventListener.callFailed(this, e); throw e; } finally { //重點關注這裡 client.dispatcher().finished(this); } }
可以看到,同步請求中開始執行了dispatcher的executed()方法,結束後執行了finished方法,我們一個個看過去.
首先是executed()方法:
/** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) { runningSyncCalls.add(call); }
然後看下finished方法:
/** Used by {@code Call#execute} to signal completion. */ void finished(RealCall call) { finished(runningSyncCalls, call, false); }
從上面可以看到,這裡dispatcher只是簡單的在開始時加入到runningSyncCalls佇列,然後結束後又移除,什麼都沒做,可能是以後有其他作用。
總結
同步的其實什麼都沒幹,沒什麼好說的,非同步的我們總結下:

image