Okhttp源码解析(概述)

2018-07-06  本文已影响0人  TangBuzhi

使用

添加gradle依赖

compile 'com.squareup.okhttp3:okhttp:3.8.0'

okhttp发起网络请求可分为四步

  1. 调用OkHttpClient产生其实例client
  2. 通过建造者Request#Builder构造request
  3. 调用client.newCall(request)生成call
  4. 调用call同步(excute)或异步(enqueue)方法产生成功/失败回调
      //步骤1
      //使用建造者模式构造client如果不添加任何配置等同于OkHttpClient()使用默认配置
      val okClient = OkHttpClient.Builder().build()//OkHttpClient()
      //步骤2(get请求)
      val okRequest = Request.Builder()
                  .url("url")
                  .build()
      //步骤2(post请求)
      val mediaType = MediaType.parse("application/json; charset=utf-8")
      val okBody = RequestBody.create(mediaType , "json")
      val okRequest = Request.Builder()
                .url("url")
                .post(okBody)
                .build()
      //步骤3
      val okCall = okClient.newCall(okRequest)
      //步骤4
      //val response = okCall.excute()//同步请求
      okCall.enqueue(object : Callback {//异步请求
             override fun onFailure(call: Call?, e: IOException?) {}//失败

             override fun onResponse(call: Call?, response: Response?) {}//成功
         })

源码解析

观察Okhttp发起网络请求的4个步骤,还是很清晰的.

  1. 步骤1的作用很明确,就是产生OkHttpClient实例,当然初始化client时,也初始化了其他一些重要的字段,比如任务派发机制Dispatcher,后面会详细讲,这里略过;
  2. 步骤2的作用也很明确,就是产生Request实例,既然说到Request,不得不说与它对应的Response,它们可以进行一些对比理解:
//Request
public final class Request {
  final HttpUrl url;//请求url
  final String method;//请求方式
  final Headers headers;//请求头
  final @Nullable RequestBody body;//请求体
  final Object tag;//标识位
  //省略...
}
//Response
public final class Response implements Closeable {
  final Request request;//请求
  final int code;//响应code
  final String message;//响应message
  final Headers headers;//响应头
  final @Nullable ResponseBody body;//响应体
  //省略...
}
  1. 步骤3的作用也是很明确的,就是产生call,它是通过client.newCall(request)方法而产生,下面看newCall方法:
    OkHttpClient#newCall
@Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
  }

既然是产生Call实例,那么先看下Call究竟是个啥?

public interface Call extends Cloneable {
       Request request();//返回初始化此call的原始request
       Response execute() throws IOException;//同步阻塞方法,直到response被处理或error时释放
       void enqueue(Callback responseCallback);//异步
       void cancel();//取消call
       boolean isExecuted();//是否执行过
       boolean isCanceled();//是否被取消
       Call clone();//clone方法,一个call只能被执行一次,后面源码会讲到,如果想再次请求,就需调用此方法
       interface Factory {//产生call的工厂接口,将call的真正实现放到具体类中初始,从而实现动态配置
              Call newCall(Request request);
          }
}

从Call的源码可知:call封装了okhttp的请求任务,所有对请求的操作全部在该接口里面有定义,可以毫不夸张的讲,call是okhttp的核心接口!此外,call里面还定义了实例化call的工厂接口,它的作用是将真正call的实例化过程延迟到具体类中,从而实现动态配置,此乃okhttp一大妙处.
从OkHttpClient源码public class OkHttpClient implements Cloneable, Call.Factory可知,它也实现了Call.Factory接口,返回Call实例,而查看call的继承树可知,它的唯一实现就是RealCall:
RealCall

final class RealCall implements Call {
    final OkHttpClient client;
    final Request originalRequest;
    final boolean forWebSocket;
    RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        this.client = client;//okhttpclient实例
        this.originalRequest = originalRequest;//原始request
        this.forWebSocket = forWebSocket;//是否是websocket
        //省略...
  }
}

到这里步骤3产生call的过程分析完毕.
4.1 同步请求
RealCall#execute

@Override public Response execute() throws IOException {
    synchronized (this) {//同步锁,key是realcall本身
      //如果call执行过则抛已执行异常,没有执行过则把标识位executed置为true
      //从这里可以看出,每个call只能执行一次
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();//捕获请求的StackTrace
    try {
      client.dispatcher().executed(this);//到这里真正的主角任务派发器dispatcher上场
      Response result = getResponseWithInterceptorChain();//返回response对象
      if (result == null) throw new IOException("Canceled");//如果response对象为空,则抛取消异常
      return result;//正常response返回
    } finally {//无论try里面的代码执行如何,最终总会执行finally
      client.dispatcher().finished(this);//把call任务释放掉
    }
  }

观察execute方法,首先同步锁状态,判断call是否有执行过,一旦执行过则抛已执行异常,接着是捕获请求的StackTrace,然后调用任务派发器Dispatcher对任务进行派发,紧接着调用getResponseWithInterceptorChain方法,返回response对象,在response不为空时返回,并最终调用任务派发器释放请求call.
下面看okhttp的核心类任务派发器Dispatcher:
Dispatcher

public final class Dispatcher {
  private int maxRequests = 64;//最大并发请求数
  private int maxRequestsPerHost = 5;//每台主机最大请求数
  private @Nullable ExecutorService executorService;//线程池
  //双向队列,先进先出
  //存放待执行异步任务
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  //存放正在执行异步任务
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  //存放正在执行同步任务
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  public Dispatcher(ExecutorService executorService) {//带参构造,参数executorService为用户自定义线程池
      this.executorService = executorService;
    }

  public Dispatcher() {//空参构造方法
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {//当用户使用默认配置时,由系统默认创建线程池
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
}

从上面源码可知,dispatcher里面封装了三个任务队列:正在运行的同步任务,正在运行的异步任务和待运行的异步任务。此外还有用于执行任务的线程池以及一些配置,比如默认最大并发请求数,每台主机的最大请求数等。好的,回到RealCall的excute方法,看看dispatcher的excute方法:
Dispatcher#excute

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

这个源码很简单,首先它是个同步方法,然后就是把call任务添加到正在运行的同步任务队列中,这也解释了为什么同步excute方法是个阻塞方法了,因为它要等到response被处理并返回或error时才会释放。下面就是getResponseWithInterceptorChain返回response了:
RealCall#getResponseWithInterceptorChain

Response getResponseWithInterceptorChain() throws IOException {
    //创建一个拦截器集合
    List<Interceptor> interceptors = new ArrayList<>();
    //用户自定义的拦截器
    interceptors.addAll(client.interceptors());
    //处理请求失败的重试,重定向
    interceptors.add(retryAndFollowUpInterceptor);
    //添加一些请求的头部或其他信息,并对返回的Response做部分处理
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //判断缓存是否存在,读取缓存,更新缓存等
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //建立客户端和服务器的连接
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {//用户自定义的网络层拦截器
      interceptors.addAll(client.networkInterceptors());
    }
    //最后一个拦截器,用于向服务器发起网络请求
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //一个包裹原始request的chain
    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    //把chain传递到第一个Interceptor手中
    return chain.proceed(originalRequest);
  }

观察这部分源码,也是极为简单的,首先就是创建一个拦截器集合,接着不断向里面添加拦截器,然后创建真正的拦截器链RealInterceptorChain,最后通过此拦截器链来处理原始request请求来返回response.下面看RealInterceptorChain是如何处理请求返回response:
RealInterceptorChain

public final class RealInterceptorChain implements Interceptor.Chain {
  private final List<Interceptor> interceptors;
  private final StreamAllocation streamAllocation;
  private final HttpCodec httpCodec;
  private final RealConnection connection;
  private final int index;
  private final Request request;
  private int calls;
  //构造方法就是进行一些赋值操作
  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request) {
    this.interceptors = interceptors;//拦截器集合
    this.connection = connection;//null
    this.streamAllocation = streamAllocation;//null
    this.httpCodec = httpCodec;//null
    this.index = index;//0
    this.request = request;//原始请求
  }
}

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    //其他省略。。。
    // Call the next interceptor in the chain.
    //由于streamAllocation, httpCodec, connection均为null
    //所以这部分代码就是依次将index+1创建新的拦截器链next,它区别于元chain的唯一不同点就是index下标
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    //对传入的所有拦截器遍历取出,顺序为0 => interceptors.size() - 1
    Interceptor interceptor = interceptors.get(index);
    //然后就是每一个拦截器都会对拦截器链拦截处理并最终返回response
    //这里有一个很有意思的事情:递归调用chain.proceed()
    //拦截器集合前size-1个拦截器interceptor的intercept方法都会调用RealInterceptorChain的proceed方法
    //于是就发生了每个拦截器都会对response进行各种的操作,并交给最后一个拦截器CallServerInterceptor的intercept方法
    //最终返回response(CallServerInterceptor的intercept方法不会继续递归)
    Response response = interceptor.intercept(next);
    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }
    return response;
  }

观察这部分源码可知:首先RealInterceptorChain构造方法就是进行一些赋值操作,然后proceed方法内,由于streamAllocation, httpCodec, connection均为null,所以新创建的每个新拦截器链next唯一区别于前一个就是index+1,接着从拦截器集合中逐一取出interceptor,并执行拦截intercept方法返回response.有意思的是,拦截器Interceptor(除了最后一个拦截器CallServerInterceptor)的拦截intercept方法都会调用chain.proceed方法,于是就发生了递归调用产生response.这里再叨叨一句,调用/返回过程如下:



至此,okhttp同步请求返回response已完。

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

分析同步方法时,上面源码只有最后一行未分析,那么直接看dispatcher的enqueue方法
Dispatcher#enqueue

synchronized void enqueue(AsyncCall call) {
    //当正在执行的任务数量小于64并且请求同一个主机小于5时,会把任务加入正在运行的异步队列
    //并启动线程池执行任务
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {//在不满足上面两个条件时,会把任务添加到待执行异步队列中
      readyAsyncCalls.add(call);
    }
  }

//计算AsyncCall对同一主机请求数量
private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

分析同步方法时,已经顺带讲过异步的两个队列,所以这部分源码很简单,下面来了解一下AsyncCall,AsyncCall继承自NamedRunnable,下面看这两个类:

//它是一个runnable,只不过它可以设置当前线程名称
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }
  //把run抽象成execute方法,具体实现子类必须重写该方法
  protected abstract void execute();
}
//AsyncCall是RealCall的内部类,它继承自NamedRunnable,本质也是一个runnable
final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
    //省略部分代码。。。
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

阅读这两个类源码可知:NamedRunnable实现Runnable接口,并抽象出excute方法,在run方法中设置当前线程名称。AsyncCall继承自NamedRunnable并重写excute方法,又调用getResponseWithInterceptorChain方法返回response,前面已经分析完,不再赘述。然后根据重定向的拦截器是否被取消决定回调成功或失败。并最终调用dispatcher.finish(call)释放任务。下面看dispatcher的finish方法:

void finished(AsyncCall call) {//是否异步方法
    finished(runningAsyncCalls, call, true);
  }

void finished(RealCall call) {//释放同步方法
    finished(runningSyncCalls, call, false);
  }

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {//同步代码块
      //如果从队列中remove任务失败则抛异常,在移除成功后继续后面代码
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //在异步时执行promoteCalls(),把待执行异步任务转移到在运行异步任务队列
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();//正在执行任务总数
      idleCallback = this.idleCallback;//赋值
    }
    //当线程池为空和回调不为空时,执行回调
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

private void promoteCalls() {//把待执行的异步任务添加到正在执行异步任务集合中,并从待执行集合中移除
    //如果正在执行的异步任务大于等于最大并发请求数,则继续等待
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    //如果待执行的异步任务为空,也继续等待
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    //只有正执行异步数量小于最大并发请求数64且待执行数量不为空时,进行遍历
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();//从待执行任务中取出call
      //当任务对同一主机发起请求数小于5时
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();//从待执行任务队列移除
        runningAsyncCalls.add(call);//添加到正在执行任务队列
        executorService().execute(call);//线程池执行任务
      }
       //如果正在执行的异步任务大于等于最大并发请求数,则继续等待
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

public synchronized int runningCallsCount() {//正在执行任务数量=同步任务数量+异步任务数量
    return runningAsyncCalls.size() + runningSyncCalls.size();
  }

从上面这部分源码可知:dispatcher.finish(call)就是把已执行任务从任务队列中移除,但是当执行的是异步任务时,它不仅会移除任务,它还会从待执行任务队列中取出任务添加到在执行任务队列。所以可以讲,不管是同步还是异步任务,它都以try...finally的方式最终执行finish(call),来结束当前call,并在异步任务执行时,从待执行任务队列取出任务加入正在执行任务队列,从而避免了wait/notify操作造成的额外开销。
至此,OkHttp的请求与响应流程已全部分析完毕。
下一篇详细阐述部分细节实现,期待一下!

上一篇下一篇

猜你喜欢

热点阅读