1. 程式人生 > >網路請求框架----OkHttp原理

網路請求框架----OkHttp原理

一.前言

在 Android 中,網路請求是一個必不可少的功能,因此就有許多代表網路請求客戶端的元件庫,具有代表性的有下面三種:

  • Apache 的 HTTP 客戶端元件 HttpClient。
  • Java JDK 自帶的 HttpURLConnection 標準庫。
  • 重寫應用層程式碼的 HTTP 元件庫。
HttpClient

這是 Android 6.0 之前經常會使用的 API ,但是因為不支援 HTTP/2,支援 HTTP/2 的版本還處於 beta 階段,不適合用於 Android APP 中使用,所以 Google 在 6.0 版本里面刪除了 HttpClient 相關 API。

HttpURLConnection

這是 Java 自帶的一個元件,不需要引入依賴就可以使用,同樣這個元件庫也無法支援 HTTP/2, 支援的版本也要到 Java 9 後才行。同時這個標準庫 封裝層次太低,並且支援特性太少,缺乏連線池管理,域名機制控制等特性,因此在 Android 中使用就會相當繁瑣。

OkHttp

上述兩個元件都是不支援 HTTP/ 2 ,但是 HTTP/2 對於移動客戶端而言, 無論是從握手延遲、響應延遲, 還是資源開銷看都有相當吸引力,而且 OkHttp 在弱網和無網環境下有自動檢測和恢復機制,這使得 OkHttp 成為 Android 最常見的網路請求庫。

二.簡介

OkHttp 是一個支援 HTTP 和 HTTP/2 的封裝的網路請求客戶端,適用於 Android 和 java 應用程式。OkHttp 有如下優點:

  • 支援 HTTPS/HTTP2/WebSocket
  • 內部維護任務佇列執行緒池,支援 HTTP/2 的併發訪問
  • 內部維護連線池,支援 HTTP/1.x 的 keep-Alive 機制,也支援 HTTP/2 的多路複用, 減少連線建立開銷。
  • 通過快取避免重複的請求
  • 請求失敗時自動重試主機的其他ip,自動重定向。

三.原理

1.初始化

OkHttp 的使用初始化有兩種方式。

  • 預設方式:
OkHttpClient mOkHttpClient = new OkHttpClient();
  • 自定義配置方式:
OkHttpClient.Builder builder = new OkHttpClient.Builder()
                .connectTimeout()
                .writeTimeout()
                .readTimeout()
                .cache()
                .addInterceptor()
                .connectionPool()
                .dns()
                ...
                ;
                
  OkHttpClient   mOkHttpClient = builder.build();

不管是哪種方式,對於 OkHttp 來說都是初始化一些配置,因為這裡的引數十分多,所以這裡使用的 Builder 設計模式進行簡化。Builder 初始化的物件主要有:

public OkHttpClient() {
    this(new Builder());
  }
public Builder newBuilder() {
    return new Builder(this);
  }

public static final class Builder {
    public Builder() {
      dispatcher = new Dispatcher(); //請求的排程器
      protocols = DEFAULT_PROTOCOLS; // 預設支援的協議
      connectionSpecs = DEFAULT_CONNECTION_SPECS; // 預設連線配置
      eventListenerFactory = EventListener.factory(EventListener.NONE); // 對於請求和回撥的監聽
      proxySelector = ProxySelector.getDefault(); // 代理伺服器的選擇
      cookieJar = CookieJar.NO_COOKIES; // 預設沒有 Cookie 
      socketFactory = SocketFactory.getDefault(); // Socket 的工廠
      hostnameVerifier = OkHostnameVerifier.INSTANCE; //主機名認證
      certificatePinner = CertificatePinner.DEFAULT; // 安全認證相關的配置
      proxyAuthenticator = Authenticator.NONE; // 安全認證相關的配置
      authenticator = Authenticator.NONE; // 安全認證相關的配置
      connectionPool = new ConnectionPool();  //連線池
      dns = Dns.SYSTEM;   // DNS 域名解析系統
      followSslRedirects = true; // 允許SSL重定向
      followRedirects = true; //  允許重定向
      retryOnConnectionFailure = true;  // 允許失敗重連
      connectTimeout = 10_000;   // 連線超時 , 10 s
      readTimeout = 10_000;   //  讀取 超時 ,10 s 
      writeTimeout = 10_000;  // 寫入超時,10s
      pingInterval = 0; //ping  間隔時間,這是 WebSocket 長連線的活性檢測的間隔時間 
    }

上面這些配置都是 OkHttpClient 的預設屬性,當然也可以使用自己自定義的屬性。而且可以看到每一次初始化都會建立新的的 Builder ,因此也會重新建立一個連線池,排程器等耗資源的類,因此在使用 OkHttpClient 通常使用的單例模式,使得整個系統只有一個 請求排程器和連線池,減少資源的消耗。

2.發起請求

先看一個請求的建立的方式

   Request.Builder requestBuilder = new Request.Builder().url(url);
    requestBuilder.method("GET", null);
     Request request = requestBuilder.build();

可以看到這裡同樣的是使用 Builder 的模式來建立一個 Request 請求

public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final RequestBody body;
  final Object tag;

  private volatile CacheControl cacheControl; // Lazily initialized.

  Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tag = builder.tag != null ? builder.tag : this;
  }

Request 主要是對請求的 Url ,請求方法,請求頭,請求體,以及快取首部欄位的一個封裝而已。對於一個網路請求的, OkHttp 有兩種執行的方式:

  • 同步的:executed,這種方式不能在 主執行緒中呼叫。
okHttpClient.newCall(request).execute();

  • 非同步的 enqueue(responseCallback)。
okHttpClient.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
               
            }
        });

3.處理請求

在 OkHttp 中一個請求的處理主要是由 dispatcher 分發器負責,先看 Dispatcher 類主要有什麼東西。

public final class Dispatcher {
  private int maxRequests = 64;  //最大併發的請求數 為 64 
  private int maxRequestsPerHost = 5;  //每個主機最大請求數為 預設為 5 
  private Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private ExecutorService executorService;  //請求處理的執行緒池

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();   //非同步處理的準備佇列

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); //非同步處理的執行佇列

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();  //同步處理的執行佇列

}

簡單來說就是對於同步請求,使用一個 佇列進行儲存。對於非同步的請求,有一個準備的佇列和一個正在執行的佇列進行儲存。因為同步的方式還是在 主執行緒中執行,因此沒有使用到執行緒池,而對於非同步的方式,OkHttp 使用了執行緒池對非同步請求進行管理。

在一個請求發起之後就是對請求的處理,因為處理請求的方式有同步和非同步兩種,所以具體的實現也有所不同,下面先看 同步的方式:

(1)同步請求的方式
  // okHttpClient 類中
 @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }

   // RealCall 類中,實現 Call 介面
  final class RealCall implements Call {
   ...
@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);  //新增到請求佇列中
      Response result = getResponseWithInterceptorChain();  //攔截鏈,對應的一系列方法呼叫包括請求,得到響應後返回
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this); //結束這個請求
    }
  }
}
  //Call 介面
public interface Call extends Cloneable {

  Request request();

  Response execute() throws IOException;
  
  void enqueue(Callback responseCallback);
  
  boolean isExecuted();

  boolean isCanceled();

  Call clone();

  interface Factory {
    Call newCall(Request request);
  }
}

首先將 Request 封裝為一個 RealCall, 這個 RealCall 實現了 Call 介面,在 Call 介面中可以看到裡面既有 Request 也有 Response ,顯然 Call 介面定義的就是一次 網路請求和其對應的響應的抽象。

在 RealCall 類中一個請求的處理步驟主要是分為三步:

  • client.dispatcher().executed(this); 新增到佇列。
  • getResponseWithInterceptorChain(),發起攔截鏈,同時得到響應後返回。
  • client.dispatcher().finished(this);結束這個請求。
dispatcher().executed(this) / client.dispatcher().finished(this)
/** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
 /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls(); //對於 executed 方式,這裡為false 所以不執行 promoteCalls
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

對於同步方式,新增佇列就是將請求新增到同步執行的佇列,然後就呼叫攔截器鏈得到請求後就結束這個 Call 。結束的時候就直接從佇列中移除。

(1)非同步請求的方式
 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

可以看到這裡將回調的介面封裝在 AsyncCall 類裡面,這個類繼承了 NamedRunnable 抽象類,然後就執行 排程器的分發

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

NamedRunnable 實現了 Runnable 介面,並在 run 方法中呼叫了抽象方法 execute ,那麼也就是說 AsyncCall 的 execute 方法最終會在子執行緒中執行。

 // 實現 Runnable 介面
final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      this.responseCallback = responseCallback;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();  //攔截器鏈
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));  //失敗的回撥
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);  //成功的回撥
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);  //結束這個 Call 
      }
    }
  }

在 AsyncCall 的 方法中,首先就是通過攔截器鏈得到響應,然後對響應進行判斷,如果成功就呼叫 responseCallback.onResponse ,失敗就呼叫就 responseCallback.onFailure 。

在 RealCall 類中一個非同步請求的處理步驟主要是分為三步:

  • 在主執行緒,client.dispatcher().executed(new AsyncCall(responseCallback)); 將 回撥介面封裝為 AsyncCall 後新增到佇列中。
  • 在run 方法中 getResponseWithInterceptorChain(),發起攔截鏈,同時得到響應後返回。
  • 在run 方法中 ,client.dispatcher().finished(this);結束這個請求。
dispatcher.enqueue(new AsyncCall(responseCallback))
synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

  /** Returns the number of running calls that share a host with {@code call}. */
  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

判斷 runningAsyncCalls 正在執行的佇列的大小是否小於最大請求數量(最大執行緒數量、 併發數量)並且 所有的 AsyncCall 請求加起來是否小於最大主機請求限制。

  • 如否

將 AsyncCalls 加入到 readyAsyncCalls,的準備佇列

  • 如是

加入到 runningAsyncCalls,正在執行的佇列中 ,並加入執行緒池執行。

最後 client.dispatcher().finished
 /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }
  
  
  
  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();  因為 為 true 所以還要執行 promoteCalls 
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }
  
  
  
 private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

對於非同步請求的結束,首先 判斷正在執行的佇列 runningAsyncCalls 是否還有請求,有則返回繼續請求,沒有就判斷準備佇列 readyAsyncCalls 是否還有請求,沒有則返回,有則新增到正在執行的佇列,然後執行執行緒.

image.png

4.攔截器鏈

OkHttp 基本上所有的核心功能都是由攔截器鏈完成的,包括快取,網路請求獲取響應等。在前面的程式碼中可以看到對於請求的響應的獲取都是通過下面這行程式碼實現的。

  Response response = getResponseWithInterceptorChain();

下面就看攔截器鏈的具體實現。

 Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

其邏輯大致分為兩部分:

  • 建立一系列攔截器,包括使用者自定義的攔截器,並將其放入一個攔截器List中。
  • 建立一個攔截器鏈 RealInterceptorChain, 並執行攔截器鏈的 proceed 方法.
public final class RealInterceptorChain implements Interceptor.Chain {
  ...
 
  @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }
 ...
  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    ....
    // Call the next interceptor in the chain.
    // 獲取下一個攔截器,並呼叫其 intercept 方法。
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
   ....
    return response;
  }
}

在攔截器的 intercept 方法裡,以 RetryAndFollowUpInterceptor 為例:

public final class RetryAndFollowUpInterceptor implements Interceptor {

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
     // 對請求前的處理
    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response = null;
      boolean releaseConnection = true;
      try {
      //這裡再去呼叫攔截器鏈的 proceed ,
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        releaseConnection = false;
     
      }

     // 下面就是對 得到響應後的處理,這裡就省略。
  }

在 RealInterceptorChain 裡會去執行攔截器鏈的 proceed 方法。而在攔截器鏈中又會執行下一個攔截器的 intercept 方法,在下一個攔截器的 intercept 中又會去執行攔截器鏈的 proceed ,此時 index + 1 。所以整個執行鏈就在攔截器與攔截器鏈中交替執行,最終完成所有攔截器的操作。這也是 OkHttp 攔截器的鏈式執行邏輯。而一個攔截器的 intercept 方法所執行的邏輯大致分為三部分:

  • 在發起請求前對request進行處理
  • 呼叫下一個攔截器,獲取response
  • 對response進行處理,返回給上一個攔截器

這就是 OkHttp 攔截器機制的核心邏輯。所以一個網路請求實際上就是一個個攔截器執行其intercept 方法的過程。而這其中除了使用者自定義的攔截器外還有幾個核心攔截器完成了網路訪問的核心邏輯,按照先後順序依次是:

  • RetryAndFollowUpInterceptor
  • BridgeInterceptor
  • CacheInterceptor
  • ConnectIntercetor
  • CallServerInterceptor
(1)RetryAndFollowUpInterceptor
/**
 * This interceptor recovers from failures and follows redirects as necessary. It may throw an
 * {@link IOException} if the call was canceled.
 */
public final class RetryAndFollowUpInterceptor implements Interceptor {
  /**

從英文解釋就連可以看出這個攔截器主要負責失敗重傳和在必要的時候進行重定向,當一個請求由於各種原因失敗了,處理以得到新的Request,沿著攔截器鏈繼續新的Request。

(2)BridgeInterceptor

/**
 * Bridges from application code to network code. First it builds a network request from a user
 * request. Then it proceeds to call the network. Finally it builds a user response from the network
 * response.
 */
public final class BridgeInterceptor implements Interceptor {

這個攔截器作為應用程式模組程式碼和網路請求模組程式碼的橋樑,首先會從使用者的 request 構建一個真正的網路請求,然後將這個請求提交給網路請求模組,最後就從網路請求模組返回的資料構建一個 response 給使用者。

 @Override public Response intercept(Chain chain) throws IOException {
    
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }


    //到這裡網路請求前攔截的動作就已經完成,主要有:
    // 設定內容長度,內容編碼
     // 設定gzip壓縮
     //新增cookie
      //設定其他請求頭首部,如 User-Agent,Host,Keep-alive 等。其中 Keep-Alive 是實現多路複用的必要步驟
      //下面就到下一個攔截器去獲取真正的網路響應。
    
    Response networkResponse = chain.proceed(requestBuilder.build());
     
     
    // 獲取網路的響應後,在這裡也進行攔截,做一些處理,比如壓縮,新增響應頭等。
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
 
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

BridgeInterceptor 作為客戶端和網路請求的橋樑,在這裡將 Request 和 Response 做一個處理。主要有:

1.在請求前攔截:
  • 設定內容長度,內容編碼
  • 設定gzip壓縮
  • 新增cookie
  • 設定其他請求頭首部,如 User-Agent,Host,Keep-alive 等。其中 Keep-Alive 是實現多路複用的必要步驟
2.呼叫下一個攔截器去獲取響應
3.獲取響應後再次攔截
  • 壓縮
  • 新增/刪除響應首部欄位
(3)CacheInterceptor
/** Serves requests from the cache and writes responses to the cache. */
public final class CacheInterceptor implements Interceptor {

CacheInterceptor 主要是負責讀取快取和更新快取。

@Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
     
     // 獲取定義響應讀取的策略,分為僅從網路獲取響應,僅從快取獲取響應,或者網路和快取配合。
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }
   
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    //如果指定僅從快取獲取但是快取沒有就返回一個  504 
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    //如果沒有指定從網路獲取並且快取不為空,那麼就將快取返回。
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    //去網路獲取響應
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    //必要的時候更新快取,並返回
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
     
     //如果沒有快取就將這個響應寫入快取。
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }

快取的主要總結步驟如下:

  • 如果指定僅從快取獲取但是快取沒有就返回一個 504
  • 如果沒有指定從網路獲取並且快取不為空,那麼就將快取返回。
  • 去網路獲取響應
  • 已經有快取並且快取需要更新的時, 更新快取,並返回
  • 如果沒有快取就將這個響應寫入快取。

快取使用的是策略模式,將快取的策略封裝在 CacheStrategy ,這個類告訴 CacheInterceptor 是使用快取還是使用網路請求 。 快取操作的定義是 介面 InternalCache ,主要操作有 put, get, 和 更新等。而具體的實現類說就是 Cache

public final class Cache implements Closeable, Flushable {
   ....

  final InternalCache internalCache = new InternalCache() {
    @Override public Response get(Request request) throws IOException {
      return Cache.this.get(request);
    }

    @Override public CacheRequest put(Response response) throws IOException {
      return Cache.this.put(response);
    }

    @Override public void remove(Request request) throws IOException {
      Cache.this.remove(request);
    }

    @Override public void update(Response cached, Response network) {
      Cache.this.update(cached, network);
    }

    @Override public void trackConditionalCacheHit() {
      Cache.this.trackConditionalCacheHit();
    }

    @Override public void trackResponse(CacheStrategy cacheStrategy) {
      Cache.this.trackResponse(cacheStrategy);
    }
  };

// 快取的核心類
  final DiskLruCache cache;

 

}

可以快取這裡的核心類是 DiskLruCache ,Cache 雖然沒有實現 InternalCache 介面嗎,當時基本上左右的具體的功能,都是由 Cache 結合 InternalCache 完成。

(4)ConnectIntercetor
/** Opens a connection to the target server and proceeds to the next interceptor. */
public final class ConnectInterceptor implements Interceptor {

這個攔截器即開啟一個連線到目標伺服器,並將這個連結提交到下一個攔截器。

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

雖然這個只有這麼點程式碼,但是實際上關於連線池的複用等功能都被上面的類封裝起來了。之所以採用複用的原因是 客戶端和伺服器建立 socket 連線需要經歷 TCP 的三次握手和四次揮手,是一種比較消耗資源的動作。Http 中有一種 keepAlive connections 的機制,在和客戶端通訊結束以後可以保持連線指定的時間。OkHttp3 支援 5 個併發 socket 連線,預設的 keepAlive 時間為 5 分鐘。這種複用的模式就是 設計模式中的享元模式。

1.StreamAllocation

這個類協調三個實體之間的關係。

  • Connections:連線遠端伺服器的物理 Socket 連線
  • Streams : 基於 Connection 的邏輯 Http 請求/響應對 一個請求/響應 對應一個 Streams . 在 Http1.x,一個 Streams 對應一個 Connections。在 Http2.0,多個 Streams 可對應一個 Connections,進行併發請求。
  • Calls : 邏輯 Stream 序列,也就是請求/響應 佇列

image.png

StreamAllocation 會通過 ConnectPool 獲取或者新生成一個 RealConnection 來得到一個連線到 Server 的 Connection 連線, 同時會生成一個 HttpCodec 用於下一個 CallServerInterceptor ,以完成最終的請求.在 newStream 方法中

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
  
    try {
    找到一個合適的連線,可能複用已有連線也可能是重新建立的連線,返回的連線由連線池負責決定。
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);
     ....
  }

2.ConnectionPool
public final class ConnectionPool {
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
        ......
    }
  };

  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;
  ......
    
    /**
    *返回符合要求的可重用連線,如果沒有返回NULL
   */
  RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    ......
  }
  
  /*
  * 去除重複連線。主要針對多路複用場景下,一個 address 只需要一個連線
  */
  Socket deduplicate(Address address, StreamAllocation streamAllocation) {
    ......
    }
  
  /*
  * 將連線加入連線池
  */
  void put(RealConnection connection) {
      ......
  }
  
  /*
  * 當有連線空閒時喚起cleanup執行緒清洗連線池
  */
  boolean connectionBecameIdle(RealConnection connection) {
      ......
  }
  
  /**
   * 掃描連線池,清除空閒連線
  */
  long cleanup(long now) {
    ......
  }
  
  /*
   * 標記洩露連線
  */
  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    ......
  }
}

ConnectionPool 內部通過一個雙端佇列 dequeue) 來維護當前所有連線,主要涉及到的操作包括:

  • put:放入新連線
  • get:從連線池中獲取連線
  • evictAll:關閉所有連線
  • connectionBecameIdle:連線變空閒後呼叫清理執行緒
  • deduplicate:清除重複的多路複用執行緒

下面就看看一個是如何找到的:

RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
 private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
 /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // This could match due to connection coalescing.
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
  }

上面找連線的步驟可以總結為:

  • 檢視當前 streamAllocation 是否有之前已經分配過的連線,有則直接使用
  • 從連線池中查詢可複用的連線,有則返回該連線
  • 配置路由,配置後再次從連線池中查詢是否有可複用連線,有則直接返回
  • 新建一個連線,並修改其 StreamAllocation 標記計數,將其放入連線池中
  • 檢視連線池是否有重複的多路複用連線,有則清除,一個地址只需要一個連線。

而在連線池中判斷一個連線是否可以複用的條件為:

  • 連線沒有達到共享上限
  • 非host域必須完全一樣
  • 如果此時host域也相同,則符合條件,可以被複用
  • 如果host不相同,在HTTP/2的域名切片場景下一樣可以複用.

對於連線的清楚,ConnectPool 有一個獨立的執行緒進行清理的工作:

  • 遍歷連線池中所有連線,標記洩露連線(即空閒時間即將達到5分鐘)
  • 如果被標記的連線滿足(空閒 socket 連線超過5個&& keepalive 時間大於5分鐘),就將此 連線從 Deque 中移除,並關閉連線,返回 0,也就是將要執行 wait(0),提醒立刻再次掃描
  • 如果(目前還可以塞得下5個連線,但是有可能洩漏的連線(即空閒時間即將達到5分鐘)),就返回此連線即將到期的剩餘時間,供下次清理
  • 如果(全部都是活躍的連線),就返回預設的keep-alive時間,也就是5分鐘後再執行清理。
3.RealConnection

描述一個物理 Socket 連線,連線池中維護多個 RealConnection 例項。由於Http/2支援多路複用, 一個 RealConnection 可以支援多個網路訪問請求,所以 OkHttp 又引入了 StreamAllocation 來描述一個實際的網路請求開銷(從邏輯上一個Stream對應一個Call,但在實際網路請求過程中一個Call常常涉及到多次請求。如重定向,Authenticate等場景。所以準確地說,一個 Stream 對應一次請求,而一個 Call 對應一組有邏輯關聯的 Stream ),一個 RealConnection 對應一個或多個 StreamAllocation ,所以 StreamAllocation 可以看做是 RealConenction 的計數器,當 RealConnection 的引用計數變為 0,且長時間沒有被其他請求重新佔用就將被釋放.

多路複用: 報頭壓縮:HTTP/2 使用 HPACK 壓縮格式壓縮請求和響應報頭資料,減少不必要流量開銷. 請求與響應複用:HTTP/2 通過引入新的二進位制分幀層實現了完整的請求和響應複用,客戶端和伺服器可以將 HTTP 訊息分解為互不依賴的幀,然後交錯傳送,最後再在另一端將其重新組裝 指定資料流優先順序:將 HTTP 訊息分解為很多獨立的幀之後,我們就可以複用多個數據流中的幀, 客戶端和伺服器交錯傳送和傳輸這些幀的順序就成為關鍵的效能決定因素。為了做到這一點,HTTP/2 標準允許每個資料流都有一個關聯的權重和依賴關係 流控制:HTTP/2 提供了一組簡單的構建塊,這些構建塊允許客戶端和伺服器實現其自己的資料流和連線級流控制.

4.HttpCodec

針對不同的版本,OkHttp 為我們提供了 HttpCodec1(Http1.x)和 HttpCodec2(Http2).他們就是協議的具體實現類。

(5)CallServerInterceptor(forWebSocket)
/** This is the last interceptor in the chain. It makes a network call to the server. */
public final class CallServerInterceptor implements Interceptor {

這是整個攔截鏈的最後一個攔截器,負責和伺服器傳送請求和從伺服器讀取響應, 利用 HttpCodec 完成最終請求的傳送。

到這裡整個攔截鏈的分析就到這裡,大致流程如圖,責任鏈模式在這裡就體現得十分清楚:

image.png