1. 程式人生 > >(4.2.36.4)HTTP之OkHttp(四): OkHttp原始碼解析

(4.2.36.4)HTTP之OkHttp(四): OkHttp原始碼解析

原始碼開始之前我先貼一段OkHttp請求網路的例項

OkHttpClient mOkHttpClient = new OkHttpClient();

final Request request = new Request.Builder()
		.url("https://www.jianshu.com/u/b4e69e85aef6")
		.addHeader("user_agent","22222")
		.build();
		
Call call = mOkHttpClient.newCall(request);
call.enqueue(new Callback() {
	@Override
	public void onFailure(Call call, IOException e) {

	}

	@Override
	public void onResponse(Call call, Response response) throws IOException {
		if(response != null )
		Log.i(TAG, "返回服務端資料:"+ String.valueOf(response.body().string()));
	}
});

一、OkHttp優點

OkHttp是一個高效的Http客戶端,有如下的特點:

  • 支援HTTP2/SPDY黑科技
  • socket自動選擇最好路線,並支援自動重連
  • 擁有自動維護的socket連線池,減少握手次數
  • 擁有佇列執行緒池,輕鬆寫併發
  • 擁有Interceptors輕鬆處理請求與響應(比如透明GZIP壓縮,LOGGING)
  • 基於Headers的快取策略(不僅可以快取資料,就連響應頭都給快取了)

二、原始碼涉及的主要幾個物件

  • Call:對請求的封裝,有非同步請求和同步請求。
  • Dispatcher:任務排程器
  • Connection:是RealConnection的父類介面,表示對JDK中的物理socket進行了引用計數封裝,用來控制socket連線
  • HttpCodec:對Http請求進行編碼,對Http響應進行解碼,由於Http協議有基於HTTP1.0和Http2.0的兩種情況,Http1Code代表基於Http1.0協議的方式,Http2Code代表基於Http2.0協議的方式。
  • StreamAllocation: 用來控制Connections/Streams的資源分配與釋放
  • RouteDatabase:用來儲存連線的錯誤路徑,以便能提升連線的效率。
  • RetryAndFollowUpInterceptor 負責失敗重試以及重定向的攔截器
  • BridgeInterceptor: 負責把使用者構造的請求轉換為傳送到伺服器的請求、把伺服器返回的響應轉為使用者友好的響應的
  • CacheInterceptor: 負責讀取快取直接返回、更新快取
  • ConnectInterceptor: 負責和伺服器建立連線的
  • CallServerInterceptor:負責向伺服器傳送請求資料、從伺服器讀取響應資料

2.1 OkHttp網路請求流程

Call call = mOkHttpClient.newCall(request);
call.enqueue(xxx);

  1. 首先會new一個Call物件出來,但其實真正new出來的物件是NewCall物件
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }
  1. 然後會執行call的enqueue方法

@Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException(“Already Executed”); executed = true; } captureCallStackTrace(); eventListener.callStart(this); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }

  1. 該方法中首先判斷請求有沒有被執行,如果請求已經執行,那麼直接丟擲異常,如果請求沒有執行,就會執行Dispatcher物件的enqueue方法

2.2 Dispatcher任務排程

Dispatcher的各個引數的說明如下

//支援的最大併發請求數量
private int maxRequests = 64;
//每個主機的最大請求數量
private int maxRequestsPerHost = 5;

//請求執行緒池
private @Nullable ExecutorService executorService;

//將要執行的非同步請求佇列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

//正在執行的非同步請求佇列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

//正在執行的同步請求佇列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();


2.2.1 Dispatcher的enqueue方法

回到之前說的,call的enqueue方法其實執行的使Dispatcher的enqueue方法,Dispatcher之後會把call放進請求佇列中,最終執行由執行緒池來執行請求任務。 }

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

  }

如果正在執行的非同步請求數量小於最大的併發數,且正在執行的客戶端實際數量請求小於規定的每個主機最大請求數量,那麼就把該請求放進正在執行的非同步請求佇列中,否則就把該請求放進將要執行的非同步請求佇列中。

繼續看看Dispatcher的executorService方法,如下:

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

Dispatcher初始化了一個執行緒池,核心執行緒的數量為0 ,最大的執行緒數量為Integer.MAX_VALUE,空閒執行緒存在的最大時間為60秒,這個執行緒類似於CacheThreadPool,比較適合執行大量的耗時比較少的任務。同時我們Dispatcher也可以來設定自己執行緒池。

2.2.2 RealCall執行任務

下面來看看RealCall裡究竟執行了什麼任務:

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
       ...
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

  1. RealCall通過執行getResponseWithInterceptorChain()返回Response,如果請求被取消則在進行OnFailue回撥,如果請求成功則進行onResponse的回撥。
    1. 請求如果被取消,其回撥實在onFailue中進行回撥的
    2. enqueue方法的回撥是在子執行緒中完成的
  2. client.dispatcher().finished(this); 執行完成後移除任務,並觸發 readyAsyncCalls 的執行

2.2.3 攔截器

那麼RealCall 的getResponseWithInterceptorChain方法中究竟幹了些什麼呢,它是如何返回Response的呢?

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());//1
    interceptors.add(retryAndFollowUpInterceptor);//2
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//3
    interceptors.add(new CacheInterceptor(client.internalCache()));//4
    interceptors.add(new ConnectInterceptor(client));//5
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());//6 
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));//7

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);//8
  }

  1. 在配置 OkHttpClient 時設定的 interceptors ()
  2. 負責失敗重試以及重定向的RetryAndFollowUpInterceptor
  3. 負責把使用者構造的請求轉換為傳送到伺服器的請求、把伺服器返回的響應轉為使用者友好的響應的 BridgeInterceptor
  4. 負責讀取快取直接返回、更新快取的 CacheInterceptor
  5. 負責和伺服器建立連線的 ConnectInterceptor
  6. 配置 OkHttpClient 時設定的 networkInterceptors
  7. 負責向伺服器傳送請求資料、從伺服器讀取響應資料的 CallServerInterceptor
  8. 在 return chain.proceed(originalRequest),中開啟鏈式呼叫

RealInterceptorChain的proceed方法原始碼如下:

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
}

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

   interface Chain {
		Request request();

		Response proceed(Request request) throws IOException;

	}
}


首先來了解一下攔截器吧,攔截器是一種能夠監控、重寫,重試呼叫的機制。通常情況下,攔截器用來新增、移除、轉換請求和響應的頭部資訊。比如將域名替換為IP地址,在請求頭中移除新增host屬性;也可以新增我們應用中的一些公共引數,比如裝置id、版本號,等等。

事實上OkHttp就是通過定義許多攔截器一步一步地對Request進行攔截處理(從頭至尾),直到請求返回網路資料,後面又倒過來,一步一步地對Response進行攔截處理,最後攔截的結果就是回撥的最終Response。(從尾至頭)

回頭再看RealInterceptorChain的proceed方法,通過順序地傳入一個攔截器的集合,建立一個RealInterceptorChain,然後拿到之前OkHttp建立的各種攔截器,並呼叫其interrupt方法,並返回Response物件。其呼叫順序如下:

在這裡插入圖片描述 【圖】

RetryAndFollowUpInterceptor:進行連線失敗重新連線,以及重定向

@Override public Response intercept(Chain chain) throws IOException {
	Request request = chain.request();
	RealInterceptorChain realChain = (RealInterceptorChain) chain;
	Call call = realChain.call();
	...

	followUpCount = 0;
	Response priorResponse = null;
	while (true) {
		if (canceled) {
		  streamAllocation.release();
		  throw new IOException("Canceled");
		}

		Response response;
		boolean releaseConnection = true;
		try {
		  response = realChain.proceed(request, streamAllocation, null, null);//【遞迴】
		...

		Request followUp = followUpRequest(response);   
		if (followUp == null) {
			  if (!forWebSocket) {
				streamAllocation.release();
		}
		  return response;
		}

		...

		 if (++followUpCount > MAX_FOLLOW_UPS) {
		  streamAllocation.release();
		 throw new ProtocolException("Too many follow-up requests: " + followUpCount); }
		...
			
		 request = followUp;
		  priorResponse = response;
		  }
	}
}	

整段程式碼就是在一個死迴圈

  1. 可以看出重連線的次數最多為20次
  2. 重定向功能的邏輯在followUpRequest方法中,這個方法會根據響應頭中的location欄位獲取重定向的url,並通過requestBuilder重新new一個Request物件,並改變request的response的值,然後重新進行攔截。

BridgeInterceptor:對請求頭和響應頭進行修改

主要流程如下

1、獲取請求,請求的body,根據body的長度進行相關的 頭

2、其他一些頭的設定,如"User-Agent"、"Connection"等

3、呼叫chain.proceed(requestBuilder.build());傳遞給下一個攔截器進行處理,並返回Response

4、是否有cookie,響應是否使用了gzip,如果有的話重新設定響應的body

這樣BridgeInterceptor就執行完了,從程式碼中也可以看到,他主要是在請求之前對響應頭做了一些檢查,並新增一些頭,然後在請求之後對響應做一些處理

public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
 
    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }
 
      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }
 
    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }
 
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }
 
    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }
 
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }
 
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
 
    Response networkResponse = chain.proceed(requestBuilder.build());
 
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
 
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
 
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }
 
    return responseBuilder.build();
  }

CacheInterceptor:讀取快取和更新快取的操作

具體流程大概如下:

  1. 首先判斷使用者是否有設定cache,如果有的話,則從使用者的cache中獲取當前請求的快取,使用者可以通過如下方式設定cache
    • InternalCache是不能直接使用的,而Cache可以直接使用,它裡面自己實現了InternalCache
  2. 然後根據前面是否有快取已經當前請求構造一個CacheStrategy,它的networkRequest代表當前請求,cacheResponse代表當前快取響應
  3. 如果networkRequest為空則說明不需要網路請求,直接返回當前快取
  4. 呼叫 networkResponse = chain.proceed(networkRequest)處理當前請求
  5. 如果快取不為空,呼叫validate進行驗證,是否需要更新快取
  6. 如果快取為空,則儲存當前快取
public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;
 
    long now = System.currentTimeMillis();
 
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
 
    if (cache != null) {
      cache.trackResponse(strategy);
    }
 
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }
 
    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_BODY)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }
 
    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
 
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }
 
    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (validate(cacheResponse, networkResponse)) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();
 
        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
 
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
 
    if (HttpHeaders.hasBody(response)) {
      CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
      response = cacheWritingResponse(cacheRequest, response);
    }
 
    return response;
  }

ConnectInterceptor:與伺服器進行連線

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }


  1. 首先獲取前面建立的StreamAllocation,然後呼叫它的newStream建立一條連線
  2. 把前面建立的RealConnection作為引數繼續呼叫

實際上建立連線就是建立了一個 HttpCodec 物件,它將在後面的步驟中被使用,那它又是何方神聖呢?它是對 HTTP 協議操作的抽象,有兩個實現:Http1Codec和 Http2Codec,顧名思義,它們分別對應 HTTP/1.1 和 HTTP/2 版本的實現。

在 Http1Codec中,它利用 Okio 對 Socket 的讀寫操作進行封裝,它對 java.io和 java.nio 進行了封裝,讓我們更便捷高效的進行 IO 操作。

而建立 HttpCodec 物件的過程涉及到 StreamAllocation、RealConnection程式碼較長,這個過程概括來說

就是找到一個可用的 RealConnection,再利用 RealConnection 的輸入輸出(BufferedSource 和 BufferedSink)建立 HttpCodec 物件,供後續步驟使用。

CallServerInterceptor:傳送請求和接收資料

 @Override public Response intercept(Chain chain) throws IOException {
    HttpStream httpStream = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();
 
    long sentRequestMillis = System.currentTimeMillis();
    httpStream.writeRequestHeaders(request);
 
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength());
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
    }
 
    httpStream.finishRequest();
 
    Response response = httpStream.readResponseHeaders()
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
 
    if (!forWebSocket || response.code() != 101) {
      response = response.newBuilder()
          .body(httpStream.openResponseBody(response))
          .build();
    }
 
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }
 
    int code = response.code();
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
 
    return response;
  }

  1. 首先會寫請求頭
  2. 檢視是否運行當前方法帶body,執行則寫body
  3. 呼叫httpStream.finishRequest();重新整理
  4. 讀取響應頭和body並設定Response
  5. 是否需要關閉連線
  6. 返回Response

總結

這裡我們可以看到,核心工作都由 HttpCodec 物件完成,而 HttpCodec 實際上利用的是 Okio,而 Okio 實際上還是用的 Socket,所以沒什麼神祕的,只不過一層套一層,層數有點多。

其實 Interceptor的設計也是一種分層的思想,每個 Interceptor 就是一層。為什麼要套這麼多層呢?分層的思想在 TCP/IP 協議中就體現得淋漓盡致,分層簡化了每一層的邏輯,每層只需要關注自己的責任(單一原則思想也在此體現),而各層之間通過約定的介面/協議進行合作(面向介面程式設計思想),共同完成複雜的任務,這是典型的責任鏈設計模式

在這裡插入圖片描述 【okhttp_full_process.png】

2.3 OkHttp的複用連線池

Http有一種叫做keepalive connections的機制,而okHttp支援5個併發socket連線,預設keepalive時間為5分鐘,接下來我們學習okHttp是如何複用連線的。

2.3.1 主要變數與構造方法

連線池的類位於okHttp.ConnectionPool,它的主要變數如下:


public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }


private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
  while (true) {
	long waitNanos = cleanup(System.nanoTime());
	if (waitNanos == -1) return;
	if (waitNanos > 0) {
	  long waitMillis = waitNanos / 1000000L;
	  waitNanos -= (waitMillis * 1000000L);
	  synchronized (ConnectionPool.this) {
		try {
		  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
		} catch (InterruptedException ignored) {
		}
	  }
	}
  }
}
};

private final Deque<RealConnection> connections = new ArrayDeque<>();//
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;

通過構造方法可以看出CollectionPool預設空閒的socket最大連線數為5個,socket的keepalive時間為5分鐘。CollectionPool實在OkHttpClient例項化的時候建立的

主要變數說明一下:

  • executor執行緒池:類似於CachedThreadPool,需要注意的是這種執行緒池的工作佇列採用了沒有容量的SynchronousQueue。
  • Deque 雙向佇列:雙端佇列同時具有佇列和棧的性質,經常在快取中被使用,裡面維護了RealConnection也就是Socket物理連線的包裝。
  • RouteDatabase :它用來記錄連線失敗的路線名單,當連線失敗時就會把失敗的路線加進去。

2.3.2 快取操作

ConnectionPool提供對Deque進行操作的方法分別為put,get,connectionBecameIdle和evictAll

這幾個操作,分別對應放入連線,獲取連線,移除連線和移除所有連線操作

只舉例說明put和get操作。

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

再新增到Deque之前首先要清理空閒執行緒,這個後面會講到。再來看看get操作:

  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

遍歷connections快取列表。當某個連線計數小於限制的大小,並且request的地址和快取列表中此連線的地址完全匹配時,則直接複用快取列表中的connection作為request的連線。

2.3.3 自動回收連線

OkHttp時根據StreamAllocation引用計數是否為0來實現自動回收連線的。我們在put操作前首先要呼叫executor.execute(cleanupRunnable)來清理閒置的執行緒。

我們來檢視cleanupRunnable到底做了什麼?

執行緒不斷地呼叫clearup方法進行清理,並返回下次需要清理的間隔時間,然後呼叫wait方法進行等待以釋放鎖與時間片。當等待時間到了後,再次進行清理,並返回下次需要清理的間隔時間,如此迴圈下去

接下來看看clearup方法,如下所示:

long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

   
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

     
        if (pruneAndGetAllocationCount(connection, now) > 0) {//註釋<1>
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

      
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {//註釋<2>
     
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
    
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
    
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;//註釋<3>
      }
    }

    closeQuietly(longestIdleConnection.socket());
    return 0;
  }

clearup方法所做的事情非常簡單總結就是,根據連線中的引用計數來計算空閒連線數和活躍連線數,然後標記空閒的連線。

  1. 註釋<2>:如果空閒連線keepAlive時間超過5分鐘,或者空閒連線數超過5個,則從Deque中移除此連線。接下來更具空閒連線或者活躍連線來返回下次需要清理的時間數:
    1. 如果空閒連線大於0,則返回此連線即將到期的時間;
    2. 如果都是活躍連線且大於0,則返回預設的keepAlive時間5分鐘;
  2. 註釋<3>:如果沒有任何連線,則跳出迴圈並返回-1;
  3. 註釋<1>:通過pruneAndGetAllocationCount方法來判斷連線是否閒置。如果pruneAndGetAllocationCount方法的返回值大於0則是活躍連線,否則就是空閒連線。
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {//註釋<1>
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

2.3.4 引用計數

在OkHttp的高層程式碼呼叫中,使用了類似於引用計數的方式跟蹤socket流的呼叫。這裡的計數物件是StreamAllocation,它被反覆執行acquire和release操作,這兩個方法其實是在改變RealConnection中 List<Reference>的大小。acquire方法和release方法,如下所示:

public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

RealConnection是socket物理連線的包裝,它裡面維護了 List<Reference>的引用。List中StreamAllocation的數量也是socket被引用的計數。如果計數為0,則說明此連線沒有被複用,也就是空閒的,需要通過下文的演算法實現回收;如果計數不為0,則表示上層程式碼仍然在引用,就無需關閉連線。

2.3.5 總結

可以看出此連線池複用的核心就是用Deque來儲存連線,通過put,getconnectionBecameIdle和evictAll幾個操作來對Deque進行操作,另外通過判斷連線中的計數物件StreamAllocation來進行自動回收連線。

參考文獻