android菜鸟笔记

OkHttp3 流程分析(上)

2019-09-26  本文已影响0人  李发糕

OkHttp3有很多大佬分析过了 这里自己也写一篇加深记忆。
内容比较多,分为上下两篇八。

OKHttp 解析

基本流程

OkHttpClient client = new OkHttpClient(); //生成Client实例

String run(String url) throws IOException {
  Request request = new Request.Builder() //生成request
      .url(url)
      .build();

  try (Response response = client.newCall(request).execute()) {//生成Call并运行请求得到response
    return response.body().string();
  }
}
//上面的代码运行在一个线程中,因为execute是同步方法。自然也有一个异步调用方法
client.newCall(request).enqueue(Callback responseCallback);

下面我们就根据基本的流程分析一下OkHttp3。

Request

没什么好讲的,看一下基本属性

final HttpUrl url;//封装好的url地址
final String method;//请求方法
final Headers headers;//请求头
final @Nullable RequestBody body;//请求体
final Map<Class<?>, Object> tags;//设置的标签 可以用来取消请求
private volatile CacheControl cacheControl;//缓存控制

Request是通过Builder构造的。

Call

这是一个接口,其方法的具体实现是通过RealCall来完成的。

在调用的时候通过client.newCall(request)生成一个RealCall。

//OkHttpClient
@Override public Call newCall(Request request) {
  return RealCall.newRealCall(this, request, false /* for web socket */);
}

//RealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);//先通过构造方法生成call实例
    call.eventListener = client.eventListenerFactory().create(call);//创建eventListener并设置给call用于监听请求过程
    return call;
}

首先我们看一下RealCall的一些属性

final OkHttpClient client;//客户端实例
final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;//用于重试请求的拦截器
private EventListener eventListener;//监听
final Request originalRequest;//我们刚才创建的原始请求
final boolean forWebSocket;//是否是用于websocket

// Guarded by this.
private boolean executed;//是否已经执行了

然后就是execute和enqueue方法

我们先看一下同步的请求情况

execute

//execute
@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);//回调通知请求开始
  try {
    client.dispatcher().executed(this);//通知分发器立即执行此请求
    Response result = getResponseWithInterceptorChain();//通过拦截器链获取response
    if (result == null) throw new IOException("Canceled");
    return result;//返回response
  } catch (IOException e) {
    eventListener.callFailed(this, e);
    throw e;
  } finally {
    client.dispatcher().finished(this);//最后通知分发器结束此请求
  }
}

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
  //构造拦截器链
    return chain.proceed(originalRequest);//通过拦截器链进行请求并返回
}

同步的方法比较简单

1 修改执行状态,通知回调等。

2 通知dispatcher此请求开始执行拉。

3 构造拦截器链并通过其进行请求,返回请求结果。

4 通知dispatcher此请求结束了。

下面再看一下enqueue异步请求。

enqueue

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

1 修改执行状态,通知回调等。

2 将请求包装成AsyncCall并调用dispatcher的enqueue方法将其入队。

我们这里先看一下AsyncCall。

AsyncCall

final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;

  ···

    //主要看他的执行方法
  @Override protected void execute() {
    boolean signalledCallback = false;
    try {
      Response response = getResponseWithInterceptorChain();
      if (retryAndFollowUpInterceptor.isCanceled()) {
        signalledCallback = true;
        responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
      } else {
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      }
    } catch (IOException e) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
      } else {
        eventListener.callFailed(RealCall.this, e);
        responseCallback.onFailure(RealCall.this, e);
      }
    } finally {
      client.dispatcher().finished(this);
    }
  }
}

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

AsyncCall实际是一个Runnable,其run方法被调用之后会调用execute方法,并在期间更改其当前线程名。这点暂不赘述。我们可以看到在他的execute期间做的事和同步的方法差不多。

1 构造拦截器链并通过其进行请求,通过回调返回请求结果。不过在这里判断了此请求是否cancel了。

2 通知dispatcher此请求结束了。

到这里Call类的事暂时告于段落了。我们接下来看一下上面多次提到的分发器dispatcher。

Dispatcher

先看一下一些基本属性

private int maxRequests = 64;//最大请求数
private int maxRequestsPerHost = 5;//最大Host数
private @Nullable Runnable idleCallback;//闲置状态下的回调

/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;//请求线程池

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();//等待被执行的请求队列

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();//正在执行的异步请求队列

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();//正在执行的同步请求队列

在同步请求的情况下会被调用分发器的executed方法

synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);//将请求存入了同步队列。
}

在异步请求的情况下被调用enqueue方法存入包装的AsyncCall

synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {//如果运行中的异步请求数小于最大请求数且如果算上此请求,当前运行的请求的主机数小于最大请求数
    runningAsyncCalls.add(call);//存入异步队列
    executorService().execute(call);//通过线程池执行这个请求,后续我们刚才分析过了。
  } else {
    readyAsyncCalls.add(call);//否则现存入预备队列
  }
}

最后,当一个call运行完毕后,会调用分发器的finished方法。

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
  finished(runningAsyncCalls, call, true);//移除此请求,运行下一个异步的请求
}

/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
  finished(runningSyncCalls, call, false);//移除此请求,
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");//把结束的请求从相应队列中移除
      if (promoteCalls) promoteCalls();//运行下一个异步请求
      runningCallsCount = runningCallsCount();//计算运行中的请求总数
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {//如果当前没有网络请求以及闲置callback
      idleCallback.run();//运行闲置任务
    }
}

public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
}

//运行下一个异步请求
private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; //异步请求到达最大容量了
    if (readyAsyncCalls.isEmpty()) return; //没有别的请求了

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {//遍历准备队列
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {//如果算上此请求,当前运行的请求的主机数小于最大请求数
        i.remove();//从预备队列中移除
        runningAsyncCalls.add(call);//加入异步队列
        executorService().execute(call);//放入线程池运行
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
}

默认的请求线程池类似一个CachedThreadPool,我们看一下参数

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}

请求的分发逻辑到这里就结束了,下面看一下一个请求链是如何被组装的。

Interceptor&Chain

先回到之前Call里面的逻辑再看一下

Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
    originalRequest, this, eventListener, client.connectTimeoutMillis(),
    client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);

之前讲过了,这里创建了一个拦截器链,就是通过这个链来请求并获取响应的。

我们直接看一下这个链的具体实现。

RealInterceptorChain

实际上一个Chain对象可以看作是链的一个节点。

首先是一些基本属性和构造方法

private final List<Interceptor> interceptors;//添加的拦截器list
private final StreamAllocation streamAllocation;//流分配器
private final HttpCodec httpCodec;//http编解码器
private final RealConnection connection;//链接
private final int index;//此节点对应的拦截器在list中的索引
private final Request request;//用户构造的请求
private final Call call;//本次请求call
private final EventListener eventListener;
private final int connectTimeout;//链接超时时限
private final int readTimeout;//读超时时限
private final int writeTimeout;//写入超时时限
private int calls;//此节点proceed调用次数

public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
    HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
    EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
  ···
}

上面有没见过的类没有关系,暂时还不需要,等下会详细分析。

在构造chain时 StreamAllocation HttpCodec RealConnection 传入的都是null,因为这几个属性也是在拦截器链运行的过程中才生成的。

按照前面的逻辑,通过proceed方法获取响应。下面详细进行分析。

@Override public Response proceed(Request request) throws IOException {
  return proceed(request, streamAllocation, httpCodec, connection);
}

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();//此节点对应的拦截器索引越界。
    calls++;//计数++
    //省略异常判断
    ···
    // 生成下一个节点。
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);//获取当前节点对应的拦截器
    Response response = interceptor.intercept(next);//通过当前拦截器调用下一个节点的process方法来获取响应值
        //省略异常判断
    ···
    return response;
  }

省略一些异常判断之后逻辑看起来比较清晰。

Interceptor

基本的请求包含以下几个拦截器。

interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
   interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

RetryAndFollowUpInterceptor

首先要看的自然是intercept方法。

@Override public Response intercept(Chain chain) throws IOException {
  //获取相应数据
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Call call = realChain.call();
  EventListener eventListener = realChain.eventListener();
    //实例化一个StreamAllocation对象。暂时不用管他是干啥的。
  StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
      createAddress(request.url()), call, eventListener, callStackTrace);
  this.streamAllocation = streamAllocation;

  int followUpCount = 0;
  Response priorResponse = null;
  while (true) {//死循环
    if (canceled) {
      streamAllocation.release();
      throw new IOException("Canceled");
    }

    Response response;
    boolean releaseConnection = true;//是否需要释放链接
    try {
      response = realChain.proceed(request, streamAllocation, null, null);//运行下一个节点的proceed方法得到响应数据
      releaseConnection = false;//不需要释放链接
    } catch (RouteException e) {
      // The attempt to connect via a route failed. The request will not have been sent.
      if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
        throw e.getFirstConnectException();//抛出不可处理的异常,需要释放链接了
      }
      releaseConnection = false;//异常,跳过下面的步骤再次循环尝试请求
      continue;
    } catch (IOException e) {
      // An attempt to communicate with a server failed. The request may have been sent.
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
      releaseConnection = false;
      continue;//异常,跳过下面的步骤再次循环尝试请求
    } finally {
      if (releaseConnection) {//如果在前面抛出了异常. 这里需要释放链接
        streamAllocation.streamFailed(null);
        streamAllocation.release();
      }
    }

    // Attach the prior response if it exists. Such responses never have a body.
    if (priorResponse != null) {//如果存在priorresponse,把他添加给response
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder()
                  .body(null)
                  .build())
          .build();
    }

    Request followUp;//跟踪请求
    try {
      followUp = followUpRequest(response, streamAllocation.route());//获取重定向后的请求
    } catch (IOException e) {
      streamAllocation.release();
      throw e;
    }

    if (followUp == null) {//无需重定向
      if (!forWebSocket) {
        streamAllocation.release();
      }
      return response;//直接返回
    }

    closeQuietly(response.body());//关闭body停止写入

    if (++followUpCount > MAX_FOLLOW_UPS) {//重定向次数过多异常
      streamAllocation.release();
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    if (followUp.body() instanceof UnrepeatableRequestBody) {//禁止重定向
      streamAllocation.release();
      throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
    }

    if (!sameConnection(response, followUp.url())) {
      streamAllocation.release();//根据链接判断是否需要新生成streamAllocation
      streamAllocation = new StreamAllocation(client.connectionPool(),
          createAddress(followUp.url()), call, eventListener, callStackTrace);
      this.streamAllocation = streamAllocation;
    } else if (streamAllocation.codec() != null) {
      throw new IllegalStateException("Closing the body of " + response
          + " didn't close its backing stream. Bad interceptor?");
    }

    request = followUp;//重定向
    priorResponse = response;//设置priorResponse
  }
}

只是为了了解整体过程,其余方法这里不赘述了。

BridgeInterceptor
1. 根据原本的请求新建request并填充需要的各种header,cookie等属性,使用此request进行网络请求

2. 从返回的数据中接收cookie,根据返回的response信息构建新的response用于返回上一层处理(okhttp默认没有实现cookie的存取,如果需要要自己实现cookiejar)

3. 如果Content-Encoding == gzip 则解码后返回

CacheInterceptor
@Override public Response intercept(Chain chain) throws IOException {
      //根据request获取缓存
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;
    long now = System.currentTimeMillis();
      //获取缓存策略
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
      //网络请求及缓存response
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
    if (cache != null) {
      cache.trackResponse(strategy); //记录缓存命中
    }
      //有缓存,不符合要求
    if (cacheCandidate != null && cacheResponse == null) {
        //关闭缓存流
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }
    // If we're forbidden from using the network and the cache is insufficient, fail.
      //没网没缓存,返回504网关超时
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_BODY)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }
    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
    Response networkResponse = null;
    try {
      //网络请求数据
        networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }
    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (validate(cacheResponse, networkResponse)) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers())) //合并header
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();
        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response); //更新缓存
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    if (HttpHeaders.hasBody(response)) {
      CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
      response = cacheWritingResponse(cacheRequest, response);
    }
    return response;
  }

ConnectInterceptor
//连接目标服务器并把请求传递给下一拦截器
@Override 
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);//生成Http编解码器
    RealConnection connection = streamAllocation.connection();//获取链接
    return realChain.proceed(request, streamAllocation, httpCodec, connection);//交给最后一个拦截器去
}

CallServerInterceptor
@Override public Response intercept(Chain chain) throws IOException {
    ···
    httpCodec.writeRequestHeaders(request);//写入请求头
    ···
      
    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      //有请求体的情况
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();//这种情况需要先请求等待有HTTP/1.1 100 Continue响应头再写入请求体
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);//读取响应头,如果responsecode为100,这里会返回null。
      }

      if (responseBuilder == null) {//可以写入请求体
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));//创建请求体输出流
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);//添加写入缓冲

        request.body().writeTo(bufferedRequestBody);//把请求body写入输出流
        bufferedRequestBody.close();//关闭输出流
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {//发送了100-continue的请求,但是没有得到期望结构,要避免此Http/1链接被重用。
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();//请求写入完毕

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);//读取响应头生成响应构建者
    }

    Response response = responseBuilder//构建一个响应
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    
    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      responseBuilder = httpCodec.readResponseHeaders(false);//重新读取响应头并构建

      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }

    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);

    if (forWebSocket && code == 101) {//websocket链接
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))//解析响应体
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();//是否需要关闭streamAllocation
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {//报错
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

上面的代码中涉及了 100 (Continue) HTTP 状态码,在客户端发送 Request Message 之前,HTTP/1.1 协议允许客户端先判定服务器是否愿意接受客户端发来的消息主体(基于 Request Headers)。即, 客户端 在 Post(较大)数据到服务端之前,允许双方“握手”,如果匹配上了,Client 才开始发送(较大)数据。这么做的原因是,如果客户端直接发送请求数据,但是服务器又将该请求拒绝的话,这种行为将带来很大的资源开销。

缓存

此部分可以看关于DiskLruCache的另一篇文章

网络

终于到了激动人心的最后一部分~这里算是OkHttp3的核心部分了

首先我们先看一下刚刚见到的httpCodec。

HttpCodec

HttpCodec是一个接口,具体实现交给了Http1Codec和Http2Codec。我们先看看这个接口

public interface HttpCodec {

  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;

  //创建一个用于写入requestBody的输出流
  Sink createRequestBody(Request request, long contentLength);

  //写入请求头
  void writeRequestHeaders(Request request) throws IOException;

  //相当于flush,把请求刷入底层socket
  void flushRequest() throws IOException;

  //flush并结束本次请求
  void finishRequest() throws IOException;

  //解析响应头
  Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;

  //获取用于读取响应体的流
  ResponseBody openResponseBody(Response response) throws IOException;

    //取消请求
  void cancel();
}

下面我们看一下对于Http/1.x与Http/2的请求的具体实现。

Http1Codec

首先是基本属性和构造方法

final OkHttpClient client;
/** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
final StreamAllocation streamAllocation;

final BufferedSource source;//用于读取响应的source
final BufferedSink sink;//输入请求的sink
int state = STATE_IDLE;
private long headerLimit = HEADER_LIMIT;

public Http1Codec(OkHttpClient client, StreamAllocation streamAllocation, BufferedSource source,
    BufferedSink sink) {
  this.client = client;
  this.streamAllocation = streamAllocation;
  this.source = source;
  this.sink = sink;
}

按照之前看到的进行一次网络请求的顺序,首先要写入请求头

@Override public void writeRequestHeaders(Request request) throws IOException {
  String requestLine = RequestLine.get(
      request, streamAllocation.connection().route().proxy().type());
  writeRequest(request.headers(), requestLine);
}
public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
}

这里的逻辑比较简单,就是拼接及按顺序写入请求头。

然后要做的事获取一个用于写入requestBody的输出流。

@Override public Sink createRequestBody(Request request, long contentLength) {
  if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
    return newChunkedSink();//长度不固定
  }

  if (contentLength != -1) {
    return newFixedLengthSink(contentLength);//固定长度
  }

  throw new IllegalStateException(
      "Cannot stream a request body without chunked encoding or a known content length!");
}

public Sink newChunkedSink() {
    if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
    state = STATE_WRITING_REQUEST_BODY;//判断并修改当前状态
    return new ChunkedSink();
}

public Sink newFixedLengthSink(long contentLength) {
    if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
    state = STATE_WRITING_REQUEST_BODY;
    return new FixedLengthSink(contentLength);
}

这里按照是否固定长度生成了两种输出流,并修改当前状态。

先看一下不固定长度的。

private final class ChunkedSink implements Sink {
  private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
  private boolean closed;

  ChunkedSink() {
  }

  @Override public Timeout timeout() {
    return timeout;
  }

  @Override public void write(Buffer source, long byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    if (byteCount == 0) return;

    sink.writeHexadecimalUnsignedLong(byteCount);
    sink.writeUtf8("\r\n");
    sink.write(source, byteCount);
    sink.writeUtf8("\r\n");
  }

  @Override public synchronized void flush() throws IOException {
    if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
    sink.flush();
  }

  @Override public synchronized void close() throws IOException {
    if (closed) return;
    closed = true;
    sink.writeUtf8("0\r\n\r\n");
    detachTimeout(timeout);
    state = STATE_READ_RESPONSE_HEADERS;
  }
}

这里可以看到,对编解码器持有的sink进行了一次包装,在写入数据的时候加入了换行等操作。

同理对于定长的sink也是一样的逻辑。

private final class FixedLengthSink implements Sink {
  private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
  private boolean closed;
  private long bytesRemaining;

  FixedLengthSink(long bytesRemaining) {
    this.bytesRemaining = bytesRemaining;
  }

  @Override public Timeout timeout() {
    return timeout;
  }

  @Override public void write(Buffer source, long byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    checkOffsetAndCount(source.size(), 0, byteCount);
    if (byteCount > bytesRemaining) {
      throw new ProtocolException("expected " + bytesRemaining
          + " bytes but received " + byteCount);
    }
    sink.write(source, byteCount);
    bytesRemaining -= byteCount;
  }

  @Override public void flush() throws IOException {
    if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
    sink.flush();
  }

  @Override public void close() throws IOException {
    if (closed) return;
    closed = true;
    if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream");
    detachTimeout(timeout);
    state = STATE_READ_RESPONSE_HEADERS;
  }
}

在构造的时候传入长度标示并在写入和关闭的时候做判断。

在请求体写入之后调用flush将请求刷入底层socket。

@Override public void flushRequest() throws IOException {
  sink.flush();
}

然后解析出我们的响应头

@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
  if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {//状态判断
    throw new IllegalStateException("state: " + state);
  }

  try {
    StatusLine statusLine = StatusLine.parse(readHeaderLine());//读取并解析第一行状态行
    Response.Builder responseBuilder = new Response.Builder()//根据信息构造responseBuilder
        .protocol(statusLine.protocol)
        .code(statusLine.code)
        .message(statusLine.message)
        .headers(readHeaders());//读取剩余行

    if (expectContinue && statusLine.code == HTTP_CONTINUE) {//根据参数和响应码返回
      return null;
    } else if (statusLine.code == HTTP_CONTINUE) {
      state = STATE_READ_RESPONSE_HEADERS;
      return responseBuilder;
    }

    state = STATE_OPEN_RESPONSE_BODY;
    return responseBuilder;
  } catch (EOFException e) {
    // Provide more context if the server ends the stream before sending a response.
    IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
    exception.initCause(e);
    throw exception;
  }
}

//读取一行响应头
private String readHeaderLine() throws IOException {
    String line = source.readUtf8LineStrict(headerLimit);//直接读取一行
    headerLimit -= line.length();
    return line;
}

//读取剩余行
public Headers readHeaders() throws IOException {
    Headers.Builder headers = new Headers.Builder();
    // parse the result headers until the first blank line
    for (String line; (line = readHeaderLine()).length() != 0; ) {//循环读取并写入headers
      Internal.instance.addLenient(headers, line);
    }
    return headers.build();
}

最后就是获取响应体了

@Override public ResponseBody openResponseBody(Response response) throws IOException {
  streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
  String contentType = response.header("Content-Type");

  if (!HttpHeaders.hasBody(response)) {
    Source source = newFixedLengthSource(0);
    return new RealResponseBody(contentType, 0, Okio.buffer(source));
  }

  if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
    Source source = newChunkedSource(response.request().url());
    return new RealResponseBody(contentType, -1L, Okio.buffer(source));
  }

  long contentLength = HttpHeaders.contentLength(response);
  if (contentLength != -1) {
    Source source = newFixedLengthSource(contentLength);
    return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
  }

  return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}

在这个方法里,对source进行了两层包装,先根据是否定长包装成了对应的source,然后又包装了缓冲层。

在第一层包装中,几个类ChunkedSource/FixedLengthSource/UnknownLengthSource都继承了AbstractSource

这个类提供了当数据读取完毕后关闭缓存条目并释放socket用于重复使用的方法。

protected final void endOfInput(boolean reuseConnection, IOException e) throws IOException {
  if (state == STATE_CLOSED) return;//判断状态
  if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);

  detachTimeout(timeout);//移除超时

  state = STATE_CLOSED;//设置状态关闭
  if (streamAllocation != null) {
    streamAllocation.streamFinished(!reuseConnection, Http1Codec.this, bytesRead, e);//finishstream
  }
}

因为http1一个链接只能有一个请求,而http2支持一个链接多个请求,所以其实现和http1codec不同,我们看一下。

Http2Codec

先看一下基本属性

private final Interceptor.Chain chain;
final StreamAllocation streamAllocation;
private final Http2Connection connection;
private Http2Stream stream;
private final Protocol protocol;

与1不同,多了很多没见过的对象。我们先看一下请求流程再依次分析

和刚才一样,首先是写入请求头

@Override public void writeRequestHeaders(Request request) throws IOException {
  if (stream != null) return;//如果当前链接存在运行中的流,不能再此写入了

  boolean hasRequestBody = request.body() != null;
  List<Header> requestHeaders = http2HeadersList(request);//解析出请求头
  stream = connection.newStream(requestHeaders, hasRequestBody);//使用链接获取新的流的实例
  stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);//设置超时
  stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
}

剩下几个方法就不展示了,实际上就是对stream的操作替换了http1中直接操作链接中的source和sink,有兴趣可以看一下。

下面我们重点看一下这里的Http2Connection和Http2Stream。

Http2Connection

首先看一下他的部分属性和构造方法。

final Map<Integer, Http2Stream> streams = new LinkedHashMap<>();//流集合
final String hostname;//主机名
int lastGoodStreamId;//上一个好的流的id
int nextStreamId;//下一个流id
boolean shutdown;//是否终止
···
final Socket socket;//底层socket
final Http2Writer writer;
Http2Connection(Builder builder) {
    pushObserver = builder.pushObserver;
    client = builder.client;
    listener = builder.listener;
    // http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
    nextStreamId = builder.client ? 1 : 2;
    if (builder.client) {
      nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
    }

    // Flow control was designed more for servers, or proxies than edge clients.
    // If we are a client, set the flow control window to 16MiB.  This avoids
    // thrashing window updates every 64KiB, yet small enough to avoid blowing
    // up the heap.
    if (builder.client) {
      okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, OKHTTP_CLIENT_WINDOW_SIZE);
    }

    hostname = builder.hostname;

    writerExecutor = new ScheduledThreadPoolExecutor(1,
        Util.threadFactory(Util.format("OkHttp %s Writer", hostname), false));
    if (builder.pingIntervalMillis != 0) {
      writerExecutor.scheduleAtFixedRate(new PingRunnable(false, 0, 0),
          builder.pingIntervalMillis, builder.pingIntervalMillis, MILLISECONDS);
    }

    // Like newSingleThreadExecutor, except lazy creates the thread.
    pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        Util.threadFactory(Util.format("OkHttp %s Push Observer", hostname), true));
    peerSettings.set(Settings.INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE);
    peerSettings.set(Settings.MAX_FRAME_SIZE, Http2.INITIAL_MAX_FRAME_SIZE);
    bytesLeftInWriteWindow = peerSettings.getInitialWindowSize();
    socket = builder.socket;
    writer = new Http2Writer(builder.sink, client);

    readerRunnable = new ReaderRunnable(new Http2Reader(builder.source, client));
  }

可以看到,一个http2的链接中又多个stream用于请求。当我们写入请求头时,通过newstream获取一个流实例。

private Http2Stream newStream(
    int associatedStreamId, List<Header> requestHeaders, boolean out) throws IOException {
  boolean outFinished = !out;
  boolean inFinished = false;
  boolean flushHeaders;
  Http2Stream stream;
  int streamId;

  synchronized (writer) {
    synchronized (this) {
      if (nextStreamId > Integer.MAX_VALUE / 2) {//数量过多,关闭一部分拒绝的流
        shutdown(REFUSED_STREAM);
      }
      if (shutdown) {
        throw new ConnectionShutdownException();//当前链接已经关闭了,抛出异常
      }
      streamId = nextStreamId;
      nextStreamId += 2; //计数器+2
      stream = new Http2Stream(streamId, this, outFinished, inFinished, requestHeaders);//生成一个流
      flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;//是否需要立即把请求头写入socket
      if (stream.isOpen()) {//如果成功开启了,把这个流存入缓存
        streams.put(streamId, stream);
      }
    }
    if (associatedStreamId == 0) {//写入数据
      writer.synStream(outFinished, streamId, associatedStreamId, requestHeaders);
    } else if (client) {
      throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
    } else { // HTTP/2 has a PUSH_PROMISE frame.
      writer.pushPromise(associatedStreamId, streamId, requestHeaders);
    }
  }

  if (flushHeaders) {
    writer.flush();
  }

  return stream;
}

下面我们结合Http2Stream来看

Http2Stream

先看一下基本属性和构造方法

long bytesLeftInWriteWindow;//流量控制窗口值

final int id;
final Http2Connection connection;

/** Request headers. Immutable and non null. */
private final List<Header> requestHeaders;//请求头

/** Response headers yet to be {@linkplain #takeResponseHeaders taken}. */
private List<Header> responseHeaders;//响应头

/** True if response headers have been sent or received. */
private boolean hasResponseHeaders;//是否有响应头

private final FramingSource source;//读取一帧的流
final FramingSink sink;//写入一帧的流

在Http2中,使用帧的方法传递数据,并添加了流量控制的方案。具体的可以去查看相关资料,这里不赘述。

Http2Stream(int id, Http2Connection connection, boolean outFinished, boolean inFinished,
    List<Header> requestHeaders) {
  if (connection == null) throw new NullPointerException("connection == null");
  if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
  this.id = id;
  this.connection = connection;
  this.bytesLeftInWriteWindow =
      connection.peerSettings.getInitialWindowSize();
  this.source = new FramingSource(connection.okHttpSettings.getInitialWindowSize());
  this.sink = new FramingSink();
  this.source.finished = inFinished;
  this.sink.finished = outFinished;
  this.requestHeaders = requestHeaders;
}

在构造函数中生成了source和sink的实例。

private final class FramingSource implements Source {
  /** Buffer to receive data from the network into. Only accessed by the reader thread. */
  private final Buffer receiveBuffer = new Buffer();

  /** Buffer with readable data. Guarded by Http2Stream.this. */
  private final Buffer readBuffer = new Buffer();

  /** Maximum number of bytes to buffer before reporting a flow control error. */
  private final long maxByteCount;

  /** True if the caller has closed this stream. */
  boolean closed;

  /**
   * True if either side has cleanly shut down this stream. We will receive no more bytes beyond
   * those already in the buffer.
   */
  boolean finished;

  FramingSource(long maxByteCount) {
    this.maxByteCount = maxByteCount;
  }

  @Override public long read(Buffer sink, long byteCount) throws IOException {
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);

    long read = -1;
    ErrorCode errorCode;
    synchronized (Http2Stream.this) {
      waitUntilReadable();
      if (closed) {
        throw new IOException("stream closed");
      }
      errorCode = Http2Stream.this.errorCode;

      if (readBuffer.size() > 0) {
        // Move bytes from the read buffer into the caller's buffer.
        read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
        unacknowledgedBytesRead += read;
      }

      if (errorCode == null
          && unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) {
        // Flow control: notify the peer that we're ready for more data! Only send a WINDOW_UPDATE
        // if the stream isn't in error.
        connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
        unacknowledgedBytesRead = 0;
      }
    }

    if (read != -1) {
      // Update connection.unacknowledgedBytesRead outside the stream lock.
      updateConnectionFlowControl(read);
      return read;
    }

    if (errorCode != null) {
      // We defer throwing the exception until now so that we can refill the connection
      // flow-control window. This is necessary because we don't transmit window updates until the
      // application reads the data. If we throw this prior to updating the connection
      // flow-control window, we risk having it go to 0 preventing the server from sending data.
      throw new StreamResetException(errorCode);
    }

    return -1; // This source is exhausted.
  }

  private void updateConnectionFlowControl(long read) {
    assert (!Thread.holdsLock(Http2Stream.this));
    connection.updateConnectionFlowControl(read);
  }

  /** Returns once the source is either readable or finished. */
  private void waitUntilReadable() throws IOException {
    readTimeout.enter();
    try {
      while (readBuffer.size() == 0 && !finished && !closed && errorCode == null) {
        waitForIo();
      }
    } finally {
      readTimeout.exitAndThrowIfTimedOut();
    }
  }

  void receive(BufferedSource in, long byteCount) throws IOException {
    assert (!Thread.holdsLock(Http2Stream.this));

    while (byteCount > 0) {
      boolean finished;
      boolean flowControlError;
      synchronized (Http2Stream.this) {
        finished = this.finished;
        flowControlError = byteCount + readBuffer.size() > maxByteCount;
      }

      // If the peer sends more data than we can handle, discard it and close the connection.
      if (flowControlError) {
        in.skip(byteCount);
        closeLater(ErrorCode.FLOW_CONTROL_ERROR);
        return;
      }

      // Discard data received after the stream is finished. It's probably a benign race.
      if (finished) {
        in.skip(byteCount);
        return;
      }

      // Fill the receive buffer without holding any locks.
      long read = in.read(receiveBuffer, byteCount);
      if (read == -1) throw new EOFException();
      byteCount -= read;

      // Move the received data to the read buffer to the reader can read it.
      synchronized (Http2Stream.this) {
        boolean wasEmpty = readBuffer.size() == 0;
        readBuffer.writeAll(receiveBuffer);
        if (wasEmpty) {
          Http2Stream.this.notifyAll();
        }
      }
    }
  }

  @Override public Timeout timeout() {
    return readTimeout;
  }

  @Override public void close() throws IOException {
    long bytesDiscarded;
    synchronized (Http2Stream.this) {
      closed = true;
      bytesDiscarded = readBuffer.size();
      readBuffer.clear();
      Http2Stream.this.notifyAll();
    }
    if (bytesDiscarded > 0) {
      updateConnectionFlowControl(bytesDiscarded);
    }
    cancelStreamIfNecessary();
  }
}

先到这里,下一篇详细分析一下OkHttp请求的三个核心 StreamAllocation RealConnection 和ConnectionPool

上一篇下一篇

猜你喜欢

热点阅读