OKHttp源码分析

2017-09-06  本文已影响0人  SDY_0656

1,OKHttp工作流程
在使用OKHttp请求调用call.enqueue(responseCallback)的时候,就是将call加入到Dispatcher中,交给Diapatcher来分发,下面是call.enqueue()的代码:

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

然后我们看一下dispatcher().enqueue(new AsyncCall(responseCallback))是怎么写的:

 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

很简单,就是判断如果当前正在运行的request小于64并且和call相同的这个端口的请求数小于5的时候,就直接把这个call加入到当前运行的请求队列中,并且用线程池启动这个call,否则就加入到等待的请求队列中,下面是call的execute()代码:

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }

可以看到,真正请求获得response的部分是在getResponseWithInterceptorChain();先不看这个,看一下再最后finally运行的dispatcher.finished(),看看这个做了什么:

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

这段代码先是将call从正在运行的请求队列里移除,然后在调用了promoteCalls从等待的请求队列里移动请求到运行队列里,

 private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

可以看出,最后finally运行的代码就是手动的移动等待的请求队列。
现在来看一看具体是怎么样取得response的,也就是上面提到的getResponseWithInterceptorChain(),

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

Socket连接的部分在ConnectInterceptor中,而在ConnectInterceptor中,我们主要看一下StreamAllocation的代码,

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

在findHealthyConnection()函数中,真正连接的代码如下:

 RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

这部分代码很长,主要做了如下三个部分的工作:
1, 选择路线与自动重连(RouteSelector)
在RouteSelector的next()函数中,
如果Proxy为null,
(1)在构造函数中设置代理为Proxy.NO_PROXY
(2)如果缓存中的lastInetSocketAddress为空,就通过DNS(默认是Dns.SYSTEM,包装了jdk自带的lookup函数)查询,并保存结果,注意结果是数组,即一个域名有多个IP,这就是自动重连的来源。
(3)如果还没有查询到就递归调用next查询,直到查到为止
(4)一切next都没有枚举到,抛出NoSuchElementException,退出(这个几乎见不到)。
如果Proxy为HTTP
(1)设置socket的ip为代理地址的ip
(2)设置socket的端口为代理地址的端口
(3)一切next都没有枚举到,抛出NoSuchElementException,退出
2,连接socket
连接socket这部分的工作主要是由RealConnection完成的,在经过RouteSelector选择完端口之后,就可以进行TCP连接了,这部分的工作主要分为如下5各部分:
(1)如果连接池中已经存在连接,就从中取出(get)RealConnection,如果没有就进行下一步。
(2)根据选择的路线(Route),调用Platform.get().connectSocket选择当前平台Runtime下最好的socket库进行握手
(3)将建立成功的RealConnection放入(put)连接池缓存
(4)如果存在TLS,就根据SSL版本与证书进行安全握手
(5) 构造HttpStream并维护刚刚的socket连接,管道建立完成
3,release Socket
这部分主要有两部分工作:
(1)尝试从缓存的连接池中删除(remove)
(2)如果没有命中缓存,就直接调用jdk的socket关闭
在socket连接完成之后,接下来进行的就是http请求的序列化/response的反序列化。在connect中有很重要的一步:

    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));

source简单理解就是 inputstream,sink类似于outputstream,在OKHttp中拦截器是非常重要的部分,request.builder.build()出来的是一般的请求,如果要添加其他的装饰,就需要用到拦截器interceptor,拦截器可以用来修改/消费请求,修改结果。

上一篇下一篇

猜你喜欢

热点阅读