网络

Android之OkHttp源码解析(1)- 请求过程

2019-11-19  本文已影响0人  居居居居居居x

前言

此篇文章将会解析OkHttp的源码,因为OkHttp的的源码有很多部分,我们这篇来讲解OkHttp的网络请求,我们还是从OkHttp的异步操作开始解析,OkHttp版本为3.7。

OkHttp的请求网络流程

当我们要请求网络的时候需要调用OkHttpClient的newCall(request),然后在进行execute()或者enqueue()操作,当调用newCall()方法时候,会调用如下代码。

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}

它其实返回的是一个RealCall类,我们调用enqueue()异步请求方法实际上是调用了RealCall的enqueue()方法。

//RealCall的enqueue()方法源码
//从这里开始调用
@Override public void enqueue(Callback responseCallback) {
    enqueue(responseCallback, false);
}

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    //这里交给dispatcher进行调度
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

从这里源码可以看到,最终的请求是dispatcher的enqueue()来完成的,我们接下来就来分析这个。

public final class Dispatcher {

    //最大并发请求数
    private int maxRequests = 64;
    //每个主机的最大请求数
    private int maxRequestsPerHost = 5;
    
    /** Executes calls. Created lazily. */
    //消费者线程池
    private ExecutorService executorService;

    /** Ready async calls in the order they'll be run. */
    //将要运行的异步消息队列
    private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

    /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
    //正在运行的异步请求队列
    private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

    /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
    //正在运行的同步请求队列
    private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

    ......
}

上面为Dispatcher类的部分代码,主要用于控制并发的请求,接下来我们看看Dispatcher的构造方法。

public Dispatcher(ExecutorService executorService) {
  this.executorService = executorService;
}

public Dispatcher() {
}

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

Dispatcher有两个构造方法,可以使用自己设定的线程池,也如果没有线程池,则会在正式请求网络前调用executorService()来创建默认线程池,这个线程池类似于CachedThreadPool,比较适合执行大量的耗时比较少的额任务。然后我们回到前面之前讲过的就是当RealCall调用enqueue()方法时候,实际上就是调用了Dispatcher的enqueue()方法,它的代码如下。

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      //这里通过executorService()创建线程池然后通过线程池的execute()会开始执行AsyncCall的execute()
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
}

当正在运行的异步请求队列的数量小于64并且正在运行的请求主机数小于5时,把请求加载到到runningAsyncCalls中并在线程池中执行,否则就加在到readyAsyncCalls中进行缓存等待,线程池中传进来的参数是AsyncCall,它是RealCall的内部类,其内部也实现了execute()方法,代码如下。

//AsyncCall的execute()方法源码
@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    ......
  } catch (IOException e) {
    ......
  } finally {
    client.dispatcher().finished(this);
  }
}

无论这个请求的结果如何,都会执行finaly块里面的代码,我们来看看里面是什么。

void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
}

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
    //移除队列
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    //这里开始检查是否为异步请求,检查等候的队列,readyAsyncCalls
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
        idleCallback.run();
    }
}

private void promoteCalls() {
    //检查 运行队列 和 等待队列
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
    //将等待队列加入到运行队列中
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall call = i.next();
        //相同host的请求没有达到最大,加入到运行队列
        if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
    }

    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
}

这里是通过调度器移除队列,并且判断是否存在等待队列,如果存在,检查执行队列是否达到最大值,如果没有将等待队列变为执行队列。这样也就确保了等待队列被执行。

好了接下来我们回到AsyncCall这个类上面来,我们会发现它继承了NamedRunnable。

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = String.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

然后发现了,其实本质上AsyncCall就是一个Runnable,线程池会执行AsyncCall的execute()方法。接下来我们来看看AsyncCall的execute()源码。

//AsyncCall的execute()源码
@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}

从这里我们发现getResponseWithInterceptorChain()返回一个response,很明显这是在请求网络,我们就着重看这个。我们先看getResponseWithInterceptorChain()的源码。

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    //责任链
    List<Interceptor> interceptors = new ArrayList<>();
    配置OkhttpClient时设置的拦截器,可以由用户自己设置。
    interceptors.addAll(client.interceptors());
    负责处理失败后的充实与重定向。
    interceptors.add(retryAndFollowUpInterceptor);
    负责把用户构造的请求装换为发送到服务器的请求。
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //缓存处理配置
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接服务器,负责和服务器建立连接,这里才是真正的请求网络
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        //配置OkHttpClient时设置的networkInterceptors
        interceptors.addAll(client.networkInterceptors());
    }
    //执行流操作
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //创建责任链
    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    //执行责任链
    return chain.proceed(originalRequest);
}

我们发现,这里其实是运用了责任链模式,这里也是OkHttp最精髓的地方。从上述代码中,可以看出都实现了Interceptor接口,这是okhttp最核心的部分,采用责任链的模式来使每个功能分开,每个Interceptor(拦截器)自行完成自己的任务,并且将不属于自己的任务交给下一个,简化了各自的责任和逻辑。我们暂时先不看拦截器的实现,这里接着往下看,这里是通过RealInterceptorChain创建了责任链,也就是接来执行RealInterceptorChain的proceed()方法。

@Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
}

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
        
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
            + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
        throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
            + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    //这里是具体实现
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
        throw new IllegalStateException("network interceptor " + interceptor
            + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
        throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    return response;
}

从这段实现可以看出,是按照添加到 interceptors 集合的顺序,逐个往下调用拦截器的intercept()方法,然后最终返回处理后的response。到这里其实整个网络请求的整个流程就到此结束了,其实重点就是里面的拦截器的具体实现。关于拦截器后面我会单独来写一篇来讲述里面每个拦截器的实现。这里最后放一张网络请求的流程图。

OkHttp的请求过程

参考

上一篇下一篇

猜你喜欢

热点阅读