OKHttp源码解析

2021-11-29  本文已影响0人  天上飘的是浮云

前言: OKHttp源码基于3.10.0版本进行跟踪,因为后期版本源码基于Kotlin写,不好跟踪。

implementation "com.squareup.okhttp3:okhttp:3.10.0"
流程图.png

一、OKHttp执行请求的流程

        var okhttpClient = OkHttpClient.Builder().build();
        var request: Request? = Request.Builder().url("www.baidu.com").build()
        var call = okhttpClient.newCall(request)

        call?.enqueue(object: Callback {
            override fun onFailure(call: Call, e: IOException) {
                TODO("Not yet implemented")
            }

            override fun onResponse(call: Call, response: Response) {
                TODO("Not yet implemented")
            }

        })

用法我们都知道:

  1. OKHttpClient.newCall()实际上返回的RealCall对象
  2. 在RealCall中调用enqueue()方法中放入队列中,在放入队列时,先判断是否重复添加了该请求,如果重复就报错,不然就调用dispatcher.enqueue方法
  3. dispatcher.enqueue()方法继续判断,如果当前运行队列中的数值小于64,并且同时访问同一台服务器的请求小于5,就直接运行,不然就加入等待队列中。
  4. 当进入运行队列时,就可以直接运行,这时候调用AsyncCall.execute()方法,然后通过Response response = getResponseWithInterceptorChain()方法使用责任链模式开始执行请求响应

二、OKHttp中关键类及其作用

责任链拦截器:

三、OKHttp发送request流程源码跟踪

1. OkHttpClient.Builder().build()建造者模式创建OKHttpClient实例

其实OKHttpClient并不是一个单例,可以在App中创建多个,但是还是建议只创建一个。因为每个OKHttpClient有自己的请求池,线程池和连接池等,创建太多,对内存消耗不太友好。

这个没啥好说的建造者模式本质可以设置很多属性,但也可以不设置任何属性,也可以创建出一个复杂的对象来。

2. 通过Request.Builder().build()也是建造者模式构建Request实例

作为请求信息的封装,内部包含了请求url,请求方法method,请求头headers,请求体RequestBody,tag标签等

这里也比较简单,没啥好跟踪的。

3. OKhttpClient实例对象调用newCall(request)方法实例化一个Call对象

Call实际上是一个接口,实际上返回的是RealCall对象

  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
4. 调用RealCall对象的enqueue方法开始请求数据

而enqueue()方法中又调用了dispatcher的enqueue()方法,而dispatcher.enqueue 又创建了个AsyncCall对象传入。

这里要注意的是它有个判断request是否被执行,如果被执行,重复请求将抛出异常。

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
5. Dispatcher中的enqueue方法
  1. 在Dispatcher类中有三个双端队列:

if中的判断条件是如果异步运行队列的size > 64或者运行队列中同一host的异步任务大于5 的话,不能加入运行队列,放入等待队列中,满足条件加入运行队列,并通过线程池执行AsyncCall。

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
  1. 它有一个线程池
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

SynchronousQueue队列里要不为0,要不为1。该队列最大只能同时容纳一个任务,之所以用这个队列,相当于把任务提交个线程池,就会立即执行,不会等待。
因为Dispatcher中已经维护了等待队列和运行队列了,所以线程池使用了SynchronousQueue,不需要线程池内部再去维护等待队列了,提交线程池就能够执行。

6. 因为AsyncCall继承只NameRunnable,实际就是一个异步任务,接下来就执行AsyncCall.execute()方法了
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
147行        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }

看到147行:Response response = getResponseWithInterceptorChain();就进入到责任链模式的拦截器执行真正的请求了。

7. 这里责任链拦截器请求的先不看,看到 client.dispatcher().finished(this);

最后Response响应回来后,又调用了dispatcher的finished()方法。

  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    ...
    synchronized (this) {
     ...
      if (promoteCalls) promoteCalls();
      ...
    }

    ...
  }

它最终调用了它的另一个finish()的重载方法,并去调用了promoteCalls(),它在这里边就会去进行队列的管理并执行下一个请求。

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

四、OKHttp真正发送请求时的责任链拦截器源码跟踪

4.1 上边我们跟踪到了AsyncCall的execute方法了,然后调用了getResponseWithInterceptorChain()方法返回Response
Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
//1. 添加自己设置的拦截器
  interceptors.addAll(client.interceptors());
//2. 添加重试重定向拦截器     
 interceptors.add(retryAndFollowUpInterceptor);
//3. 添加桥拦截器    
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
//4. 添加缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
//5. 添加连接拦截器
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {    interceptors.addAll(client.networkInterceptors());
    }
//6. 添加CallServer拦截器
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }
4.1 首先RealInterceptorChain实现了Interceptor.Chain,Chain链条的意思,相当于一个责任链连点。
   Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

这一句将我们的拦截器集合interceptors,原始请求originalRequest,index=0传入构造出一个Chain对象,并调用了chain.proceed方法

4.2 RealInterceptorChain的proceed方法会返回Response对象,在它里面实际会生成下一个Chain,并从拦截器集合interceptors中取出对应的拦截器interceptor,并把nextChain,交给interceptor.intercept(next)执行。

这实际上就是责任链将任务一层一层往下传,只处理本拦截器该干的事儿。

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
  ...
      // Call the next interceptor in the chain.
143行    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
  ...
}
4.3 这里看下RetryAndFollowUpInterceptor重试重定向拦截器的intercept()方法
  @Override public Response intercept(Chain chain) throws IOException {
  ...
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
...
126行 response = realChain.proceed(request, streamAllocation, null, null);

  ...
}

这里126行,实际执行了上一个RealInterceptorChain传过来的chain的proceed()方法,然后将request等有传入进去,这里有循环进入了RealInterceptorChain的proceed方法,将request请求责任链一层层往下传递。

直到最后一个拦截器执行完真正的请求后,返回response,在一层层往上传递,并进行相应的处理。

4.4 RetryAndFollowUpInterceptor重试重定向拦截器做了啥
  @Override public Response intercept(Chain chain) throws IOException {
  ...
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
...
117行 while (true) {
...
126行 response = realChain.proceed(request, streamAllocation, null, null);
...
158行 Request followUp = followUpRequest(response, streamAllocation.route());
  ...

169行    if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
 }
}
followUpRequest()方法
private Request followUpRequest(Response userResponse, Route route) throws IOException {
  ...
          // fall-through
      case HTTP_MULT_CHOICE:
      case HTTP_MOVED_PERM:
      case HTTP_MOVED_TEMP:
      case HTTP_SEE_OTHER:
        // Does the client allow redirects?
        if (!client.followRedirects()) return null;

305行        String location = userResponse.header("Location");
        if (location == null) return null;
        HttpUrl url = userResponse.request().url().resolve(location);
  ...
}

305行 从Response的响应头中取出location字段标识的重定向url,并拼装重定向request

它还将SSLFactory传入Address对象中
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);

194行  private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    if (url.isHttps()) {
      sslSocketFactory = client.sslSocketFactory();
      hostnameVerifier = client.hostnameVerifier();
      certificatePinner = client.certificatePinner();
    }

    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
        sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
        client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
  }
   if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;
    } else {
      this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
    }

  private SSLSocketFactory systemDefaultSslSocketFactory(X509TrustManager trustManager) {
    try {
      SSLContext sslContext = Platform.get().getSSLContext();
      sslContext.init(null, new TrustManager[] { trustManager }, null);
      return sslContext.getSocketFactory();
    } catch (GeneralSecurityException e) {
      throw assertionError("No System TLS", e); // The system has no TLS. Just give up.
    }
  }
4.5 BridgeInterceptor桥拦截器做了啥
  @Override public Response intercept(Chain chain) throws IOException {
  ...
   if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }
   
      requestBuilder.header("Accept-Encoding", "gzip");
  ...
97行Response networkResponse = chain.proceed(requestBuilder.build());

100行  if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }
}
4.6 CacheInterceptor缓存拦截器做了啥
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
networkResponse = chain.proceed(networkRequest);
cache.update(cacheResponse, response);
4.7 ConnectInterceptor连接拦截器做了啥

连接拦截器其实做的事情不是很多:
1.尝试当前连接是否可以复用。
2.尝试连接池中找可以复用的连接
3.切换路由,继续在连接中尝试找可以复用的连接
4.以上都没有则new一个新的。
5.新的连接放入连接池
6.建立连接,开始握手

  @Override public Response intercept(Chain chain) throws IOException {
    。。。
    StreamAllocation streamAllocation = realChain.streamAllocation();

    。。。。
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

缓存策略有两种:

4.8 CallServerInterceptor拦截器做了啥

这里就是最后一个拦截器了,它不会调用realChain.proceed把request再往下传递了,它将真正执行http请求:
1.先写入请求Header

httpCodec.writeRequestHeaders(request);

2.如果请求头的Expect: 100-continue时,只发送请求头,执行3,不然执行4

    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

3.根据后台返回的结果判断是否继续请求流程

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }

4.写入请求体,完成请求

httpCodec.finishRequest();

5.得到响应头,构建初步响应

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

6.构建响应体,完成最终响应
7.返回响应

   Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
上一篇 下一篇

猜你喜欢

热点阅读