okhttp——任务模型

2019-04-18  本文已影响0人  oceanLong

简介

okhttp是Android中应用最广的http网络请求框架。结构优雅,性能强大。我们通过阅读它,对网络库的架构进行学习。本篇主要阅读okhttp任务队列的结构,了解okhttp的任务调度。

基本结构

网络请求是一个典型的生产/消费模型。我们的每一个请求都会加入一个缓冲队列,然后在合适的时候进行消费。

我们以异步请求为例:

基本架构

OKHttpClient提供方法生产Call,Dispatcher负责管理、分发和执行AsyncCall

下面,我们来细致地看一下代码。

详细代码

基本调用

还是以异步任务为例,一次典型的okhttp的调用:

String url = "http://wwww.baidu.com";
OkHttpClient okHttpClient = new OkHttpClient();
final Request request = new Request.Builder()
        .url(url)
        .get()//默认就是GET请求,可以不写
        .build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        Log.d(TAG, "onFailure: ");
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        Log.d(TAG, "onResponse: " + response.body().string());
    }
});

我们可以看到,这个过程大致分为两部,生成一个新的call,将call加入队列。

Request

Request是okhttp中的http请求封装类,它管理了http协议中的参数,如Header,RequestBody等

class Request internal constructor(
  internal val url: HttpUrl,
  builder: Builder
) {
  internal val method: String = builder.method
  internal val headers: Headers = builder.headers.build()
  internal val body: RequestBody? = builder.body
  internal val tags: Map<Class<*>, Any> = Util.immutableMap(builder.tags)

  ......

}

OKHttpClient

OKHttpClient是okhttp中总管类,管理了okhttp几乎所有抽象模块和各种配置。


open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  private val dispatcher: Dispatcher = builder.dispatcher
  private val proxy: Proxy? = builder.proxy
  private val protocols: List<Protocol> = builder.protocols
  private val connectionSpecs: List<ConnectionSpec> = builder.connectionSpecs
  private val interceptors: List<Interceptor> =
      Util.immutableList(builder.interceptors)
  private val networkInterceptors: List<Interceptor> =
      Util.immutableList(builder.networkInterceptors)
  private val eventListenerFactory: EventListener.Factory = builder.eventListenerFactory
  private val proxySelector: ProxySelector = builder.proxySelector
  private val cookieJar: CookieJar = builder.cookieJar
  private val cache: Cache? = builder.cache
  private val socketFactory: SocketFactory = builder.socketFactory
  private val sslSocketFactory: SSLSocketFactory?
  private val hostnameVerifier: HostnameVerifier = builder.hostnameVerifier
  private val certificatePinner: CertificatePinner
  private val proxyAuthenticator: Authenticator = builder.proxyAuthenticator
  private val authenticator: Authenticator = builder.authenticator
  private val connectionPool: ConnectionPool = builder.connectionPool
  private val dns: Dns = builder.dns
  private val followSslRedirects: Boolean = builder.followSslRedirects
  private val followRedirects: Boolean = builder.followRedirects
  private val retryOnConnectionFailure: Boolean = builder.retryOnConnectionFailure
  private val callTimeout: Int = builder.callTimeout
  private val connectTimeout: Int = builder.connectTimeout
  private val readTimeout: Int = builder.readTimeout
  private val writeTimeout: Int = builder.writeTimeout
  private val pingInterval: Int = builder.pingInterval
  private val internalCache: InternalCache? = builder.internalCache
  private val certificateChainCleaner: CertificateChainCleaner?
  ......

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call {
    return RealCall.newRealCall(this, request, false /* for web socket */)
  }

}

RealCall

RealCall是okhttp中任务的封装类

  companion object {
    fun newRealCall(
      client: OkHttpClient,
      originalRequest: Request,
      forWebSocket: Boolean
    ): RealCall {
      // Safely publish the Call instance to the EventListener.
      return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
      }
    }
  }

internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers.  */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  /**
   * There is a cycle between the [Call] and [Transmitter] that makes this awkward.
   * This is set after immediately after creating the call instance.
   */
  private lateinit var transmitter: Transmitter

  // Guarded by this.
  var executed: Boolean = false

  @Synchronized override fun isExecuted(): Boolean = executed

  override fun isCanceled(): Boolean = transmitter.isCanceled

  override fun request(): Request = originalRequest

  override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.timeoutEnter()
    transmitter.callStart()
    try {
      client.dispatcher().executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher().finished(this)
    }
  }

  override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.callStart()
    client.dispatcher().enqueue(AsyncCall(responseCallback))
  }

  override fun cancel() {
    transmitter.cancel()
  }
...
}

在异步任务的例子中,我们通常使用enqueue将我们的请求加入请求队列。
client.dispatcher().enqueue(AsyncCall(responseCallback))
这句话中,我们可以看到,我们新建了AsyncCall加入到OKHttpClient的dispatcher的队列中。

我们先看AsyncCall封装了什么

  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : NamedRunnable("OkHttp %s", redactedUrl()) {
    @Volatile private var callsPerHost = AtomicInteger(0)

    fun callsPerHost(): AtomicInteger = callsPerHost

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    fun host(): String = originalRequest.url().host()

    fun request(): Request = originalRequest

    fun get(): RealCall = this@RealCall

    /**
     * Attempt to enqueue this async call on `executorService`. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      assert(!Thread.holdsLock(client.dispatcher()))
      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        transmitter.noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher().finished(this) // This call is no longer running!
        }
      }
    }

    override fun execute() {
      var signalledCallback = false
      transmitter.timeoutEnter()
      try {
        val response = getResponseWithInterceptorChain()
        signalledCallback = true
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
        } else {
          responseCallback.onFailure(this@RealCall, e)
        }
      } finally {
        client.dispatcher().finished(this)
      }
    }
  }

我们可以看到,AsyncCall主要提供了executeOnexecute两个方法。

executeOn

executeOnAsyncCall自身的方法。它的主要内容是对异常的处理。如果一切正常,就用传入的ExecutorService执行当前AsyncCall

execute

execute是实现的Runnable的run方法。

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

我们可以看到,这个方法,只是为了给线程起名。
AsyncCall在线程池中执行时,execute方法就会被调用。execute方法的主要逻辑是:
getResponseWithInterceptorChain,它的实现,我们再不展开,属于okhttp网络能力了。

铺垫了上面这么多,我们终于可以看看okhttp的任务队列如何设计的了。

Dispatcher

  private var idleCallback: Runnable? = null

  /** Executes calls. Created lazily.  */
  private var executorService: ExecutorService? = null

  /** Ready async calls in the order they'll be run.  */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet.  */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet.  */
  private val runningSyncCalls = ArrayDeque<RealCall>()

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host())
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

这一段主要是AsyncCall的callsPerHost复用逻辑,注释说明比较清晰。如果有相同的AsyncCall存在于runningAsyncCalls中,则会增加callsPerHost。接下来,看一下promoteAndExecute的实现:

  private fun promoteAndExecute(): Boolean {
    assert(!Thread.holdsLock(this))

    val executableCalls = ArrayList<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService())
    }

    return isRunning
  }

promoteAndExecute方法,是将readyAsyncCalls队列中的任务,在最大任务数没有超标的情况下,移入runningAsyncCalls队列中。并对涉及转移的方法,调用executeOn方法。executeOn被调用后,就会执行到,RealCall中AsyncCall中的execute方法了。

小结

以上就是okhttp在请求和任务上的基本结构,还没有涉及到具体的网络请求。核心类是RealCallDispatcher,通过ArrayDequeAsyncCall完成任务的调度。

如有问题,欢迎指正。

上一篇下一篇

猜你喜欢

热点阅读