OkHttp源码解析-请求网络流程

2018-08-20  本文已影响0人  撸码的皮大叔

分析Okhttp源码的一篇笔记

1.OkHttp的请求网络流程

使用就不说了 Okhttp官网:http://square.github.io/okhttp/
(1) 从请求处理开始分析

OkHttpClient.newCall(request) 进行 execute 或者 enqueue操作 当调用newCall方法时

 @Override public Call newCall(Request request) {
    return new RealCall(this, request);
  }

看到返回的是一个RealCall类, 调用enqueue 异步请求网络实际调用的是RealCall的enqueue方法,接下来看一下RealCall的enqueue里面干了什么

  void enqueue(Callback responseCallback, boolean forWebSocket) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
//最终的请求是dispatcher来完成
    client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
  }

So 接下来分析一下dispatcher

(2) Dispatcher任务调度
Dispatcher 主要是控制并发的请求,它主要维护一下变量:

//最大并发请求数
  private int maxRequests = 64;  
//每个主机的最大请求数
  private int maxRequestsPerHost = 5;
  /**  消费者线程 */
  private ExecutorService executorService;
  /** 将要运行的异步请求队列 */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  /**  正在运行的异步请求队列*/
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  /** 正在运行的同步请求队列 */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

Dispatcher构造方法

 public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

两个构造方法,可以使用自己设定的线程池,如果没有设定,则会请求网络前自己创建默认线程池。这个线程池类似CachedThreadPool,比较适合执行大量的耗时比较少的任务,其实当调用RealCall的enqueue方法实际上调用的Dispatcher里面的enqueue方法

  synchronized void enqueue(AsyncCall call) {
//如果正在运行的异步请求队列小于64且 正在运行的请求主机数小于5 则把请求加载到runningAsyncCalls
//中并在线程池中执行,否则就加入到readyAsyncCalls中进行缓存等待。
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

线程池中传进来的参数AsyncCall,它是RealCall的内部类,其内部也实现了execute方法

  @Override protected void execute() {
      boolean signalledCallback = false;
      try {
       ...
      } catch (IOException e) {
       ...
      } finally {
  //无论这个请求的结果如何,都会执行   client.dispatcher().finished(this);
        client.dispatcher().finished(this);
      }
    }
  }

finished方法如下

 
  synchronized void finished(AsyncCall call) {
 //将此次请求 runningAsyncCalls移除后还执行promoteCalls方法
    if (!runningAsyncCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
    promoteCalls();
  }

promoteCalls方法如下

  private void promoteCalls() {
//如果正在运行的异步请求队列数大于最大请求数 return
    if (runningAsyncCalls.size() >= maxRequests) return;
   //若果将要运行的异步请求队列为空  return
    if (readyAsyncCalls.isEmpty()) return;  

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();//取出下一个请求 

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);//加入runningAsyncCalls中并交由线程池处理
        executorService().execute(call);
      }
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

在回AsyncCall的execute方法

 @Override protected void execute() {

      try {
 //getResponseWithInterceptorChain() 返回Response   请求网络
        Response response = getResponseWithInterceptorChain(forWebSocket);
        ...
      } catch (IOException e) {
       ...
      } finally {
     ...
      }
    }
  }

(3) Interceptor拦截器
getResponseWithInterceptorChain方法如下

private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
    Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
    return chain.proceed(originalRequest);
  }

getResponseWithInterceptorChain方法中创建了ApplicationInterceptorChain,他是一个拦截器链,这个类也是RealCall的内部类,接下来执行了它的proceed方法

    @Override public Response proceed(Request request) throws IOException {
      
      if (index < client.interceptors().size()) {
        Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
       //从拦截器列表中取出拦截器
        Interceptor interceptor = client.interceptors().get(index);
        Response interceptedResponse = interceptor.intercept(chain);//1

        if (interceptedResponse == null) {
          throw new NullPointerException("application interceptor " + interceptor
              + " returned null");
        }

        return interceptedResponse;
      }

      // 如果没有更多拦截器的话 执行网络请求
      return getResponse(request, forWebSocket);
    }
  }

proceed方法每次从拦截器列表中去除拦截器,当存在多个拦截器时都会在上面注释1处阻塞,并等待下一个拦截器的调用返回,下面分别以拦截器中有一个、两个拦截器的场景加以模拟


拦截器模拟场景.png

拦截器是一种能够监控、重写、重试调用的机制。通常情况下用来添加、移除、转换请求和响应的头部信息,比如将域名替换为IP地址,在请求中添加host属性,也可以添加我们应用中的一下公共参数,比如设备id、版本号等,回到代码上来, 最后一行返回getResponse(request, forWebSocket)
来看getResponse做了什么

 Response getResponse(Request request, boolean forWebSocket) throws IOException {

   engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);

  int followUpCount = 0;
    while (true) {
      if (canceled) {
        engine.releaseStreamAllocation();
        throw new IOException("Canceled");
      }
     boolean releaseConnection = true;
      try {
        engine.sendRequest();
        engine.readResponse();
        releaseConnection = false;
      } catch (RequestException e) {
 
        throw e.getCause();
      } catch (RouteException e) {

      } catch (IOException e) {
       ... 
      }
      }

}

创建了HttpEngine并且调用了HttpEngine的sendRequest方法和readResponse方法

(4) 缓存策略
查看一下sendRequest方法

 public void sendRequest() throws RequestException, RouteException, IOException {
    if (cacheStrategy != null) return; // Already sent.
    if (httpStream != null) throw new IllegalStateException();

    Request request = networkRequest(userRequest);
  // 获取client中的Cache,同时Cache在初始化时会读取缓存目录中曾经请求过的所有信息
    InternalCache responseCache = Internal.instance.internalCache(client);
    Response cacheCandidate = responseCache != null
        ? responseCache.get(request)//1
        : null;

    long now = System.currentTimeMillis();
    cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
   //网络请求
    networkRequest = cacheStrategy.networkRequest;
 //缓存的响应
    cacheResponse = cacheStrategy.cacheResponse;

    if (responseCache != null) {
   //记录当前请求是网络发起还是缓存发起
      responseCache.trackResponse(cacheStrategy);
    }


   

    // 不进行网络请求并且缓存不存在或者过期,则返回504
    if (networkRequest == null && cacheResponse == null) {
      userResponse = new Response.Builder()
          .request(userRequest)
          .priorResponse(stripBody(priorResponse))
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_BODY)
          .build();
      return;
    }

    // 不进行网络请求而且缓存可以使用,则直接返回缓存
    if (networkRequest == null) {
      userResponse = cacheResponse.newBuilder()
          .request(userRequest)
          .priorResponse(stripBody(priorResponse))
          .cacheResponse(stripBody(cacheResponse))
          .build();
      userResponse = unzip(userResponse);
      return;
    }

    // 需要访问网络时
    boolean success = false;
    try {
      httpStream = connect();
      httpStream.setHttpEngine(this);

      ...
    }  
  }

显然是发送请求,但是最主要的是做了缓存的策略,上面注释1处cacheCandidate 是上次与服务器交互缓存的Response,这里缓存均基于Map,key是请求中的url的md5,value是在文件中查询到的缓存,页面置换基于LRU算法,现在只需知道cachCandidate是一个可以读取缓存Header的Response即可,根据cacheStrategy的处理得到了networkRequest和cacheResponse都是为nulld的情况下,也就是不进行网络请求并且缓存不存在或者过期,这个是返回504错误,当networkRequest为null时也就是不进行网络请求,如果缓存可以使用时则直接返回缓存,其他则请求网络。

接下来查看readResponse方法

 public void readResponse() throws IOException {

     ... 
    Response networkResponse;

    if (forWebSocket) {
    //读取网络响应
      networkResponse = readNetworkResponse();
    }  

 
    receiveHeaders(networkResponse.headers());

  //检查缓存是否可用,如果可用,就用当前缓存的Response,关闭网络连接,释放连接
    if (cacheResponse != null) {
      if (validate(cacheResponse, networkResponse)) {//1
        userResponse = cacheResponse.newBuilder()
            .request(userRequest)
            .priorResponse(stripBody(priorResponse))
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();
        releaseStreamAllocation();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        InternalCache responseCache = Internal.instance.internalCache(client);
        responseCache.trackConditionalCacheHit();
        responseCache.update(cacheResponse, stripBody(userResponse));
        userResponse = unzip(userResponse);
        return;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

);

  }

这个方法主要用来解析HTTP响应报文,如果有缓存并且可用,则用缓存的数据并更新缓存,否则就用网络 请求返回的数据,查看注释1处validate方法是如何判断缓存是否可用。

  private static boolean validate(Response cached, Response network) {
 //如果服务器返回304,则缓存有效
    if (network.code() == HTTP_NOT_MODIFIED) {
      return true;
    }

   
//通过缓存和网络请求响应中的Last-Modified来计算是否是最新数据,如果是,则缓存有效
    Date lastModified = cached.headers().getDate("Last-Modified");
    if (lastModified != null) {
      Date networkLastModified = network.headers().getDate("Last-Modified");
      if (networkLastModified != null
          && networkLastModified.getTime() < lastModified.getTime()) {
        return true;
      }
    }

    return false;
  }

如果缓存有效,则返回304 Not Modified,否者直接返回body。如果缓存过期或者强制放弃缓存,则缓存策略全部交给服务器判断,客服端只需要发送条件GET请求即可。条件GET请求有两种方式:一种是Last-Modified-Data,另一种ETag,这里采用Last-Modified-Data,通过缓存和网络请求响应中的Last-Modified来计算是否是最新数据 ,如果是 则缓存有效

(5) 失败重连
重回RealCall的getResponse方法

 Response getResponse(Request request, boolean forWebSocket) throws IOException {
    
      ...
   
      try {
   
      }catch (RouteException e) {
       
       HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);//1
      } catch (IOException e) {
    
        HttpEngine retryEngine = engine.recover(e, null);//2
     
      }
  }
}

当发生IOException或者RouteException时都会执行HttpEngine的recover方法 ,代码如下:

  public HttpEngine recover(IOException e, Sink requestBodyOut) {
    if (!streamAllocation.recover(e, requestBodyOut)) {
      return null;
    }

    if (!client.retryOnConnectionFailure()) {
      return null;
    }

    StreamAllocation streamAllocation = close();

    //重新创建HttpEngine并返回,用来完成重连
    return new HttpEngine(client, userRequest, bufferRequestBody, callerWritesRequestBody,
        forWebSocket, streamAllocation, (RetryableSink) requestBodyOut, priorResponse);
  }

Okhttp请求流程图


image.png
上一篇下一篇

猜你喜欢

热点阅读