
OkHttp Source Code Study (Part 2): The Main Flow

2019-06-29  刘景昌

A basic OkHttp request flow:

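    // Media type of the JSON request body.
    MediaType JSON = MediaType.parse("application/json; charset=utf-8");
    OkHttpClient client = new OkHttpClient();
    RequestBody body = RequestBody.create(JSON, "{\"json\":\"json\"}");
    Request request = new Request.Builder()
            .url("https://www.baidu.com")
            .post(body)
            .build();
    // execute() blocks the calling thread and throws IOException on failure.
    Response response = client.newCall(request).execute();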

Let's look at the most basic classes.

1. OkHttpClient

Here is the constructor, with comments added to make the later sections easier to follow.

  public OkHttpClient() {
    this(new Builder());
  }

  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher; //dispatcher: schedules calls
    this.proxy = builder.proxy;//proxy
    this.protocols = builder.protocols;//supported protocols
    this.connectionSpecs = builder.connectionSpecs;//TLS versions and cipher suites for connections
    this.interceptors = Util.immutableList(builder.interceptors);//application interceptors
    this.networkInterceptors = Util.immutableList(builder.networkInterceptors);//network interceptors
    this.eventListenerFactory = builder.eventListenerFactory;
    this.proxySelector = builder.proxySelector;//proxy selector
    this.cookieJar = builder.cookieJar;//cookie storage
    this.cache = builder.cache;//response cache
    this.internalCache = builder.internalCache;//internal cache
    this.socketFactory = builder.socketFactory;//socket factory
    boolean isTLS = false;
    for (ConnectionSpec spec : connectionSpecs) {
      isTLS = isTLS || spec.isTls();
    }

    if (builder.sslSocketFactory != null || !isTLS) {
      this.sslSocketFactory = builder.sslSocketFactory;//socket factory used for HTTPS
      this.certificateChainCleaner = builder.certificateChainCleaner;//cleans up the certificate chain presented over HTTPS
    } else {
      X509TrustManager trustManager = Util.platformTrustManager();
      this.sslSocketFactory = newSslSocketFactory(trustManager);
      this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
    }

    if (sslSocketFactory != null) {
      Platform.get().configureSslSocketFactory(sslSocketFactory);
    }

    this.hostnameVerifier = builder.hostnameVerifier;//hostname verification
    this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
        certificateChainCleaner);//certificate pinning
    this.proxyAuthenticator = builder.proxyAuthenticator;//proxy authentication
    this.authenticator = builder.authenticator;//origin server authentication
    this.connectionPool = builder.connectionPool;//connection pool, reuses connections
    this.dns = builder.dns;//DNS lookup
    this.followSslRedirects = builder.followSslRedirects;//follow redirects between HTTPS and HTTP
    this.followRedirects = builder.followRedirects;//follow redirects
    this.retryOnConnectionFailure = builder.retryOnConnectionFailure;//retry when a connection fails
    this.callTimeout = builder.callTimeout;
    this.connectTimeout = builder.connectTimeout;//connect timeout
    this.readTimeout = builder.readTimeout;//read timeout
    this.writeTimeout = builder.writeTimeout;//write timeout
    this.pingInterval = builder.pingInterval;

    if (interceptors.contains(null)) {
      throw new IllegalStateException("Null interceptor: " + interceptors);
    }
    if (networkInterceptors.contains(null)) {
      throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
    }
  }
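
The constructor above follows a common pattern: the no-arg constructor delegates to a Builder that holds the defaults, list fields are copied into read-only lists, and null entries are rejected. The following is only a minimal sketch of that pattern using the JDK alone. MiniClient, its String "interceptors" and the 10,000 ms default are hypothetical stand-ins for illustration, not OkHttp's own code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical miniature of a client that is configured through a Builder.
final class MiniClient {
  final List<String> interceptors; // stand-in for List<Interceptor>
  final int connectTimeoutMillis;

  // The no-arg constructor delegates to a Builder carrying the defaults.
  MiniClient() {
    this(new Builder());
  }

  MiniClient(Builder builder) {
    // Read-only copy, so later changes to the builder cannot leak into the client.
    this.interceptors =
        Collections.unmodifiableList(new ArrayList<>(builder.interceptors));
    this.connectTimeoutMillis = builder.connectTimeoutMillis;
    // Fail fast on a null entry, as the constructor above does.
    if (interceptors.contains(null)) {
      throw new IllegalStateException("Null interceptor: " + interceptors);
    }
  }

  static final class Builder {
    final List<String> interceptors = new ArrayList<>();
    int connectTimeoutMillis = 10_000; // assumed default for this sketch

    Builder addInterceptor(String interceptor) {
      interceptors.add(interceptor);
      return this;
    }

    Builder connectTimeoutMillis(int millis) {
      this.connectTimeoutMillis = millis;
      return this;
    }

    MiniClient build() {
      return new MiniClient(this);
    }
  }
}
```

With this shape, `new MiniClient()` and `new MiniClient.Builder().build()` produce equivalent objects, which is why the default constructor can stay a one-liner.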

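After creating the OkHttpClient, we create a Request.

2. Request, Response

Request: all the information a request carries, such as the URL and method, together with Headers and a RequestBody. RequestBody has two subclasses: FormBody (form submission) and MultipartBody (multipart bodies, e.g. file upload).
Response: the server's reply to the request. It likewise carries Headers and a ResponseBody, and ResponseBody also has two subclasses: RealResponseBody (a real network response) and CacheResponseBody (a cached response).
Once the request is built, client.newCall(request).execute() actually performs it.
OkHttpClient's newCall method: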

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

This simply calls RealCall.newRealCall.
Let's look at the RealCall class next.

3. The RealCall class

Start with the static factory method newRealCall (comments added):

 /**
   * Factory method.
   * @param client the OkHttpClient that creates the call
   * @param originalRequest the Request to execute
   * @param forWebSocket whether the call is for a WebSocket
   */
 static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
  }

It has two important methods:
execute(): synchronous network request
enqueue(): asynchronous network request
We follow the synchronous path first.

  @Override public Response execute() throws IOException {
    //A synchronized block plus the executed flag ensures a call is executed only once
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    //Start the call timeout
    transmitter.timeoutEnter();
    //Notify the EventListener through callStart()
    transmitter.callStart();
    try {
      //Add the call to the runningSyncCalls deque
      client.dispatcher().executed(this);
      return getResponseWithInterceptorChain();
    } finally {
      client.dispatcher().finished(this);
    }
  }
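
The "execute only once" guard at the top of execute() can be tried in isolation. This is a minimal sketch assuming a hypothetical OneShotCall class whose work is replaced by a placeholder string; only the synchronized flag check mirrors the code above.

```java
// Hypothetical stand-in for a one-shot call; only the guard logic mirrors the text.
final class OneShotCall {
  private boolean executed; // guarded by "this"

  String execute() {
    // Check and set the flag atomically so two threads cannot both pass the guard.
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    return "response"; // placeholder for the real work
  }
}
```

Calling execute() a second time on the same instance throws IllegalStateException, which is why a new call has to be created for every request.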

Transmitter will be covered in a later article; this time we only walk through the main flow.
The last step leads into getResponseWithInterceptorChain(), so let's step in:

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    //Add the application interceptors defined by the developer
    interceptors.addAll(client.interceptors());
    //Handles retries after failures and follows redirects
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    //Adds required request headers and other information, and post-processes the returned Response
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //Checks whether a cached response exists, reads the cache, updates the cache, and so on
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //Establishes the connection between client and server
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      //Add the network interceptors defined by the developer
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //A chain that wraps the request
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

Here we build the list of interceptors and pass it to the RealInterceptorChain constructor to obtain an Interceptor.Chain.
Let's look at the RealInterceptorChain constructor:

  public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
      @Nullable Exchange exchange, int index, Request request, Call call,
      int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.transmitter = transmitter;
    this.exchange = exchange;
    this.index = index;
    this.request = request;
    this.call = call;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;
  }

It only assigns fields and does nothing else.
That leaves chain.proceed(originalRequest), so let's look at that method.
It delegates to the overload that does the real work:

  public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.exchange != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }
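
The core of proceed() is that each step creates a new chain positioned at index + 1 and hands it to the interceptor at index, so the interceptors nest like function calls. The following is only a stripped-down sketch of that idea: MiniInterceptor, MiniChain and the String request/response types are hypothetical, and the exchange/calls checks of the real method are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified interceptor: requests and responses are plain strings.
interface MiniInterceptor {
  String intercept(MiniChain chain);
}

final class MiniChain {
  private final List<MiniInterceptor> interceptors;
  private final int index;
  private final String request;

  MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
    this.interceptors = interceptors;
    this.index = index;
    this.request = request;
  }

  String request() {
    return request;
  }

  String proceed(String request) {
    if (index >= interceptors.size()) throw new AssertionError();
    // The next chain points one position further along the same list.
    MiniChain next = new MiniChain(interceptors, index + 1, request);
    String response = interceptors.get(index).intercept(next);
    if (response == null) throw new NullPointerException("interceptor returned null");
    return response;
  }
}

final class MiniChainDemo {
  static String run() {
    List<MiniInterceptor> interceptors = new ArrayList<>();
    // Rewrites the request on the way in and wraps the response on the way out.
    interceptors.add(chain -> "[app " + chain.proceed(chain.request() + "+header") + "]");
    // Passes the request through unchanged.
    interceptors.add(chain -> "[bridge " + chain.proceed(chain.request()) + "]");
    // The last interceptor produces the response and never calls proceed().
    interceptors.add(chain -> "response(" + chain.request() + ")");
    return new MiniChain(interceptors, 0, "GET /").proceed("GET /");
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints "[app [bridge response(GET /+header)]]"
  }
}
```

The last interceptor plays the role CallServerInterceptor has in the list above: it ends the recursion by returning a response instead of calling proceed().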

Finally the response is returned back to execute():


 @Override public Response execute() throws IOException {
    //A synchronized block plus the executed flag ensures a call is executed only once
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    //Start the call timeout
    transmitter.timeoutEnter();
    //Notify the EventListener through callStart()
    transmitter.callStart();
    try {
      //Add the call to the runningSyncCalls deque
      client.dispatcher().executed(this);
      return getResponseWithInterceptorChain();
    } finally {
      client.dispatcher().finished(this);
    }
  }

That completes the synchronous flow.
Now let's look at the asynchronous request:

    // enqueue() returns void; the result is delivered through the Callback.
    client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
        }
        @Override
        public void onResponse(Call call, Response response) throws IOException {
        }
    });
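
The callback style used here can be illustrated without OkHttp. This is a minimal sketch assuming a hypothetical MiniCallback interface and a fake "request" that fails on an empty URL; it shows that enqueue() returns immediately and that the result arrives later on a worker thread.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback, shaped like the two-method callback used above.
interface MiniCallback {
  void onFailure(IOException e);

  void onResponse(String response);
}

final class MiniAsyncCall {
  // Returns immediately; the work and the callback run on the executor's thread.
  static void enqueue(ExecutorService executor, String url, MiniCallback callback) {
    executor.execute(() -> {
      if (url.isEmpty()) {
        callback.onFailure(new IOException("empty url"));
      } else {
        callback.onResponse("200 OK for " + url); // fake response, no network involved
      }
    });
  }

  // Waits for the callback so that the result can be inspected by the caller.
  static String demo(String url) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch done = new CountDownLatch(1);
    AtomicReference<String> result = new AtomicReference<>();
    enqueue(executor, url, new MiniCallback() {
      @Override public void onFailure(IOException e) {
        result.set("failed: " + e.getMessage());
        done.countDown();
      }

      @Override public void onResponse(String response) {
        result.set(response);
        done.countDown();
      }
    });
    try {
      return done.await(5, TimeUnit.SECONDS) ? result.get() : "timeout";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    } finally {
      executor.shutdown();
    }
  }
}
```

The latch exists only so the demo can return a value; real callers normally just react inside the callback.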

Step into the enqueue method:



  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    // Notify the EventListener through callStart()
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

Like the synchronous path, this calls transmitter.callStart() first. It then calls dispatcher.enqueue(new AsyncCall(responseCallback)), so let's step into the dispatcher:


  void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        //This is just an assignment: the new call shares the existing call's counter
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }
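
The point of reuseCallsPerHostFrom in enqueue() above is that all calls to one host end up holding the same AtomicInteger, so incrementing it through any of them is visible to the others. The following is a small sketch of that sharing with a hypothetical HostCountedCall class; it is not the dispatcher's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical call object that carries a per-host counter.
final class HostCountedCall {
  private final String host;
  private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

  HostCountedCall(String host) {
    this.host = host;
  }

  String host() {
    return host;
  }

  AtomicInteger callsPerHost() {
    return callsPerHost;
  }

  // Plain assignment: afterwards both calls point at the same counter object.
  void reuseCallsPerHostFrom(HostCountedCall other) {
    this.callsPerHost = other.callsPerHost;
  }
}
```

After `second.reuseCallsPerHostFrom(first)`, a call to `first.callsPerHost().incrementAndGet()` is also observed through `second.callsPerHost().get()`, which lets the per-host limit be checked without a separate map.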

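First the call is added to readyAsyncCalls. Then, unless the call is for a WebSocket, the dispatcher looks for an existing call to the same host and, if one is found, reuseCallsPerHostFrom makes the new call share its counter. Finally promoteAndExecute() is called, so let's look inside it: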


  //Maximum number of requests running at the same time
  private int maxRequests = 64;
  //Maximum number of requests running at the same time against one host
  private int maxRequestsPerHost = 5;
  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
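        //Stop once the number of running async calls reaches maxRequests (64 by default)
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        //Skip this call while its host already has maxRequestsPerHost (5 by default) running calls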
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }
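
The admission rule in promoteAndExecute() can be modelled with plain collections. This is a minimal sketch assuming a hypothetical MiniDispatcher in which each call is represented only by its host name and per-host counts live in a map (the real code keeps an AtomicInteger on each call instead).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical dispatcher model: a "call" is just the host it targets.
final class MiniDispatcher {
  private final int maxRequests;
  private final int maxRequestsPerHost;
  private final Deque<String> ready = new ArrayDeque<>();
  private final Deque<String> running = new ArrayDeque<>();
  private final Map<String, Integer> runningPerHost = new HashMap<>();

  MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
    this.maxRequests = maxRequests;
    this.maxRequestsPerHost = maxRequestsPerHost;
  }

  synchronized void enqueue(String host) {
    ready.add(host);
  }

  // Moves eligible calls from ready to running and returns the promoted ones.
  synchronized List<String> promote() {
    List<String> promoted = new ArrayList<>();
    for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
      String host = i.next();
      if (running.size() >= maxRequests) break; // global limit reached
      if (runningPerHost.getOrDefault(host, 0) >= maxRequestsPerHost) continue; // host limit

      i.remove();
      runningPerHost.merge(host, 1, Integer::sum);
      running.add(host);
      promoted.add(host);
    }
    return promoted;
  }

  synchronized int readyCount() {
    return ready.size();
  }
}
```

Note the difference between the two checks: hitting the global limit stops the scan, while hitting a host limit only skips that call so that calls to other hosts further back in the queue can still be promoted.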

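As the source and comments show, a waiting call is promoted only while fewer than 64 async calls are running and fewer than 5 calls are running against the same host. Each promoted call is added to the running queue, and asyncCall.executeOn is then invoked for each of them in a loop. Let's look at the AsyncCall class first: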

  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    AtomicInteger callsPerHost() {
      return callsPerHost;
    }

    void reuseCallsPerHostFrom(AsyncCall other) {
      this.callsPerHost = other.callsPerHost;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    /**
     * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }
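
executeOn() has to cope with an executor that has already been shut down, in which case execute() throws RejectedExecutionException and the call is reported as failed. This is a minimal sketch of that failure path using only java.util.concurrent; RejectionDemo and its string results are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

final class RejectionDemo {
  // Tries to hand the task to the executor and reports what happened.
  static String executeOn(ExecutorService executor, Runnable task) {
    try {
      executor.execute(task);
      return "submitted";
    } catch (RejectedExecutionException e) {
      // A shut-down executor refuses new tasks; treat that as a failed call.
      return "failed: executor rejected";
    }
  }

  static String demoAfterShutdown() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    executor.shutdown(); // no new tasks are accepted from here on
    return executeOn(executor, () -> { });
  }

  static String demoRunning() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
      return executeOn(executor, () -> { });
    } finally {
      executor.shutdown();
    }
  }
}
```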

AsyncCall extends NamedRunnable, so let's look at the parent class:

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
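
The effect of this template-method wrapper is easy to observe: the thread carries the given name only while execute() runs and gets its old name back afterwards. The following is a small sketch with a hypothetical MiniNamedRunnable that takes a ready-made name instead of a format string.

```java
// Hypothetical simplified variant: the name is passed in directly.
abstract class MiniNamedRunnable implements Runnable {
  private final String name;

  MiniNamedRunnable(String name) {
    this.name = name;
  }

  @Override public final void run() {
    Thread current = Thread.currentThread();
    String oldName = current.getName();
    current.setName(name);
    try {
      execute();
    } finally {
      current.setName(oldName); // always restore, even if execute() throws
    }
  }

  protected abstract void execute();
}

final class NamedRunnableDemo {
  // Returns the name seen inside execute() and whether the old name was restored.
  static String demo() {
    String before = Thread.currentThread().getName();
    String[] seen = new String[1];
    new MiniNamedRunnable("OkHttp example") {
      @Override protected void execute() {
        seen[0] = Thread.currentThread().getName();
      }
    }.run();
    boolean restored = before.equals(Thread.currentThread().getName());
    return seen[0] + "|" + restored;
  }
}
```

Naming the thread after the request makes thread dumps and logs readable when many pooled threads are busy at once.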

Here we finally see a familiar method again: getResponseWithInterceptorChain(). From there everything proceeds just as in the synchronous flow.
Now let's look at the overall OkHttp flow chart:


[Figure: OkHttp flow chart (image.png)]

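Starting with the next article, each part is explained in detail one by one.

Finally, here is a copy of the source with comments added: https://github.com/525642022/okhttpTest/blob/master/README.md
Haha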
