Android开发技术分享Android开发Android技术知识

okhttp源码解析(二):网路数据的读取

2018-04-28  本文已影响0人  珠穆朗玛小王子

前言

上一篇我们整体分析了一下okhttp的网络请求从发起到结束的流程,因为省略了很多的细节,所以还是很容易理解的,今天我们来重点分析一下okhttp是怎么从网络读取数据的。

之前提到过,okhttp的封装是非常仔细的,他使用okio框架来读取数据,这次分析okio的内容并不在我们的分析范围之内,不了解的朋友可以自己去学习一下。

正文

上一篇我们最后分析到了Interceptors的递归,完成了整个网络请求的读取,包括缓存策略:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    // 重试的Interceptor,在构造方法中创建
    interceptors.add(retryAndFollowUpInterceptor);
    // 桥接Interceptor,主要对request的header进行操作,添加一些header信息
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    // 缓存Interceptor,判断是否要使用缓存
    // 如果使用缓存,直接返回缓存的response,后面的Interceptor就不会得到执行
    interceptors.add(new CacheInterceptor(client.internalCache()));
    // 连接Interceptor
    interceptors.add(new ConnectInterceptor(client));
    // 我们在okhttp自定义的interceptor,不设置的话,可以忽略
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    // 呼叫服务端的Interceptor,主要是读取从服务端返回的数据
    interceptors.add(new CallServerInterceptor(forWebSocket));
    // Interceptor递归的开始
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

代码已经写了注释,今天重点分析的是http数据读取部分,所以我们的分析主要是:

BridgeInterceptor -> ConnectInterceptor -> CallServerInterceptor

首先我们打开BridgeInterceptor:

@Override
    public Response intercept(Chain chain) throws IOException {
        ...
        // 如果我们有RequestBody,会写一些header信息,这里就省略了
        RequestBody body = userRequest.body();
        if (body != null) {
            ...
        }
        ...
        // 如果我们没有指定编码的格式,默认使用gzip
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
            transparentGzip = true;
            requestBuilder.header("Accept-Encoding", "gzip");
        }
        // 把之前的cookie存在header里
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
            requestBuilder.header("Cookie", cookieHeader(cookies));
        }
        ...
        // 得到Response
        Response networkResponse = chain.proceed(requestBuilder.build());
        // 保存新的cookie
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

        Response.Builder responseBuilder = networkResponse.newBuilder()
                .request(userRequest);
        // 如果使用的gzip编码,并且返回的response有body信息,对做相应的处理
        if (transparentGzip
                && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
                && HttpHeaders.hasBody(networkResponse)) {
            GzipSource responseBody = new GzipSource(networkResponse.body().source());
            Headers strippedHeaders = networkResponse.headers().newBuilder()
                    .removeAll("Content-Encoding")
                    .removeAll("Content-Length")
                    .build();
            responseBuilder.headers(strippedHeaders);
            String contentType = networkResponse.header("Content-Type");
            responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }

        return responseBuilder.build();
    }

可以看到BridgeInterceptor主要是对header信息进行一些补全,在功能上没有做特别处理,所以就简单看一眼。

接下来是ConnectInterceptor:

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    // 判断请求的方式是否是get
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // 得到指定的编码器
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    // 得到可用的连接
    RealConnection connection = streamAllocation.connection();
    // 把得到的编码器和连接传给下一个Interceptor
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

这里判断了请求的方式是否是Get,然后去创建指定的编码解码器,创建Connection,然后传递给下个Interceptor。

细心的朋友会发现:

StreamAllocation streamAllocation = realChain.streamAllocation();
这个对象是怎么来的,我记得一开始的时候是null啊
 // Interceptor递归的开始
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis())
StreamAllocation这个类保存了很多的信息,例如地址相关的信息,HttpCodec,连接池,路由等等,可以说是很强大了。他的创建是在RetryAndFollowUpInterceptor中:

 @Override public Response intercept(Chain chain) throws IOException {
   ...
    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;}

重试机制不在这次的分析范围内,所以大家知道创建的时机就可以了。

然后看一看这个编解码器HttpCodec是怎么来的:

public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
     ...
      // 找到可用的链接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      // 创建指定的编码器
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
      ...
  }

这个newStream方法做了两件事:

1、先找到可以用的连接;

2、创建指定的编解码器HttpCodec。

找到可以使用的连接是一个复杂的过程,这里我们把好几个方法一起串起来看:

 private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    // 循环查找可用的链接
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);
      // 如果这是一个新的链接,可以跳过检查
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }
      // 如果这个链接不可用,进入下次循环
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }
      return candidate;
    }
  }
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    ...
    // 从连接池中找到能用的链接
    synchronized (connectionPool) {
      // 对现在的连接做一个备份
      releasedConnection = this.connection;
      // 得到要关闭的链接的socket
      toClose = releaseIfNoNewStreams();
      // 经过检查以后,发现当前的链接可以继续使用
      if (this.connection != null) {
        result = this.connection;
        releasedConnection = null;
      }
      ...
       // 如果当前没有可以用的链接,从连接池中查找
      if (result == null) {
        // Attempt to get a connection from the pool.
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;// 使用路由地址,可以是代理地址
        }
      }
    }
    // 关闭之前的socket
    closeQuietly(toClose);
    ... 
    if (result != null) {
      return result;
    }
   // 开始通过路由中的地址,去查找
    boolean newRouteSelection = false;
    // 如果之前没有找到链接,使用下一个路由
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
     ...
      if (newRouteSelection) {
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          // 在连接池中遍历路由集合,直到找到能用的链接
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          // 如果没有找到,再使用下一个路由集合
          selectedRoute = routeSelection.next();
        }
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }

    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // 到这里还没找到连接,那就去创建这个连接
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // 加入到链接池
      Internal.instance.put(connectionPool, result);

      // 如果result连接是http2.0连接,http2.0支持一个连接同时发起多个请求,这里做去重判断,防止创建多个
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

这里就完全看出okhttp的优势了,他在使用一个http连接之后, 并没有马上关闭这个连接,而是等待复用,下一次访问的时候,就省去了创建连接的成本,优化了网络请求的速度。

每一次网络访问成功,都会把相应的地址信息保存到连接池中,下一次访问的时候,去连接池中取,看看是否有可用的连接。为了保证接通率,我们还可以设置代理地址,只要代理地址接通,一样会保存下来。这里我们还看到了http2.0的优势,一个连接可以同时处理多个网络请求,这样可以大大省去网络的连接成本。

在这两个方法中还涉及到了很多其他类的方法,这里就先省略了,大家可以自己去查看。

第二步是获取指定的编解码器:

public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
      StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, chain, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
  }

非常简单,更新http的协议规则,返回对应的HttpCodec,判断也很简单,他的判断主要有两处:

首先判断是否使用了ssl加密,如果没有就使用Http1Codec

if (route.address().sslSocketFactory() == null) {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
      return;
    }

如果有使用,就去判断具体http协议是哪一种,如果是是http 2.0就会使用Http2Codec,其他情况使用Http1Codec

// 判断是否支持安全协议,支持的话去判断使用的事哪个协议
      String maybeProtocol = connectionSpec.supportsTlsExtensions()
          ? Platform.get().getSelectedProtocol(sslSocket)
          : null;
      protocol = maybeProtocol != null
          ? Protocol.get(maybeProtocol)
          : Protocol.HTTP_1_1;

OK,这里ConnectInterceptor就结束了,okhttp默认是支持sslFactory的。

最后就是CallServerInterceptor了,也是最复杂的部分,所以这里我们分块分析,首先是写入request信息:

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    // 得到之前的Interceptor传递的各种参数
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();
    ...
    // 编码器写入header信息
    httpCodec.writeRequestHeaders(request);
    Response.Builder responseBuilder = null;
    // 如果我们的方法需要允许提交body,并且body不等于null
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // 100表示距离上次请求,内容未发生变化
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        // 我们期望返回100,直接发送,不发送body
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
      if (responseBuilder == null) {
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        // 把Request的Request的body写入
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // 如果连接不支持同时响应多个请求,就直接关闭
        streamAllocation.noNewStreams();
      }
    }
    // 写入request结束
    httpCodec.finishRequest();}
 ...
}

首先我们先把Request的header传送给服务器,有两种情况:

1、如果我们期望返回100,也就是内容无变化,那就不用写入body了;

2、把body通过输入流的形式,发送给服务器;

完成上面的写入步骤,判断这个连接是否支持多请求访问,不支持就可以直接关闭了。

下面是读取数据的步骤:

@Override public Response intercept(Chain chain) throws IOException {
    ...
    // 写入request结束
    httpCodec.finishRequest();
    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      // 读取相应response的header信息
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    // 创建response,把握手信息,和request等信息保存进去
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    // 开始判断请求码
    int code = response.code();
    if (code == 100) {
      // 如果是100,直接读取header
      responseBuilder = httpCodec.readResponseHeaders(false);
      response = responseBuilder
              .request(request)
              // 握手
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }
    ...
    // 判断请求码
    if (forWebSocket && code == 101) {
      // 客户端需要转换协议,这里需要设置一个空的response
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      // 读取网络的body
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }
    // 如果header请求关闭连接
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      // 关闭这个链接
      streamAllocation.noNewStreams();
    }
    // 特殊code判断
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

这样Response的就读取结束了,具体里面的流程。

如果想要了解具体的读取和写入流程,以我现在使用的Http 2.0为例:

连接:Http2Connection;

流:Http2Stream;

编解码器:Http2Codec;

读操作:Http2Reader;

写操作:Http2Writer;

他们之间的关系:

1、Http2Connection调用Http2Reader和Http2Writer来进行读写;

2、Http2Stream调用Http2Connection进行读写;

3、Http2Codec调用Http2Connection和Http2Stream进行操作;

总结

今天我们分析了okhttp是如何通过Interceptor完成了网络请求的原理,我们发现okhttp在连接的缓存上帮助我们节省每一次http请求建立连接的时间成本,这样节省了系统资源,提高了网络访问的速度,但是并不支持所有的http协议。

既然了解了okhttp的这一特性,我们在http上优化网络请求的速度上又有了新的思路,优先使用HTTP 2.0协议。

明天就是五一假期了,希望大家多出去走一走,学习之余也要享受生活,拜拜~

上一篇 下一篇

猜你喜欢

热点阅读