android

okhttp源码解析--dispatcher

2019-02-12  本文已影响31人  二妹是只猫
Okhttpclient流程图.png

1. dispatcher到底是什么、做了什么?

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

通过上一篇的学习了解到不论是同步还是异步请求,最终调用的都是dispatcher中的方法,并分别将请求线程AsyncCall添加到异步队列(readyAsyncCalls、runningAsyncCalls)和同步队列(runningSyncCalls)中.

 synchronized void enqueue(AsyncCall call) {
   if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
     runningAsyncCalls.add(call);
     executorService().execute(call);
   } else {
     readyAsyncCalls.add(call);
   }
 }
/** Used by {@code Call#execute} to signal it is in-flight. */
 synchronized void executed(RealCall call) {
   runningSyncCalls.add(call);
 }
 public synchronized ExecutorService executorService() {
   if (executorService == null) {
     executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
   }
   return executorService;
 }

executorService()创建了一个线程池,在通过上面两张代码,不难看出Dispatcher主要工作就是维护请求队列和通过线程池来执行网络请求

1.1 Dispatcher的readyAsyncCalls、runningAsyncCalls等队列是怎样进行控制的?

从上一段代码中可以看到同步方法executed很简单直接将请求添加到同步队列中(runningSyncCalls)。异步请求也只有几行代码,判断了下是否小于最大请求数(maxRequests)和小于正在进行中的网络请求最大值(maxRequestsPerHost),是--就添加到进行中异步队列(runningAsyncCalls)并执行线程池,否者就添加到异步等待队列(readyAsyncCalls)
看到这里我就产生了一个疑问,这里只是对消息队列进行了简单的添加,并没有和我们的线程池产生联系啊!不着急,我们继续。上一段代码我们以异步请求enqueue 方法继续分析 :当请求满足条件时执行了

runningAsyncCalls.add(call);
executorService().execute(call);

这两步操作:第一步添加到执行中的异步请求队列(runningAsyncCalls),第二步线程池直接执行当前网络请求(注意executorService()返回线程池、AsyncCall(继承自NamedRunnable)就是一个线程哦,这俩在上一篇中已介绍过就不多说了)

/**
 * Runnable implementation which always sets its thread name.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

当线程池执行网络请求时,实际就是执行了子线程的run()方法,AsyncCall继承的NamedRunnable就是一个Runnable,在它的run()方法中执行了它的抽象方法execute()
我们继续到AsyncCall中去看看execute()的具体实现:

  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

retryAndFollowUpInterceptor.isCanceled()首先判断请求是否取消,
取消就调用 responseCallbackonFailure(...)失败方法,
否则responseCallbackonResponse(...)返回数据。这样一个异步请求就完成了。
最终在finlly看到了这段代码client.dispatcher().finished(this)调用了dispatcher的finished方法

  /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

首先将执行完成的网络请求从runningSyncCalls中移除,第二步通过promoteCalls判断是否是异步请求,是就执行promoteCalls()

 private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
  /** Returns the number of running calls that share a host with {@code call}. */
  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

首先判断请求进行中的异步队列(runningAsyncCalls)达到最大数量就return,异步等待队列(readyAsyncCalls)是为空就return。
然后遍历异步等待队列(readyAsyncCalls)

简单的总结一下:
executorService().execute(call)-->AsyncCall执行execute()-->dispatcher().finished(this)--> executorService().execute(call)

上一篇下一篇

猜你喜欢

热点阅读