1. 程式人生 > >Android八門神器(一): OkHttp框架原始碼解析

Android八門神器(一): OkHttp框架原始碼解析

HTTP是我們交換資料和媒體流的現代應用網路,有效利用HTTP可以使我們節省頻寬和更快地載入資料,Square公司開源的OkHttp網路請求是有效率的HTTP客戶端。之前的知識面僅限於框架API的呼叫,接觸到實際的工作之後深知自己知識的不足,故而深挖框架原始碼盡力吸取前輩的設計經驗。關於此框架的原始碼解析網上的教程多不勝數,此文名為原始碼解析,實則是炒冷飯之作,如有錯誤之處還望各位看官指出。

攔截器

攔截器是OkHttp框架設計的精髓所在,攔截器所定義的是Request的所通過的責任鏈而不管Request的具體執行過程,並且可以讓開發人員自定義自己的攔截器功能並且插入到責任鏈中

  1. 使用者自定義的攔截器位於 OkHttpClient.addInterceptor() 新增到interceptors責任鏈中

  2. RealCall.execute()執行的時候呼叫RealCall.getResponseWithInterceptorChain()將 來自 OkHttpClient的interceptors以及預設的攔截器一併加入到RealInterceptorChain責任鏈中並呼叫, 程式碼並沒有對originalRequest進行封裝, InterceptorChain和originalRequest一併流轉到 RealInterceptorChain類中處理

    CustomInterceptor
    RetryAndFollowUpInterceptor
    BridgeInterceptor
    CacheInterceptor
    ConnectInterceptor
    NetworkInterceptors
    CallServerInterceptor
  3. RealInterceptorChain.proceed()

  4. EventListener.callStart()也是在RealCall.execute()嵌入到Request呼叫過程, EventListener.callEnd()位於StreamAllocation中呼叫

  5. Request.Builder

    • url (String/URL/HttpUrl)
    • header
    • CacheControl
    • Tag (Use this API to attach timing, debugging, or other application data to a request so that you may read it in interceptors, event listeners, or callbacks.)

BridgeInterceptor

Bridges from application code to network code. First it builds a network request from a user request. Then it proceeds to call the network. Finally it builds a user response from the network response.

此攔截器是應用碼到網路碼的橋接。它會將使用者請求封裝成一個網路請求並且執行請求,同時它還完成從網路響應到使用者響應的轉化. 最後Chain.proceed() 方法啟動攔截器責任鏈, RealInterceptorChain中通過遞迴呼叫將網路請求以及響應的任務分別分配到各個攔截器中, 然後通過ResponseBuilder.build()方法將網路響應封裝, 然後遞迴呼叫責任鏈模式使得呼叫以及Response處理的過程可以一併寫入BridgeInterceptor中

public final class RealInterceptorChain implements Interceptor.Chain {
   public Response proceed(Request request, StreamAllocation streamAllocation, 
   HttpCodec httpCodec, RealConnection connection) throws IOException {
       if (index >= interceptors.size()) throw new AssertionError();

       calls++;
       
       ...
       // Call the next interceptor in the chain.
       RealInterceptorChain next = new RealInterceptorChain(interceptors, 
          streamAllocation, httpCodec,connection, index + 1, request, call, 
          eventListener, connectTimeout, readTimeout,writeTimeout);
       Interceptor interceptor = interceptors.get(index);
       Response response = interceptor.intercept(next);
       ...
       return response;
     }
}

CallServerInterceptor

Interceptor的邏輯均在intercept()方法中實現, 在通過Chain實體類獲取到請求主題之後,通過BufferedSink介面將請求轉發到Okio介面,在攔截過程中通過EventListener介面將攔截器處理狀態(主要是RequestBodyStart和RequestBodyEnd兩個狀態)傳送出去

public final class CallServiceInterceptor implements Interceptor {
    @Override public Response intercept(Chain chain) throws IOException {
          Response.Builder responseBuilder = null;
          if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return
          // what we did get (such as a 4xx response) without ever transmitting the request body.
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            realChain.eventListener().responseHeadersStart(realChain.call());
            responseBuilder = httpCodec.readResponseHeaders(true);
          }

          if (responseBuilder == null) {
            // Write the request body if the "Expect: 100-continue" expectation was met.
            realChain.eventListener().requestBodyStart(realChain.call());
            long contentLength = request.body().contentLength();
            CountingSink requestBodyOut =
                new CountingSink(httpCodec.createRequestBody(request, contentLength));
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
            realChain.eventListener()
                .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
            // from being reused. Otherwise we're still obligated to transmit the request body to
            // leave the connection in a consistent state.
            streamAllocation.noNewStreams();
          }
        }
    }
}

CacheInterceptor

Dispatcher

Dispatcher(分離器或者複用器)是非同步網路請求呼叫時執行的策略方法, 複用器的概念十分常見,它主要的作用是輸入的各路訊號進行卷積運算,最大可能壓榨通訊的頻寬,提高資訊傳輸的效率。 OkHttp在每個分離器使用一個ExecutorService內部呼叫請求, Dispatcher內部主要並不涉及執行的具體。

複用器

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
  
  ...
  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

ExecutorSevice.execute(AsyncCall)執行程式碼位於AsyncCall內部複寫的execute()方法, 方法內定義一些Callback回撥節點執行邏輯,包括使用者主動取消執行(使用retryAndFollowUpInterceptor)以及執行請求成功或者失敗時的回撥方法

final class AsyncCall extends NamedRunnable {
    ...
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }
  1. Created Lazily 成員

    • ExecutorService
    • CacheControl

WebSocket

  1. WebSocket 非同步非堵塞的web socket介面 (通過Enqueue方法來實現)

    • OkHttpClient 通過實現 WebSocket.Factory.newWebSocket 介面實現工廠構造, 通常是由 OkHttpClient來構造

    • WebSocket生命週期:

      • Connecting狀態: 每個websocket的初始狀態, 此時Message可能位於入隊狀態但是還沒有被Dispatcher處理
      • Open狀態: WebSocket已經被伺服器端接受並且Socket位於完全開放狀態, 所有Message入隊之後會即刻被處理
      • Closing狀態: WebSocket進入優雅的關閉狀態,WebSocket繼續處理已入隊的Message但拒絕新的Message入隊
      • Closed狀態: WebSocket已完成收發Message的過程, 進入完全關閉狀態
        WebSocket受到網路等各種因素影響, 可能會斷路而提前進入關閉流程
      • Canceled狀態: 被動WebSocket失敗連線為非優雅的過程, 而主動則是優雅短路過程
  2. RealWebSocket

    RealWebSocket管理著Request佇列內容所佔的空間大小以及關閉Socket之後留給優雅關閉的時間,預設為16M和60秒,在RealWebSocket.connect()方法中RealWebSocket對OkHttpClient以及Request封裝成Call的形式,然後通過Call.enqueue()方法定義呼叫成功和失敗時的Callback程式碼

    public void connect(OkHttpClient client) {
        client = client.newBuilder()
            .eventListener(EventListener.NONE)
            .protocols(ONLY_HTTP1)
            .build();
        final Request request = originalRequest.newBuilder()
            .header("Upgrade", "websocket")
            .header("Connection", "Upgrade")
            .header("Sec-WebSocket-Key", key)
            .header("Sec-WebSocket-Version", "13")
            .build();
        call = Internal.instance.newWebSocketCall(client, request);
        call.enqueue(new Callback() {
          @Override public void onResponse(Call call, Response response) {
            try {
              checkResponse(response);
            } catch (ProtocolException e) {
              failWebSocket(e, response);
              closeQuietly(response);
              return;
            }
    
            // Promote the HTTP streams into web socket streams.
            StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
            streamAllocation.noNewStreams(); // Prevent connection pooling!
            Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);
    
            // Process all web socket messages.
            try {
              listener.onOpen(RealWebSocket.this, response);
              String name = "OkHttp WebSocket " + request.url().redact();
              initReaderAndWriter(name, streams);
              streamAllocation.connection().socket().setSoTimeout(0);
              loopReader();
            } catch (Exception e) {
              failWebSocket(e, null);
            }
          }
    
          @Override public void onFailure(Call call, IOException e) {
            failWebSocket(e, null);
          }
        });
      }

    當Call請求被服務端響應的時候就將HTTP流匯入到Web Socket流中,並且呼叫WebSocketListener相對應的狀態方法, WebSocketListener狀態如下:

    onOpen()

    onMessage()

    onClosing()

    onClosed()

    onFailure()

    • WebSocket -> RealWebSocket
    • Connection -> RealConnection
    • Interceptor -> RealInterceptorChain
    • Call -> RealCall
    • ResponseBody -> RealResponseBody

Gzip壓縮機制

處理Gzip壓縮的程式碼在BridgeInterceptor中,預設情況下為gzip壓縮狀態,可以從下面的原始碼片段中獲知。如果header中沒有Accept-Encoding,預設自動新增 ,且標記變數transparentGziptrue

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

BridgeInterceptor解壓縮的過程呼叫了okio.GzipSource()方法並呼叫Okio.buffer()快取解壓過程,原始碼如下

if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

RealCall構造方法

在RealCall構造方法上面,早期版本的RealCall構造方法中將EventListener.Factory以及EventListenerFactory.Create()分開處理導致RealCall構造方法非執行緒安全. 現在版本的RealCall的建構函式使用OkHttpClient.eventListenerFactory().create()

早期版本如下:

RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    final EventListener.Factory eventListenerFactory = client.eventListenerFactory();

    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    //重試和跟進攔截器
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);

    // TODO(jwilson): this is unsafe publication and not threadsafe.  
    // 這是不安全的釋出,不是執行緒安全的。
    this.eventListener = eventListenerFactory.create(this);
  }

現在 OkHttp 3.11.0 的RealCall原始碼如下

final class RealCall implements Call {
   private EventListener eventListener;
    ...
   private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        this.client = client;
        this.originalRequest = originalRequest;
        this.forWebSocket = forWebSocket;
        this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
   }

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

CacheStrategy