OkHttp3原理浅析<一>

2021-01-11  本文已影响0人  慕_先生

OkHttp是现在Android使用最频繁的网络请求框架.Retrofit也是使用OKHttp的网络框架。
现在来看下okhtp的流程图


OkHttp流程图.png

本次贴的代码是okhttp3.10.0

异步流程是:

1.RealCall(Call的唯一实现)包装了Request,途经 分发器 Dispatcher
2.分发器会判断是加入运行队列还是等待队列(最大请求连接不超过64 并且 同一域名不超过5个 会进入运行中队列,否者就进入等待队列),下一步到拦截器 Interceptors
3.Okhttp内部默认5大拦截器,拦截器通过责任链模式依次传递请求和响应

同步流程:

同步是直接添加一个队列,发起网络请求然后阻塞,等待这个请求完成才会执行下一个任务(也就是请求)

先看OkHttpClient的newCall方法,返回的是Call,Call是接口,它的唯一实现是RealCall,所以调用的方法都在RealCall中实现

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }
final class RealCall implements Call {
    @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

  private void captureCallStackTrace() {
    Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
    retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
  }

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
}

然后RealCall会调用同步 executed() 或者 异步 enqueue() 方法
调用异步的方法,RealCall会传一个包装了回调的 AsyncCall,AsyncCall是继承了NamedRunnable,NamedRunnable 实现了 Runnable ,所以 AsyncCall 实际上是一个任务

final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        //拦截器责任链
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          eventListener.callFailed(RealCall.this, e);
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }
}



最后会调用到分发器的 executed() 和 enqueue() 方法

public final class Dispatcher {
    //Okhttp默认最大请求数
    private int maxRequests = 64;
    //Okhttp默认域名最大请求数
    private int maxRequestsPerHost = 5;
    private @Nullable Runnable idleCallback;

    //线程池
    private @Nullable ExecutorService executorService;

    //异步等待队列
    private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

    //异步运行队列
    private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

    //同步队列
    private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

    public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
    }

    public Dispatcher() {
    }

    public synchronized ExecutorService executorService() {
      if (executorService == null) {
        //创建线程池 配置核心线程数为0,最大线程数为Integer.MAX_VALUE,并且是SynchronousQueue队列
        //所以导致任务入队就会失败,马上就创建线程来跑任务,同时也有运行队列来限制请求数
        //所以这样创建线程池会提高线程池的吞吐量,高并发
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
      }
      return executorService;
    }

    synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
      }
    }
}

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // 运行队列还是大于最大请求数64,直接return
    if (readyAsyncCalls.isEmpty()) return; 

      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
        //判断同一域名不超过5个
          if (runningCallsForHost(call) < maxRequestsPerHost) {
              i.remove();
              runningAsyncCalls.add(call);
              executorService().execute(call);
          }

        if (runningAsyncCalls.size() >= maxRequests) return; // 
    }
    
    synchronized void executed(RealCall call) {
      runningSyncCalls.add(call);
    }
  
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //只有异步才会执行这一步
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
      }
    }
 }

executed没什么好说的加入队列,执行网络请求,现在说enqueue
Dispatcher在执行任务前先判断 运行队列最大请求不超过64个 并且 同一个域名不超过5个 才会添加到运行队列 否者放入等待队列。
符合以上条件后会放入运行队列并执行线程池任务 executorService().execute(call)
Okhttp的线程池是一个高并发高吞吐量的线程池,因为核心线程数为0,最大线程数为Integer.MAX_VALUE,使用的是SynchronousQueue队列,SynchronousQueue 也是一个队列的,但它的特别之处在于它内部没有容器,每次任务往队列里面添加的时候都会失败,然后会新建线程来跑任务,同时又有64最大请求数和最多5个域名请求限制,所以实现了高并发高吞吐量
执行完上列操作后,后面会执行Dispatcher的finished方法,在这里面会从队列里面移除call

//异步完成
finished(runningAsyncCalls, call, true);
//同步完成
finished(runningSyncCalls, call, false);

如果是异步,会调用promoteCalls方法,从readyAsyncCalls中去获取任务添加到runningAsyncCalls中,当然也有64最大请求和最多5个同一域名的限制
也可以设置Dispatcher最大请求数和域名

public synchronized void setMaxRequests(int maxRequests)
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost)
上一篇 下一篇

猜你喜欢

热点阅读