okHttp 源码流程简单分析

2019-03-04  本文已影响0人  Wynne丶XXW

基础用法介绍

导入方式

Glide 导入 (app目录下的build.gradle)

(api/implementation/compile) com.squareup.okhttp3:okhttp:3.8.0"

1.Get 请求方式
 OkHttpClient client = new OkHttpClient.Builder().build();
                Request request = new Request.Builder().url(getUrl).build();

                client.newCall(request).enqueue(new Callback() {
                    @Override
                    public void onFailure(Call call, IOException e) {

                    }

                    @Override
                    public void onResponse(Call call, Response response) throws IOException {
                        emitter.onNext(response.body().string());
                    }
                });
2.Post 请求方式
 FormBody formBody = new FormBody.Builder().add("key", "value").build();
                OkHttpClient client = new OkHttpClient.Builder().build();
                Request request = new Request.Builder().post(formBody).url(postUrl).build();

                client.newCall(request).enqueue(new Callback() {
                    @Override
                    public void onFailure(Call call, IOException e) {

                    }

                    @Override
                    public void onResponse(Call call, Response response) throws IOException {
                        emitter.onNext(response.body().string());
                    }
                });

源码流程简单分析

流程图

okHttp请求网络的流程图

    //进行网络请求
     client.newCall(request).enqueue(new Callback() {
                    @Override
                    public void onFailure(Call call, IOException e) {

                    }

                    @Override
                    public void onResponse(Call call, Response response) throws IOException {
                        emitter.onNext(response.body().string());
                    }
                });

通常我们用okHtpp执行一个网络请求都是上例代码, 所以我们先从上例代码的enqueue(new CallBack())方法看起

public interface Call{

  //省略代码
  ...
  
  void enqueue(Callback responseCallback);
  ...
  //省略代码
  }

public class okHttpClient{
 @Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }
}

public class RealCall implements Call{
  RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    final EventListener.Factory eventListenerFactory = client.eventListenerFactory();

    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);

    // TODO(jwilson): this is unsafe publication and not threadsafe.
    this.eventListener = eventListenerFactory.create(this);
  }
}

首先我们发现exqueue(CallBack) 是Call接口的一个方法, 所以我们必须要找到它的实现类的exqueue方法,所以我们往前看,newCall方法,newCall方法内部创建了一个RealCall的对象, 我们发现RealCall的对象 确实实现了Call接口. 所以我们继续看RealCall类的exqueue方法

@Override public void enqueue(Callback responseCallback) {
    //省略代码
    ...
    client.dispatcher().enqueue(new AsyncCall(responseCallback));

  }

public class Dispatcher{
synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
 }

class AsyncCall extends NamedRunnable{
@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
}
    

enqueue方法很简单 就是调用了Dispatcher的enqueue方法,并传进去了一个Runnable,(AsyncCall 继承NamedRunnable, NamedRunnable 实现了Runnable接口). enqueue方法里面对当前传进来的线程做了判断, 如果正在运行的异步线程数量小于总数量 并且请求数量小于总数量的话 ,就会把传教来的Runnable 加入到队列,然后放入线程池中, 否则将会加入到等待队列中,等待请求. 加入线程池后, 就自然会调用到AsyncCall的execute方法, 这个时候我们会发现平常我们的返回回调的一些方法,不要以为okHttp原来这么简单, 其实这才刚刚开始要进行网络请求. 首先,我们看getResponseWithInterceptorChain()方法

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

getResponseWithInterceptorChain()方法,从名字上来看可以知道是返回相应的拦截链(我的扣脚英语阿!),这里面的操作很简单,我们先只要知道, 这是一个网络请求的拦截连, 最后会通过递归的方式调用RealInterceptorChain的proceed方法进行网络请求


拦截连示意图

现在我们回过头来看返回Response之后做了什么事情, 首先我们通过RetryAndFollowUpInterceptor 拦截器 是否取消了请求, 如果是,则返回onFailure(),如果没有则返回我们网络请求的结果调用onResponse方法, 如果中间出现了异常,则会抛出异常直接调用onFailure()方法. 最后会调用Dispatcher的finished()方法,对队列中的请求线程做处理

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

RealInterceptorChain 拦截器

 public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    省略代码
   ...
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

   ...
    return response;
  }

getResponseWithInterceptorChain()方法最后会执行RealInterceptorChain的proceed方法. 这个类主要是以递归的方式,让拦截器们处理请求过程和响应结果的一个过程, 从proceed方法可以看出我们会再方法内部又创建一个自己的对象,进行赋值, 通过拦截器队列获取下一个Intercept拦截器,然后调用他们的intercept方法.

RetryAndFollowUpInterceptor 拦截器

从名字可以看出这应该是让请求重新连接的一个拦截器,接下来我们看看 具体做了一些什么事情.


@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    <1>
    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    <2>
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response = null;
      boolean releaseConnection = true;
      try {
      <3>
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
      <4>
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
      <5>
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        <6>
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      <7>
      Request followUp = followUpRequest(response);

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());
      <8>
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(followUp.url()), callStackTrace);
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }
  1. 创建了StreamAllocationd 对象, 这个对象是用来进行连接后端服务器的, 在这个地方暂时没有用到, 会通过proceed方法传递到下一个intercept拦截器中.
  2. 用while 做死循环,重复请求
  3. 这里我们通过向下转型又去调用了RealInterceptorChain的proceed方法. 传递给下一个拦截器进行网络请求,
  4. 当我们网络请求后返回结果后 如果抛出RouteException 异常, 先会判断是否可以进行恢复操作 (!recover(e.getLastConnectException(), false, request)), 如果恢复则直接抛出异常,最后走finally方法 进行释放资源, 如果恢复了则continue 跳出循环 ,继续执行第3步 进行网络请求
  5. 和第四步类似,只是抛出的是IO异常
  6. 释放资源
  7. 对请求结果进行判断, 如果是200, 这里会返回null,当为null时我们就会把结果返回给RealInterceptorChain ,再通过回调返回给上一个调用者
  8. 判断重连次数是否大于最大值,如果大于则释放资源,抛出ProtocolException异常.

BridgeInterceptor 拦截器

这一个拦截器 主要时对网络请求的request 进行加工, 我们平时写okHttp的时候 只要简单的配置request的url就可以进行网络请求, 其实是不对的. 之所以还是能请求网络是因为BridgeInterceptor对我们的request进行了加工

@Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    <1>
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
     
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }
    <2>
    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }
    <3>
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    <4>
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }
    <5>
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }
    <6>
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
    <7>
    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    <8>
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }
  1. 判断是否存在内容类型,如果存在则添加. (content-Type 默认为 charset=utf-8) . 如果内容长度不为-1, 则添加content-Length,删除Transfer-Encoding,否则相反. (content-Length属性来说明数据的大小。 Transfer-Encoding属性,它的作用是当传输数据的大小不确定时通过chunked(片)的方式进行传输, 两个是相斥的只有一个能存在)
  2. 判断host是否为空,为空则host进请求头
  3. 判断Connection 是否为空,为空则添加进请求头,(keep-Alive: 使客户端与服务端持久有效)
  4. 判断请求头中是否存在编码类型(accept-Encoding) 不存在则添加gzip(okhttp默认是gzip压缩)
  5. 判断cookie 是否为空, 为空则添加进请求
  6. 判断user-agent(用户代理)是否存在,为空则添加进请求头,默认是okhttp的版本号,默认为okhttp/3.8.0
  7. 对请求头加工好之后,我们还是和之前一样,继续调用下一个拦截器的proceed方法
  8. 当返回来的response 是gzip的编码格式,okHttp就会对其进行压缩处理,然后返回给上一个拦截器

CacheInterceptor 拦截器

对网络请求缓存做处理的拦截器

@Override public Response intercept(Chain chain) throws IOException {
<1>
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

<2>
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

<3>
    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

<4>
    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }


    Response networkResponse = null;
    try {
    <5>
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
    <6>
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    <7>
    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }  }
    }

  1. 获取缓存, 前提是我们如果设置了缓存(这里当它设置了, 具体细节比较深入,此文只是了解流程,并没有太过深入,其实是看不懂 哈哈)
  2. 创建一个缓存策略对象,这里要说到两个变量networkRequest,cacheResponse. 通过他们两个的值可以判断是使用网络请求还是缓存.
  3. 如果网络请求为空 并且 缓存也为空,就直接返回一个code为504的响应
  4. 如果网络请求为空,直接返回缓存了的响应结果
  5. 调用下一个的Interceptor拦截器的proceed方法
  6. 在缓存不为空并且网络请求的结果与上次请求结果没有变化的情况下, 返回缓存
  7. 写入缓存

ConnectInterceptor 拦截器

终于到了正在连接后端的拦截器,前面所有的努力都只为了这一刻!

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    <1>
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    <2>
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
  1. 之前我们在RecyInterceptor中创建的StreamAllocation 就是在这里用到的,用来连接服务端
  2. HttpCodec 是一个流,通过用socket 与服务端进行连接

由于我们只是了解大概流程 所有详细细节如下, 如果不感兴趣的可以自行跳过

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
1. 设置连接超时,读取超时,写入超时
   int connectTimeout = client.connectTimeoutMillis();
   int readTimeout = client.readTimeoutMillis();
   int writeTimeout = client.writeTimeoutMillis();
   boolean connectionRetryEnabled = client.retryOnConnectionFailure();

   try {
   2.找到一个健康的连接(可以使用的)的连接
     RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
         writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
        3.创建HttpCode
     HttpCodec resultCodec = resultConnection.newCodec(client, this);

     synchronized (connectionPool) {
       codec = resultCodec;
       return resultCodec;
     }
   } catch (IOException e) {
     throw new RouteException(e);
   }
 }
 


private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
     int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
     throws IOException {
     省略代码
     ...
   while (true) {
   1. 循环遍历查找 可以使用的连接
     RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
         connectionRetryEnabled);

     // If this is a brand new connection, we can skip the extensive health checks.
     synchronized (connectionPool) {
     2. 如果成功个数为0 ,则正面是可用的连接.返回
       if (candidate.successCount == 0) {
         return candidate;
       }
     }
     ...
     return candidate;
   }
 }


 public HttpCodec newCodec(

     OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
     1. 在之前findConnect的时候 http2Connection 被赋值为null
   if (http2Connection != null) {
     return new Http2Codec(client, streamAllocation, http2Connection);
   } else {
    2.   设置socket的各种超时
     socket.setSoTimeout(client.readTimeoutMillis());
     source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
     sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
     3. 创建Http1Codec返回
     return new Http1Codec(client, streamAllocation, source, sink);
   }
 }

CallServerIntercepot 拦截器

 @Override public Response intercept(Chain chain) throws IOException {
   <1>
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();
   <2>
    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    <3>
      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
        // being reused. Otherwise we're still obligated to transmit the request body to leave the
        // connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }
    <4>
    httpCodec.finishRequest();

    <5>
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
      
     省略代码
     ...
     

    return response;
  }
  1. 配置请求信息
  2. 发送请求时间写入请求头
  3. 进行请求写入
  4. 请求结束
  5. 响应读取

当我们返回响应结果之后,就会一层层回到上一个Intercept, 最后回调到我们的onResponse方法. 到此整个okHttp的网络请求的简单流程就已经走完了,其中很多地方都是带过没有具体详细深入,到之后学习深入细节后 再重新更新文章.加油吧 骚年

学习资料
okHttp超详细解析

上一篇下一篇

猜你喜欢

热点阅读