OkHttp源码解析之调度器Dispatcher

2019-03-05  本文已影响14人  dlihasa
什么是Dispatcher

Dispatcher的作用是维护众多请求的状态,并维护一个线程池,用于执行请求,发送的同步/异步请求都会在dispatcher中管理其状态。

从源码分析Dispatcher

(1)Dispatcher的成员变量

private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

① ExecutorService线程池,用于执行请求。
② readyAsyncCalls 异步请求的就绪队列,用于保存等待执行的请求。
③ runningAsyncCalls 异步请求的执行队列,包括已经取消(cancel)但是还没被finish掉的请求。
④ runningSyncCalls 同步请求的执行队列,包括已经取消(cancel)但是还没被finish掉的请求。

(2)同步请求中Dispatcher的作用

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

在上述代码中,主要有两处:
① client.dispatcher().executed(this);跟踪源码,可以发现在Dispatcher中实际执行了下面这个代码

/** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

将同步请求加入到同步请求执行队列中。
② 请求执行完毕之后,在finally中执行了client.dispatcher().finished(this);跟踪源码,可以发现在Dispatcher中实际执行了下面代码

/** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

对此上一篇分析中已经说过,有需要的可以去上一篇中查看。

(3)异步请求中Dispatcher的作用

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

在上面代码中看最后一句 client.dispatcher().enqueue(new AsyncCall(responseCallback));跟踪源码,如下:

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

上一篇文章中也分析过了,在异步请求队列中请求数量小于设置的最大请求数量且对同一主机的请求数量小于设置的最大同一主机请求数量时,将请求添加到异步请求执行队列中,并且线程池执行该请求,否则将请求添加到异步请求就绪队列中等待。

关于线程池,来看一下其定义:

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

ThreadPoolExecutor第一个参数为0,代表该线程池中核心线程数为0,如果空余一段时间,该线程池会将全部线程销毁。第二个参数为最大线程数。

那么readyAsyncCalls中的请求什么时候才能进入runningAsyncCalls队列中执行呢?一定是执行队列中有请求执行完毕并移出之后,那么我们来看下runningAsyncCalls队列中的请求执行代码executorService().execute(call);上一篇我们已经知道了具体执行的代码为RealCall的内部类AsyncCall线程中的执行代码,直接查看:

 @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

我们可以注意到在整个的方法完成之后finally中的这句client.dispatcher().finished(this);跟踪源码可以得到

void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

关键部分来了,calls.remove(call)先将已经结束的请求从正在执行的请求队列中移除,然后if (promoteCalls) promoteCalls();在异步时条件判断条件一直为true,promoteCalls()方法调整请求队列。

private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

经过一系列的判断,正在执行的异步请求队列元素个数小于最大请求数且就绪等待队列不为空,循环就绪等待队列,取出加入到正在执行队列,并且线程池执行该请求,循环跳出条件为最后一句:达到最大可请求数。

【END】OkHttp中关于调度器的部分就介绍到这里了。

上一篇下一篇

猜你喜欢

热点阅读