1. 程式人生 > >OkHttp原始碼詳解之二完結篇

OkHttp原始碼詳解之二完結篇

1. 請大家思考幾個問題

在開始本文之前,請大家思考如下幾個問題。並請大家帶著這幾個問題,去本文尋找答案。如果你對下面幾個問題的答案瞭如指掌那本文可以略過不看

  1. 在瀏覽器中輸入一個網址,按回車後發生了什麼?
  2. Okhttp的TCP連線建立發生在什麼時候?
  3. Okhttp的Request請求發生在什麼時候?
  4. Okio與Okhttp是在什麼時候發生關聯的?
  5. Okhttp的Interceptor和Chain是怎麼串聯起來的?
  6. Okhttp同步請求和非同步請求分別是怎麼實現的。非同步請求有限制嗎?

本文將圍繞以上6個問題對Okhttp做一個簡單的講解,如有遺漏的知識點,歡迎在評論區指出,一起探討成長。

2. Http請求的流程

首先來回答第一個問題“在瀏覽器中輸入一個網址,按回車後發生了什麼?”。Http權威指南一書給出的答案是發生了7件事情

  1. 瀏覽器從url中解析出主機名
  2. 瀏覽器將伺服器的主機名轉換成伺服器的IP地址
  3. 瀏覽器將埠號從url中解析出來
  4. 瀏覽器建立一條與Web伺服器的TCP連線
  5. 瀏覽器向伺服器傳送一條Http請求報文
  6. 伺服器向瀏覽器回送一條Http響應報文
  7. 關閉連線,瀏覽器顯示文件

以上七步是每一個Http請求必須要做的事情,Okhttp庫要實現Http請求也不例外。這七個步驟的每一步,在Okhttp中都有體現。

  1. HttpUrl類負責步驟1和步驟3的主機名和埠解析
  2. Dns介面的具體實現負責步驟2的實現
  3. RealConnection類就是步驟4中的那條TCP連線
  4. CallServerInterceptor攔截器的intercept方法負責步驟5和步驟6的傳送報文和接收報文
  5. ConnectPool連線池中提供了關閉TCP連線的方法
  6. Okio在哪操作?當然是往Socket的OutputStream寫請求報文和從Socket的InputStram讀取響應報文了

至此文章開頭提出的6個問題,前5個都已經有了一個簡單的回答。至於第六個問題,非同步在Okhttp中用的快取執行緒池,理論上快取執行緒池,是當有任務到來,就會從執行緒池中取一個空閒的執行緒或者新建執行緒來處理任務,而且快取執行緒池的執行緒數是Integer.MAX_VALUE。理論上是沒有限制的。但是Dispatcher類線上程池的基礎上,做了強制限制,最多可以同時處理的網路請求數64個,對於同一個主機名,最多可以同時處理5個網路請求。接下來我帶大家從原始碼的角度來尋找這幾個問題的答案

3.初識Okhttp

首先我們來寫兩個簡單的例子來使用Okhttp完成get和post請求。

同步get請求

OkHttpClient client = new OkHttpClient();
String run(String url) throws IOException {
  Request request = new Request.Builder()
      .url(url)
      .build();

  Response response = client.newCall(request).execute();
  return response.body().string();
}

同步post請求

public static final MediaType JSON
    = MediaType.parse("application/json; charset=utf-8");

OkHttpClient client = new OkHttpClient();

String post(String url, String json) throws IOException {
  RequestBody body = RequestBody.create(JSON, json);
  Request request = new Request.Builder()
      .url(url)
      .post(body)
      .build();
  Response response = client.newCall(request).execute();
  return response.body().string();
}

非同步get請求

OkHttpClient client = new OkHttpClient();

void run(String url) throws IOException {
    Request request = new Request.Builder()
            .url(url)
            .build();

    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {

        }

        @Override
        public void onResponse(Call call, Response response) throws IOException {
            System.out.println(response.body().string());
        }
    });
}

非同步post請求

public static final MediaType JSON
            = MediaType.parse("application/json; charset=utf-8");

    OkHttpClient client = new OkHttpClient();

    void post(String url, String json) throws IOException {
        RequestBody body = RequestBody.create(JSON, json);
        Request request = new Request.Builder()
                .url(url)
                .post(body)
                .build();
        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {

            }
        });
    }

Okhttp完成一次網路請求,過程如下

  1. 建立一個OkHttpClient物件client
  2. 建立一個Request物件request,並設定url
  3. 呼叫client.newCall(request)生成一個Call物件call
  4. 呼叫call.execute()[同步呼叫]或者call.enqueue(callback)[非同步呼叫]完成網路請求

4. 同步和非同步網路請求過程

同步呼叫過程如下

OkHttpClient.newCall(Request r)  => RealCall.execute() => RealCall.getResponseWithInterceptorChain()

非同步呼叫過程如下

OkHttpClient.newCall(Request r) => RealCall.enqueue(Callback) => Dispatcher.enqueue(AsyncCall)
    => 執行緒池執行AsyncCall =>AsyncCall.execute() => RealCall.getResponseWithInterceptorChain()

從上圖可以看出非同步呼叫比同步呼叫步驟更長,也更復雜。在這裡我把整個呼叫過程分成兩個階段,RealCall.getResponseWithInterceptorChain()稱作“網路請求執行階段”,其餘部分稱作“網路請求準備階段”。由於“網路請求執行階段”涉及到鏈式(Chain)呼叫以及各種攔截器的執行比“網路請求準備階段”複雜太多。所以我們先來看“網路請求準備階段”,這個階段也需要分成同步和非同步兩種方式來講解

首先我們來看下準備階段的公共呼叫部分OkHttpClient.newCall(Request r)

 @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }

Call是什麼,在我看來Call可以理解成是對網路請求封裝,它可以執行網路請求,也可以取消網路請求。我們看下Call的類定義

public interface Call extends Cloneable {

  Request request();

  Response execute() throws IOException;


  void enqueue(Callback responseCallback);


  void cancel();


  boolean isExecuted();

  boolean isCanceled();


  Call clone();

  interface Factory {
    Call newCall(Request request);
  }
}
  1. request()返回的Request物件
  2. execute()同步執行網路操作
  3. enqueue(Callback responseCallback)非同步執行網路操作
  4. cancel()取消網路操作

RealCall是Call的實現類。我們重點來看下execute()和enqueue(Callback responseCallback)

execute()方法

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      //這裡只是把call存到Dispatcher的列表中
      client.dispatcher().executed(this);
      //真正執行網路操作
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }

enqueue(Callback callback)

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

我們注意到AsyncCall類,首先它是RealCall類的非靜態內部類,它持有RealCall的物件,它可以做任何RealCall能做的事情。同時它繼承自NamedRunnable。NamedRunnable是Ruannable的子類。它主要有兩個功能,1.可以被執行緒池執行 2.修改當前執行執行緒的執行緒名(方便debug)。由於這幾個類都比較簡單,而且篇幅有限,這裡就不貼程式碼了。請自行查閱

當然到這裡非同步執行的準備階段並沒有結束,它是怎麼被子執行緒執行的呢。這裡我們就需要在Dispatcher類中尋找答案了。Dispatcher是幹嘛用的,它的功能就是負責分發使用者建立的網路請求,以及控制網路請求的個數,以及上一個網路請求結束後,要不要執行等待中的網路請求。下面我們來看下Dispatcher都有哪些成員變數,以及這些成員變數的作用

public final class Dispatcher {
  //最多可以同時請求數量
  private int maxRequests = 64;
  //每個host最大同時請求數量
  private int maxRequestsPerHost = 5;
  //當沒有任務執行的回撥 比如說執行10個任務,10個任務都執行完了會回撥該介面
  private Runnable idleCallback;

  /** Executes calls. Created lazily. */
  //執行緒池 用的是快取執行緒池(提高吞吐量,並且能及時回收無用的執行緒)
  private ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  //等待的非同步任務佇列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  //正在執行的非同步任務佇列 這個放的是Runnable
  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  //正在執行的同步任務佇列 這個儲存的是RealCall
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

 }

總結下幾個比較重要的概念

  1. executorService 執行緒池並且是快取執行緒池,最最大程度提高吞吐量,並且能回收空閒的執行緒
  2. Deque readyAsyncCalls非同步請求等待佇列,當非同步請求數超過64個,或者單個主機名請求超過5個,網路任務(AsyncCall)會放到該佇列裡。當任務執行完畢,會呼叫promoteCalls()把滿足條件的網路任務(AsyncCall)放到執行緒池中執行
  3. Deque runningAsyncCalls 正在執行的非同步任務佇列
  4. Deque runningSyncCalls 正在執行的同步任務佇列

緊接著我們來看下Dispatcher的enqueue(AsyncCall call)

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
     //如果請求沒有超過限制 用執行緒池執行網路請求
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      //如果超過了限制,把請求放到非同步等待佇列中去,什麼時候被喚醒?finished後
      readyAsyncCalls.add(call);
    }
  }

通過執行緒池執行AsyncCall 最終呼叫的是AsyncCall的run(),而run()中呼叫的是AsyncCall的execute()

@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

在execute()中呼叫了getResponseWithInterceptorChain(),也就是我們前面說的“網路請求執行階段”。至此,同步和非同步網路請求的“網路請求準備階段”就講解完了,接下來我們講解“網路請求執行階段”

5. Interceptor和Chain

Response getResponseWithInterceptorChain() throws IOException {
    List<Interceptor> interceptors = new ArrayList<>();
    //使用者自定義的攔截器
    interceptors.addAll(client.interceptors());
    //重試和重定向攔截器
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //快取攔截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //連線攔截器
    interceptors.add(new ConnectInterceptor(client));
    //使用者自定義的網路攔截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //真正請求伺服器的攔截器
    interceptors.add(new CallServerInterceptor(forWebSocket));

    //用Chain把List中的interceptors串起來執行
    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

要理解上面的程式碼。我們需要搞清楚兩個概念 Interceptor和Chain。我們先來看下它們的定義

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

    Connection connection();
  }
}

可以舉一個比較形象的例子來解釋它們的關係。就以我們IT行業來講。老闆說想要一個APP,然後把需求轉化成任務流轉給產品經理,產品經理構思了下,然後把任務流轉給設計師,設計師一通設計之後,把設計圖流轉給碼農,碼農接到任務後就開始加班加點的編碼,最終寫出了APP,然後把APP交給設計師,讓設計師檢視介面是否美觀,設計師再把APP流轉給產品經理,產品經理覺得很滿意,最終把APP交付給老闆。老闆看了很滿意,說大傢伙晚上加個雞腿。雖然實際生產中並不是這樣的一個流程,想了很久覺得還是這樣講,更好理解。對應到這個例子中,產品經理、設計師、程式設計師他們都是真正幹活的傢伙,他們對應的是Interceptor。Chain是什麼,是老闆嗎?不是!!Chain只是一套規則,對應的就是例子裡的流轉流程。

image

image

對照圖片,“網路請求執行階段”會依次執行Interceptors裡的Interceptor。Interceptor執行分成三步。第一步:處理請求 第二步:執行下個Interceptor 第三步:處理響應

 List<Interceptor> interceptors = new ArrayList<>();
    //使用者自定義的攔截器
    interceptors.addAll(client.interceptors());
    //重試和重定向攔截器
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //快取攔截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //連線攔截器
    interceptors.add(new ConnectInterceptor(client));
    //使用者自定義的網路攔截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //真正請求伺服器的攔截器
    interceptors.add(new CallServerInterceptor(forWebSocket));

首先執行的就是retryAndFollowUpInterceptor

RetryAndFollowUpInterceptor

//第一部分 根據url解析出主機名 解析埠
Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

....省略

while(true){
    ....
    //第二部分處理下一個Interceptor
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
    releaseConnection = false;
    ....
    //第三部分 處理響應,如果是重定向,用while迴圈,重新處理下一個Interceptor
     if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }



}

BridgeInterceptor和CacheInterceptor請讀者自行分析

ConnectInterceptor中會解析主機DNS並且建立TCP連線,Socket的輸入輸出流通過Source和Sink處理

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();


    boolean doExtensiveHealthChecks = !request.method().equals("GET");

    //這裡會解析DNS並且建立TCP連線,Socket的輸入輸出流會和Okio的Source和Sink關聯
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

CallServerInterceptor 通過httpCodec的sink向socket傳送請求,並且通過httpCodec的openResponseBody把socket的輸入流寫入到Response中

@Override public Response intercept(Chain chain) throws IOException {
    HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
    StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
    Request request = chain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      // Write the request body, unless an "Expect: 100-continue" expectation failed.
      if (responseBuilder == null) {
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))//真正把body寫進去
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

6. 結束語

OkHttp庫還是挺龐大的,涉及到很多Http的基礎知識。這裡只是講解了OkHttp的一小部分。很多細節的東西也沒有深入講解。比如說Socket的建立,連線池的管理,HttpCodec如何解析輸入輸出流,路由的細節。希望有機會可以再深入的講解一番