1. 程式人生 > >OkHttp 原始碼分析(五)——ConnectInterceptor

OkHttp 原始碼分析(五)——ConnectInterceptor

0、前言

前面的文章中,我們分析了http的快取策略和Okhttp快取攔截器的快取機制,我們知道,在沒有快取命中的情況下,需要對網路資源進行請求,這時候攔截鏈就來到ConnectInterceptor。

ConnectInterceptor的主要作用是和伺服器建立連線,在連線建立後通過okio獲取通向服務端的輸入流Source和輸出流Sink。

1、原始碼分析

public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

ConnectInterceptor攔截器的intercept方法很簡單,首先拿到StreamAllocation物件streamAllocation,這個物件在RetryAndFollowUpInterceptor攔截器中已經初始化好了,這在RetryAndFollowUpInterceptor攔截器分析中提到過。在拿到streamAllocation後,後續的連線建立工作都交給streamAllocation來完成。

ConnectionPool

我們知道http連線的需要三次握手、四次揮手操作,如果每次請求進行握手連線和揮手釋放資源,則會消耗很多不必要的時間。針對這種情況,Http協議提供了keep-alive的請求頭欄位,來保持長連線,以便下次進行請求時不必進行重新握手操作。OkHttp為了方便管理所有連線,採用連線池ConnectionPool。

ConnectionPool的主要功能就是為了降低由於頻繁建立連線導致的網路延遲。它實現了複用連線的策略。ConnectionPool雙端佇列Deque<RealConnection>來儲存它所管理的所有RealConnection

private final Deque<RealConnection> connections = new ArrayDeque<>();

ConnectionPool會對連線池中的最大空閒連線以及連線的保活時間進行控制,maxIdleConnections和keepAliveDurationNs成員分別體現對最大空閒連線數及連線保活時間的控制。ConnectionPool的初始化由OkhttpClient的Builder類完成,預設最大空閒連線數為5、保活時間5分鐘。此外,我們也可以初始化OkHttpClient自定義ConnectionPool。ConnectionPool有提供put、get、evictAll等操作,但對連線池的操作,是通過Internal.instance進行的。

public abstract class Internal {

  public static void initializeInstanceForTests() {
    // Needed in tests to ensure that the instance is actually pointing to something.
    new OkHttpClient();
  }

  public static Internal instance;
  ...//
}

StreamAllocation

StreamAllocation是用來建立執行HTTP請求所需網路設施的元件,如其名字所顯示的那樣,分配Stream。StreamAllocation的建構函式如下:

public StreamAllocation(ConnectionPool connectionPool, Address address, Call call,
      EventListener eventListener, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.call = call;
    this.eventListener = eventListener;
    this.routeSelector = new RouteSelector(address, routeDatabase(), call, eventListener);
    this.callStackTrace = callStackTrace;
  }

回到上面的程式碼StreamAllocation.newStream方法:

public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

這個方法程式碼也不多,主要通過findHealthyConnection去查詢是否有可服用的連線,有則複用,無則返回一個新建的連線:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    //死迴圈獲取一個連線
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);
      //連線池同步獲取,上面找到的連線是否是一個新的連線,如果是的話,就直接返回了,就是我們需要找
    // 的連線了
      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }
      //如果不是一個新的連線,判斷是否一個健康的連線。
      //不健康的RealConnection條件為如下幾種情況: 
      //RealConnection物件 socket沒有關閉 
      //socket的輸入流沒有關閉 
      //socket的輸出流沒有關閉 
      //http2時連線沒有關閉 
      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

繼續看findConnection方法:

 private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    Connection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new streams.
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        // We had an already-allocated connection and it's good.
        result = this.connection;
        releasedConnection = null;
      }
      if (!reportedAcquired) {
        // If the connection was never reported acquired, don't report it as released!
        releasedConnection = null;
      }

      if (result == null) {
        // Attempt to get a connection from the pool.
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result;
    }

    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // Pool the connection.
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

findConnection這個方法程式碼比較長,大致流程如下:

  1. 判斷streamAllocation物件內部是否有可複用的connection物件;

  2. 如果streamAllocation物件無可用的connection物件,則通過Internal.instance從連線池中查詢可用的connection物件;

  3. 如果連線池中仍未找到,則遍歷所有路由路徑,嘗試再次從ConnectionPool中尋找可複用的連線;

  4. 前面三步都未找到,則新建一個連線,進行TCP + TLS握手

  5. 將新建的連線放入連線池中,並返回結果

 上面關鍵的步驟在於通過Internal.instance的get方法在連線池中查詢可複用的連線,上面提到了連線池的操作是通過Internal.instance進行的,Internal.instance的get方法最終呼叫的是ConnectionPool的get方法:

RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

ConnectionPool的get方法遍歷connections佇列,通過isEligible檢測連線是否可複用,可複用則通過streamAllocation的acquire方法繫結:

public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

isEligible方法判斷連線是否可用:

 public boolean isEligible(Address address, @Nullable Route route) {
    // 如果這個連線不接受新的流,返回false.
    if (allocations.size() >= allocationLimit || noNewStreams) return false;

    //判斷host是否匹配
    // If the non-host fields of the address don't overlap, we're done.
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    // If the host exactly matches, we're done: this connection can carry the address.
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }
    
    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
    //上面條件不滿足,判斷是否http2
    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false;

    // 2. 路由必須共享一個IP地址。這要求我們為兩個主機都有DNS地址,這隻發生在路由規劃之後。我們            
    //不能合併使用代理的連線,因為代理沒有告訴我們源伺服器的IP地址。
    if (route == null) return false;
    if (route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (!this.route.socketAddress().equals(route.socketAddress())) return false;

    // 3. 此連線的伺服器證書必須覆蓋新主機
    if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
    if (!supportsUrl(address.url())) return false;

    // 4. Certificate pinning must match the host.
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

新建流(HttpCodec)

回到StreamAllocation的newStream方法,在獲取到Connection物件後,接下來就是新建流,即HttpCodec物件。HttpCodec物件是封裝了底層IO的可以直接用來收發資料的元件(依賴okio庫),它會將請求的資料序列化之後傳送到網路,並將接收的資料反序列化為應用程式方便操作的格式。

public interface HttpCodec {
  /**
   * The timeout to use while discarding a stream of input data. Since this is used for connection
   * reuse, this timeout should be significantly less than the time it takes to establish a new
   * connection.
   */
  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;

  /** Returns an output stream where the request body can be streamed. */
  Sink createRequestBody(Request request, long contentLength);

  /** This should update the HTTP engine's sentRequestMillis field. */
  void writeRequestHeaders(Request request) throws IOException;

  /** Flush the request to the underlying socket. */
  void flushRequest() throws IOException;

  /** Flush the request to the underlying socket and signal no more bytes will be transmitted. */
  void finishRequest() throws IOException;

  /**
   * Parses bytes of a response header from an HTTP transport.
   *
   * @param expectContinue true to return null if this is an intermediate response with a "100"
   *     response code. Otherwise this method never returns null.
   */
  Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;

  /** Returns a stream that reads the response body. */
  ResponseBody openResponseBody(Response response) throws IOException;

  /**
   * Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
   * That may happen later by the connection pool thread.
   */
  void cancel();
}

HttpCodec作用:

  • 建立請求體,以用於傳送請求體資料。

  • 寫入請求頭

  • 結束請求傳送

  • 讀取響應頭部。

  • 開啟請求體,以用於後續獲取請求體資料。

  • 取消請求執行

 拿到HttpCodec物件後,回到攔截器處,將